//! HTTP handlers for directive CRUD and DAG progression. use axum::{ extract::{Path, State}, http::StatusCode, response::IntoResponse, Json, }; use uuid::Uuid; use crate::db::models::{ CleanupResponse, CreateDirectiveRequest, CreateTaskRequest, CreateDirectiveStepRequest, Directive, DirectiveListResponse, DirectiveStep, DirectiveWithSteps, PickUpOrdersResponse, UpdateDirectiveRequest, UpdateDirectiveStepRequest, UpdateGoalRequest, CreateDirectiveOrderGroupRequest, DirectiveOrderGroup, DirectiveOrderGroupListResponse, UpdateDirectiveOrderGroupRequest, OrderListResponse, }; use crate::db::repository; use crate::orchestration::directive::{ build_cleanup_prompt, build_order_pickup_prompt, classify_goal_change, try_interrupt_planner_with_goal_edit, GoalChangeKind, GoalEditInterruptResult, }; use crate::server::auth::Authenticated; use crate::server::messages::ApiError; use crate::server::state::SharedState; // ============================================================================= // Directive CRUD // ============================================================================= /// List all directives for the authenticated user. 
#[utoipa::path(
    get,
    path = "/api/v1/directives",
    responses(
        (status = 200, description = "List of directives", body = DirectiveListResponse),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
    ),
    security(("bearer_auth" = []), ("api_key" = [])),
    tag = "Directives"
)]
// Responds 200 with the result of `list_directives_for_owner` for the caller's
// `owner_id` plus a `total` count, 503 when no database pool is configured,
// and 500 (`LIST_FAILED`) when the repository call fails.
pub async fn list_directives(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    match repository::list_directives_for_owner(pool, auth.owner_id).await {
        Ok(directives) => {
            // `len()` is a `usize`; saturate rather than wrap with an `as` cast.
            let total = i64::try_from(directives.len()).unwrap_or(i64::MAX);
            Json(DirectiveListResponse { directives, total }).into_response()
        }
        Err(e) => {
            tracing::error!("Failed to list directives: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("LIST_FAILED", &e.to_string())),
            )
                .into_response()
        }
    }
}

/// Create a new directive.
#[utoipa::path(
    post,
    path = "/api/v1/directives",
    request_body = CreateDirectiveRequest,
    responses(
        (status = 201, description = "Directive created", body = Directive),
        (status = 401, description = "Unauthorized", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
    ),
    security(("bearer_auth" = []), ("api_key" = [])),
    tag = "Directives"
)]
// Responds 201 with the directive returned by `create_directive_for_owner`,
// 503 when no database pool is configured, and 500 (`CREATE_FAILED`) when the
// repository call fails.
pub async fn create_directive(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Json(req): Json<CreateDirectiveRequest>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    match repository::create_directive_for_owner(pool, auth.owner_id, req).await {
        Ok(directive) => (StatusCode::CREATED, Json(directive)).into_response(),
        Err(e) => {
            tracing::error!("Failed to create directive: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("CREATE_FAILED", &e.to_string())),
            )
                .into_response()
        }
    }
}

/// Get a directive with all its steps.
#[utoipa::path(
    get,
    path = "/api/v1/directives/{id}",
    params(("id" = Uuid, Path, description = "Directive ID")),
    responses(
        (status = 200, description = "Directive with steps", body = DirectiveWithSteps),
        (status = 404, description = "Not found", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
    ),
    security(("bearer_auth" = []), ("api_key" = [])),
    tag = "Directives"
)]
// Responds 200 with the directive and its steps, 404 when the owner-scoped
// lookup returns nothing, 503 when no database pool is configured, and 500
// (`GET_FAILED`) when the repository call fails.
pub async fn get_directive(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    match repository::get_directive_with_steps_for_owner(pool, auth.owner_id, id).await {
        Ok(Some((directive, steps))) => {
            Json(DirectiveWithSteps { directive, steps }).into_response()
        }
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(ApiError::new("NOT_FOUND", "Directive not found")),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("Failed to get directive: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("GET_FAILED", &e.to_string())),
            )
                .into_response()
        }
    }
}

/// Update a directive.
#[utoipa::path(
    put,
    path = "/api/v1/directives/{id}",
    params(("id" = Uuid, Path, description = "Directive ID")),
    request_body = UpdateDirectiveRequest,
    responses(
        (status = 200, description = "Directive updated", body = Directive),
        (status = 404, description = "Not found", body = ApiError),
        (status = 409, description = "Version conflict", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
    ),
    security(("bearer_auth" = []), ("api_key" = [])),
    tag = "Directives"
)]
// Responds 200 with the updated directive, 404 when the owner-scoped update
// matches nothing, 409 when the repository reports a version conflict, 503
// when no database pool is configured, and 500 (`UPDATE_FAILED`) otherwise.
pub async fn update_directive(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
    Json(req): Json<UpdateDirectiveRequest>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    match repository::update_directive_for_owner(pool, auth.owner_id, id, req).await {
        Ok(Some(directive)) => Json(directive).into_response(),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(ApiError::new("NOT_FOUND", "Directive not found")),
        )
            .into_response(),
        // A version conflict is a client-resolvable condition, so it is
        // reported as 409 with both versions rather than as a server error.
        Err(repository::RepositoryError::VersionConflict { expected, actual }) => (
            StatusCode::CONFLICT,
            Json(ApiError::new(
                "VERSION_CONFLICT",
                &format!("Expected version {}, but current is {}", expected, actual),
            )),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("Failed to update directive: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("UPDATE_FAILED", &e.to_string())),
            )
                .into_response()
        }
    }
}

/// Delete a directive.
#[utoipa::path( delete, path = "/api/v1/directives/{id}", params(("id" = Uuid, Path, description = "Directive ID")), responses( (status = 204, description = "Deleted"), (status = 404, description = "Not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directives" )] pub async fn delete_directive( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; match repository::delete_directive_for_owner(pool, auth.owner_id, id).await { Ok(true) => StatusCode::NO_CONTENT.into_response(), Ok(false) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to delete directive: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DELETE_FAILED", &e.to_string())), ) .into_response() } } } // ============================================================================= // Step CRUD // ============================================================================= /// Create a step in a directive. 
#[utoipa::path( post, path = "/api/v1/directives/{id}/steps", params(("id" = Uuid, Path, description = "Directive ID")), request_body = CreateDirectiveStepRequest, responses( (status = 201, description = "Step created", body = DirectiveStep), (status = 404, description = "Directive not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directives" )] pub async fn create_step( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify directive ownership match repository::get_directive_for_owner(pool, auth.owner_id, id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GET_FAILED", &e.to_string())), ) .into_response(); } } match repository::create_directive_step(pool, id, req).await { Ok(step) => (StatusCode::CREATED, Json(step)).into_response(), Err(e) => { tracing::error!("Failed to create step: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("CREATE_FAILED", &e.to_string())), ) .into_response() } } } /// Batch create steps in a directive. 
#[utoipa::path( post, path = "/api/v1/directives/{id}/steps/batch", params(("id" = Uuid, Path, description = "Directive ID")), request_body = Vec, responses( (status = 201, description = "Steps created", body = Vec), (status = 404, description = "Directive not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directives" )] pub async fn batch_create_steps( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(steps): Json>, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify directive ownership match repository::get_directive_for_owner(pool, auth.owner_id, id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GET_FAILED", &e.to_string())), ) .into_response(); } } match repository::batch_create_directive_steps(pool, id, steps).await { Ok(created) => (StatusCode::CREATED, Json(created)).into_response(), Err(e) => { tracing::error!("Failed to batch create steps: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("CREATE_FAILED", &e.to_string())), ) .into_response() } } } /// Update a step. 
#[utoipa::path( put, path = "/api/v1/directives/{id}/steps/{step_id}", params( ("id" = Uuid, Path, description = "Directive ID"), ("step_id" = Uuid, Path, description = "Step ID"), ), request_body = UpdateDirectiveStepRequest, responses( (status = 200, description = "Step updated", body = DirectiveStep), (status = 404, description = "Not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directives" )] pub async fn update_step( State(state): State, Authenticated(auth): Authenticated, Path((id, step_id)): Path<(Uuid, Uuid)>, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify directive ownership match repository::get_directive_for_owner(pool, auth.owner_id, id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GET_FAILED", &e.to_string())), ) .into_response(); } } match repository::update_directive_step(pool, step_id, req).await { Ok(Some(step)) => Json(step).into_response(), Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Step not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to update step: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("UPDATE_FAILED", &e.to_string())), ) .into_response() } } } /// Delete a step. 
#[utoipa::path( delete, path = "/api/v1/directives/{id}/steps/{step_id}", params( ("id" = Uuid, Path, description = "Directive ID"), ("step_id" = Uuid, Path, description = "Step ID"), ), responses( (status = 204, description = "Deleted"), (status = 404, description = "Not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directives" )] pub async fn delete_step( State(state): State, Authenticated(auth): Authenticated, Path((id, step_id)): Path<(Uuid, Uuid)>, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify directive ownership match repository::get_directive_for_owner(pool, auth.owner_id, id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GET_FAILED", &e.to_string())), ) .into_response(); } } match repository::delete_directive_step(pool, step_id).await { Ok(true) => StatusCode::NO_CONTENT.into_response(), Ok(false) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Step not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to delete step: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DELETE_FAILED", &e.to_string())), ) .into_response() } } } // ============================================================================= // Directive Lifecycle Actions // ============================================================================= /// Start a directive: sets status=active, advances ready steps. 
#[utoipa::path( post, path = "/api/v1/directives/{id}/start", params(("id" = Uuid, Path, description = "Directive ID")), responses( (status = 200, description = "Directive started", body = DirectiveWithSteps), (status = 404, description = "Not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directives" )] pub async fn start_directive( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Set to active match repository::set_directive_status(pool, auth.owner_id, id, "active").await { Ok(Some(directive)) => { // Advance ready steps let _ = repository::advance_directive_ready_steps(pool, id).await; let steps = repository::list_directive_steps(pool, id).await.unwrap_or_default(); Json(DirectiveWithSteps { directive, steps }).into_response() } Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to start directive: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("START_FAILED", &e.to_string())), ) .into_response() } } } /// Pause a directive. 
#[utoipa::path(
    post,
    path = "/api/v1/directives/{id}/pause",
    params(("id" = Uuid, Path, description = "Directive ID")),
    responses(
        (status = 200, description = "Directive paused", body = Directive),
        (status = 404, description = "Not found", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
    ),
    security(("bearer_auth" = []), ("api_key" = [])),
    tag = "Directives"
)]
// Responds 200 with the directive after its status is set to "paused", 404
// when the owner-scoped status update matches nothing, 503 when no database
// pool is configured, and 500 (`PAUSE_FAILED`) when the update fails.
pub async fn pause_directive(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    match repository::set_directive_status(pool, auth.owner_id, id, "paused").await {
        Ok(Some(directive)) => Json(directive).into_response(),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(ApiError::new("NOT_FOUND", "Directive not found")),
        )
            .into_response(),
        Err(e) => {
            // Logged like every other lifecycle failure in this module.
            tracing::error!("Failed to pause directive: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("PAUSE_FAILED", &e.to_string())),
            )
                .into_response()
        }
    }
}

/// Advance a directive: find newly-ready steps. If all steps done, set idle.
#[utoipa::path( post, path = "/api/v1/directives/{id}/advance", params(("id" = Uuid, Path, description = "Directive ID")), responses( (status = 200, description = "Advance result", body = DirectiveWithSteps), (status = 404, description = "Not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directives" )] pub async fn advance_directive( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify ownership let directive = match repository::get_directive_for_owner(pool, auth.owner_id, id).await { Ok(Some(d)) => d, Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GET_FAILED", &e.to_string())), ) .into_response(); } }; // Advance ready steps let _ = repository::advance_directive_ready_steps(pool, id).await; // Check if idle let _ = repository::check_directive_idle(pool, id).await; // Return updated state let directive = match repository::get_directive_for_owner(pool, auth.owner_id, id).await { Ok(Some(d)) => d, _ => directive, }; let steps = repository::list_directive_steps(pool, id).await.unwrap_or_default(); Json(DirectiveWithSteps { directive, steps }).into_response() } /// Mark a step as completed. 
#[utoipa::path(
    post,
    path = "/api/v1/directives/{id}/steps/{step_id}/complete",
    params(
        ("id" = Uuid, Path, description = "Directive ID"),
        ("step_id" = Uuid, Path, description = "Step ID"),
    ),
    responses(
        (status = 200, description = "Step completed", body = DirectiveStep),
        (status = 404, description = "Not found", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
    ),
    security(("bearer_auth" = []), ("api_key" = [])),
    tag = "Directives"
)]
// Thin wrapper: delegates to `step_status_change` with status "completed".
pub async fn complete_step(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path((id, step_id)): Path<(Uuid, Uuid)>,
) -> impl IntoResponse {
    step_status_change(state, auth, id, step_id, "completed").await
}

/// Mark a step as failed.
#[utoipa::path(
    post,
    path = "/api/v1/directives/{id}/steps/{step_id}/fail",
    params(
        ("id" = Uuid, Path, description = "Directive ID"),
        ("step_id" = Uuid, Path, description = "Step ID"),
    ),
    responses(
        (status = 200, description = "Step failed", body = DirectiveStep),
        (status = 404, description = "Not found", body = ApiError),
        (status = 500, description = "Internal server error", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
    ),
    security(("bearer_auth" = []), ("api_key" = [])),
    tag = "Directives"
)]
// Thin wrapper: delegates to `step_status_change` with status "failed".
pub async fn fail_step(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path((id, step_id)): Path<(Uuid, Uuid)>,
) -> impl IntoResponse {
    step_status_change(state, auth, id, step_id, "failed").await
}

/// Mark a step as skipped.
#[utoipa::path( post, path = "/api/v1/directives/{id}/steps/{step_id}/skip", params( ("id" = Uuid, Path, description = "Directive ID"), ("step_id" = Uuid, Path, description = "Step ID"), ), responses( (status = 200, description = "Step skipped", body = DirectiveStep), (status = 404, description = "Not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directives" )] pub async fn skip_step( State(state): State, Authenticated(auth): Authenticated, Path((id, step_id)): Path<(Uuid, Uuid)>, ) -> impl IntoResponse { step_status_change(state, auth, id, step_id, "skipped").await } /// Helper for step status changes. async fn step_status_change( state: SharedState, auth: crate::server::auth::AuthenticatedUser, directive_id: Uuid, step_id: Uuid, new_status: &str, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify directive ownership match repository::get_directive_for_owner(pool, auth.owner_id, directive_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GET_FAILED", &e.to_string())), ) .into_response(); } } let req = UpdateDirectiveStepRequest { status: Some(new_status.to_string()), ..Default::default() }; match repository::update_directive_step(pool, step_id, req).await { Ok(Some(step)) => { // After step status change, advance the DAG let _ = repository::advance_directive_ready_steps(pool, directive_id).await; let _ = repository::check_directive_idle(pool, directive_id).await; Json(step).into_response() } Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Step not found")), ) .into_response(), Err(e) => { 
tracing::error!("Failed to update step status: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("UPDATE_FAILED", &e.to_string())),
            )
                .into_response()
        }
    }
}

/// Update a directive's goal (triggers re-planning).
///
/// The previous goal is written to the goal history on a best-effort basis.
/// If a planner task is attached to the directive and the change classifies
/// as small, the planner is interrupted in place; otherwise the goal is
/// persisted through the normal path.
#[utoipa::path(
    put,
    path = "/api/v1/directives/{id}/goal",
    params(("id" = Uuid, Path, description = "Directive ID")),
    request_body = UpdateGoalRequest,
    responses(
        (status = 200, description = "Goal updated", body = Directive),
        (status = 404, description = "Not found", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
    ),
    security(("bearer_auth" = []), ("api_key" = [])),
    tag = "Directives"
)]
pub async fn update_goal(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
    Json(req): Json<UpdateGoalRequest>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Fetch the current directive so we can:
    //   1. Save the old goal to history (best-effort).
    //   2. Decide whether to fire a goal-edit interrupt at a running planner.
    // A fetch failure is deliberately non-fatal: the goal update below still
    // runs and reports its own outcome.
    let current = match repository::get_directive_for_owner(pool, auth.owner_id, id).await {
        Ok(found) => found,
        Err(e) => {
            tracing::warn!(
                directive_id = %id,
                error = %e,
                "Failed to fetch current directive for goal history — continuing with goal update"
            );
            None
        }
    };

    let mut interrupted = false;
    if let Some(current) = current.as_ref() {
        // Save old goal to history before overwriting (best-effort).
        if let Err(e) = repository::save_directive_goal_history(pool, id, &current.goal).await {
            tracing::warn!(
                directive_id = %id,
                error = %e,
                "Failed to save goal history before update — continuing with goal update"
            );
        }

        // Goal-edit interrupt cycle: if a planner task is currently running for
        // this directive AND the goal change classifies as 'small', interrupt the
        // running planner via SendMessage instead of clearing it (which would
        // trigger a fresh replan on the next orchestrator tick).
        let planner_attached = current.orchestrator_task_id.is_some();
        if planner_attached
            && classify_goal_change(&current.goal, &req.goal) == GoalChangeKind::Small
        {
            match try_interrupt_planner_with_goal_edit(
                pool,
                &state,
                id,
                &current.goal,
                &req.goal,
            )
            .await
            {
                Ok(GoalEditInterruptResult::Sent) => {
                    interrupted = true;
                }
                Ok(GoalEditInterruptResult::Skipped) => {}
                Err(e) => {
                    tracing::warn!(
                        directive_id = %id,
                        error = %e,
                        "Goal-edit interrupt attempt errored — falling back to replan"
                    );
                }
            }
        }
    }

    // If we successfully interrupted a running planner, persist the new goal
    // WITHOUT clearing the orchestrator task — the planner will react to the
    // SendMessage and adjust in-flight. Otherwise, fall through to the normal
    // path which clears orchestrator_task_id and lets phase_replanning kick
    // in on the next tick.
    let update_result = if interrupted {
        repository::update_directive_goal_keep_orchestrator(pool, auth.owner_id, id, &req.goal)
            .await
    } else {
        repository::update_directive_goal(pool, auth.owner_id, id, &req.goal).await
    };

    match update_result {
        Ok(Some(directive)) => Json(directive).into_response(),
        Ok(None) => (
            StatusCode::NOT_FOUND,
            Json(ApiError::new("NOT_FOUND", "Directive not found")),
        )
            .into_response(),
        Err(e) => {
            tracing::error!("Failed to update goal: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("UPDATE_FAILED", &e.to_string())),
            )
                .into_response()
        }
    }
}

// =============================================================================
// Task Cleanup
// =============================================================================

/// Clean up merged steps for an idle directive by spawning a verification task.
///
/// Responds with a `CleanupResponse` whose `task_id` is `None` when there is
/// nothing to do (no completed step tasks, or no PR branch on the directive).
#[utoipa::path(
    post,
    path = "/api/v1/directives/{id}/cleanup",
    params(("id" = Uuid, Path, description = "Directive ID")),
    responses(
        (status = 200, description = "Cleanup task spawned", body = CleanupResponse),
        (status = 404, description = "Not found", body = ApiError),
        (status = 409, description = "Directive is not idle", body = ApiError),
        (status = 503, description = "Database not configured", body = ApiError),
    ),
    security(("bearer_auth" = []), ("api_key" = [])),
    tag = "Directives"
)]
pub async fn cleanup_directive(
    State(state): State<SharedState>,
    Authenticated(auth): Authenticated,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let Some(ref pool) = state.db_pool else {
        return (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
        )
            .into_response();
    };

    // Get the directive with steps and verify ownership
    let (directive, _steps) =
        match repository::get_directive_with_steps_for_owner(pool, auth.owner_id, id).await {
            Ok(Some(found)) => found,
            Ok(None) => {
                return (
                    StatusCode::NOT_FOUND,
                    Json(ApiError::new("NOT_FOUND", "Directive not found")),
                )
                    .into_response();
            }
            Err(e) => {
                tracing::error!("Failed to get directive: {}", e);
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(ApiError::new("GET_FAILED", &e.to_string())),
                )
                    .into_response();
            }
        };

    // Verify directive is idle
    if directive.status != "idle" {
        return (
            StatusCode::CONFLICT,
            Json(ApiError::new(
                "NOT_IDLE",
                "Directive must be idle to run cleanup",
            )),
        )
            .into_response();
    }

    // Auto-remove completed steps that were already included in a merged PR
    if directive.pr_url.is_some() || directive.pr_branch.is_some() {
        match crate::orchestration::directive::remove_already_merged_steps(pool, id).await {
            Ok(count) if count > 0 => {
                tracing::info!("Auto-removed {} completed steps already in PR for directive {} during cleanup", count, id);
            }
            Err(e) => {
                tracing::warn!("Failed to auto-remove merged steps during cleanup for directive {}: {}", id, e);
            }
            _ => {}
        }
    }

    // Get completed step tasks for branch name computation
    let step_tasks = match repository::get_completed_step_tasks(pool, id).await {
        Ok(tasks) => tasks,
        Err(e) => {
            tracing::error!("Failed to get completed step tasks: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("GET_STEPS_FAILED", &e.to_string())),
            )
                .into_response();
        }
    };

    if step_tasks.is_empty() {
        return Json(CleanupResponse {
            message: "No completed steps to clean up".to_string(),
            task_id: None,
        })
        .into_response();
    }

    let Some(pr_branch) = directive.pr_branch.clone() else {
        return Json(CleanupResponse {
            message: "No PR branch set — nothing to verify against".to_string(),
            task_id: None,
        })
        .into_response();
    };

    let base_branch = directive.base_branch.as_deref().unwrap_or("main");

    // Build the cleanup prompt
    let prompt = build_cleanup_prompt(&directive, &step_tasks, &pr_branch, base_branch);

    // Create the cleanup task (following pick_up_orders pattern)
    let req = CreateTaskRequest {
        contract_id: None,
        name: format!("Cleanup: {}", directive.title),
        description: Some(
            "Directive cleanup — verify merged branches and remove merged steps".to_string(),
        ),
        plan: prompt,
        parent_task_id: None,
        is_supervisor: false,
        priority: 0,
        repository_url: directive.repository_url.clone(),
        base_branch: directive.base_branch.clone(),
        target_branch: None,
        merge_mode: None,
        target_repo_path: None,
        completion_action: None,
        continue_from_task_id: None,
        copy_files: None,
        checkpoint_sha: None,
        branched_from_task_id: None,
        conversation_history: None,
        supervisor_worktree_task_id: None,
        directive_id: Some(directive.id),
        directive_step_id: None,
    };

    let task = match repository::create_task_for_owner(pool, auth.owner_id, req).await {
        Ok(t) => t,
        Err(e) => {
            tracing::error!("Failed to create cleanup task: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(ApiError::new("CREATE_TASK_FAILED", &e.to_string())),
            )
                .into_response();
        }
    };

    // Assign as orchestrator task
    if let Err(e) = repository::assign_orchestrator_task(pool, id, task.id).await {
        tracing::error!("Failed to assign orchestrator task: {}", e);
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiError::new("ASSIGN_TASK_FAILED", &e.to_string())),
        )
            .into_response();
    }

    // Cancel old planning tasks. Failure is non-fatal, but it is logged so that
    // stale planners do not go unnoticed.
    match repository::cancel_old_planning_tasks(pool, id, task.id).await {
        Ok(count) if count > 0 => {
            tracing::info!(
                directive_id = %id,
                cancelled_count = count,
                "Cancelled old planning tasks superseded by cleanup"
            );
        }
        Ok(_) => {}
        Err(e) => {
            tracing::warn!(
                "Failed to cancel old planning tasks for directive {}: {}",
                id,
                e
            );
        }
    }

    // Set directive to active
    if let Err(e) = repository::set_directive_status(pool, auth.owner_id, id, "active").await {
        tracing::warn!("Failed to set directive status to active: {}", e);
    }

    // Advance ready steps (best-effort; the response does not depend on it)
    if let Err(e) = repository::advance_directive_ready_steps(pool, id).await {
        tracing::warn!(
            "Failed to advance ready steps for directive {}: {}",
            id,
            e
        );
    }

    Json(CleanupResponse {
        message: format!("Cleanup task spawned for {} completed steps", step_tasks.len()),
        task_id: Some(task.id),
    })
    .into_response()
}

// =============================================================================
// PR Creation
// =============================================================================

/// Trigger PR creation or update for a directive.
#[utoipa::path( post, path = "/api/v1/directives/{id}/create-pr", params(("id" = Uuid, Path, description = "Directive ID")), responses( (status = 200, description = "PR task spawned", body = DirectiveWithSteps), (status = 404, description = "Not found", body = ApiError), (status = 409, description = "Completion task already running", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directives" )] pub async fn create_pr( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; match crate::orchestration::directive::trigger_completion_task( pool, &state, id, auth.owner_id, ) .await { Ok(_task_id) => { // Return the updated directive with steps match repository::get_directive_with_steps_for_owner(pool, auth.owner_id, id).await { Ok(Some((directive, steps))) => { Json(DirectiveWithSteps { directive, steps }).into_response() } Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(), Err(e) => ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GET_FAILED", &e.to_string())), ) .into_response(), } } Err(e) => { let msg = e.to_string(); if msg.contains("not found") { ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", &msg)), ) .into_response() } else if msg.contains("already running") || msg.contains("already claimed") { ( StatusCode::CONFLICT, Json(ApiError::new("COMPLETION_IN_PROGRESS", &msg)), ) .into_response() } else { ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("CREATE_PR_FAILED", &msg)), ) .into_response() } } } } // ============================================================================= // Order Pickup // 
============================================================================= /// Pick up available open orders for a directive by spawning a planning task. #[utoipa::path( post, path = "/api/v1/directives/{id}/pick-up-orders", params(("id" = Uuid, Path, description = "Directive ID")), responses( (status = 200, description = "Orders picked up", body = PickUpOrdersResponse), (status = 404, description = "Not found", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directives" )] pub async fn pick_up_orders( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify directive ownership and get directive with steps let (directive, mut steps) = match repository::get_directive_with_steps_for_owner(pool, auth.owner_id, id).await { Ok(Some((d, s))) => (d, s), Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get directive: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GET_FAILED", &e.to_string())), ) .into_response(); } }; // Auto-remove completed steps that were already included in a PR if directive.pr_url.is_some() || directive.pr_branch.is_some() { match crate::orchestration::directive::remove_already_merged_steps(pool, id).await { Ok(count) if count > 0 => { tracing::info!("Auto-removed {} completed steps already in PR for directive {}", count, id); // Re-fetch steps since some were removed steps = match repository::list_directive_steps(pool, id).await { Ok(s) => s, Err(e) => { tracing::error!("Failed to re-fetch steps after cleanup: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, 
Json(ApiError::new("REFETCH_STEPS_FAILED", &e.to_string())), ).into_response(); } }; } Err(e) => { tracing::warn!("Failed to auto-remove merged steps for directive {}: {}", id, e); } _ => {} } } // Reconcile existing orders: mark done if step completed, under_review if step in progress match repository::reconcile_directive_orders(pool, auth.owner_id, id).await { Ok(count) => { if count > 0 { tracing::info!("Reconciled {} orders for directive {}", count, id); } } Err(e) => { tracing::warn!("Failed to reconcile directive orders: {}", e); // Non-fatal: continue with pickup even if reconciliation fails } } // Fetch available orders let orders = match repository::get_available_orders_for_pickup(pool, auth.owner_id, id).await { Ok(o) => o, Err(e) => { tracing::error!("Failed to fetch available orders: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("FETCH_ORDERS_FAILED", &e.to_string())), ) .into_response(); } }; // If no orders available, return early if orders.is_empty() { return Json(PickUpOrdersResponse { message: "No orders available to plan".to_string(), order_count: 0, task_id: None, }) .into_response(); } let order_count = orders.len() as i64; let order_ids: Vec = orders.iter().map(|o| o.id).collect(); // Get generation and goal history for the planning prompt let generation = match repository::get_directive_max_generation(pool, id).await { Ok(g) => g + 1, Err(e) => { tracing::error!("Failed to get max generation: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GENERATION_FAILED", &e.to_string())), ) .into_response(); } }; let goal_history = match repository::get_directive_goal_history(pool, id, 3).await { Ok(h) => h, Err(e) => { tracing::warn!("Failed to get goal history: {}", e); vec![] } }; // Build the specialized planning prompt let plan = build_order_pickup_prompt(&directive, &steps, &orders, generation, &goal_history); // Link orders to the directive if let Err(e) = 
repository::bulk_link_orders_to_directive(pool, auth.owner_id, &order_ids, id).await { tracing::error!("Failed to link orders to directive: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("LINK_ORDERS_FAILED", &e.to_string())), ) .into_response(); } // Mark picked-up orders as in_progress if let Err(e) = repository::bulk_update_order_status(pool, auth.owner_id, &order_ids, "in_progress").await { tracing::warn!("Failed to update order status to in_progress: {}", e); } // Create the planning task let req = CreateTaskRequest { contract_id: None, name: format!("Pick up orders: {}", directive.title), description: Some("Directive order pickup planning task".to_string()), plan, parent_task_id: None, is_supervisor: false, priority: 0, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), target_branch: None, merge_mode: None, target_repo_path: None, completion_action: None, continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, directive_id: Some(directive.id), directive_step_id: None, }; let task = match repository::create_task_for_owner(pool, auth.owner_id, req).await { Ok(t) => t, Err(e) => { tracing::error!("Failed to create pickup planning task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("CREATE_TASK_FAILED", &e.to_string())), ) .into_response(); } }; // Assign as orchestrator task if let Err(e) = repository::assign_orchestrator_task(pool, id, task.id).await { tracing::error!("Failed to assign orchestrator task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("ASSIGN_TASK_FAILED", &e.to_string())), ) .into_response(); } // Cancel old planning tasks let cancelled = repository::cancel_old_planning_tasks(pool, id, task.id).await; if let Ok(count) = cancelled { if count > 0 { tracing::info!( directive_id = %id, cancelled_count = count, "Cancelled old 
planning tasks superseded by order pickup" ); } } // Set directive to active if draft/idle/paused match directive.status.as_str() { "draft" | "idle" | "paused" => { if let Err(e) = repository::set_directive_status(pool, auth.owner_id, id, "active").await { tracing::warn!("Failed to set directive status to active: {}", e); } } _ => {} } // Advance ready steps let _ = repository::advance_directive_ready_steps(pool, id).await; Json(PickUpOrdersResponse { message: format!("Planning {} orders", order_count), order_count, task_id: Some(task.id), }) .into_response() } // ============================================================================= // Directive Order Group (DOG) CRUD // ============================================================================= /// List all DOGs for a directive. #[utoipa::path( get, path = "/api/v1/directives/{id}/dogs", params(("id" = Uuid, Path, description = "Directive ID")), responses( (status = 200, description = "List of DOGs", body = DirectiveOrderGroupListResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directive Order Groups" )] pub async fn list_dogs( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; match repository::list_directive_order_groups(pool, id, auth.owner_id).await { Ok(dogs) => { let total = dogs.len() as i64; Json(DirectiveOrderGroupListResponse { dogs, total }).into_response() } Err(e) => { tracing::error!("Failed to list DOGs: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("LIST_FAILED", &e.to_string())), ) .into_response() } } } /// Create a new DOG for a directive. 
#[utoipa::path( post, path = "/api/v1/directives/{id}/dogs", params(("id" = Uuid, Path, description = "Directive ID")), request_body = CreateDirectiveOrderGroupRequest, responses( (status = 201, description = "DOG created", body = DirectiveOrderGroup), (status = 400, description = "Invalid directive", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directive Order Groups" )] pub async fn create_dog( State(state): State, Authenticated(auth): Authenticated, Path(id): Path, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify the directive exists and belongs to this owner match repository::get_directive_for_owner(pool, auth.owner_id, id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "INVALID_DIRECTIVE", "directive_id must reference a valid directive owned by you", )), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("VALIDATION_FAILED", &e.to_string())), ) .into_response(); } } match repository::create_directive_order_group(pool, id, auth.owner_id, req).await { Ok(dog) => (StatusCode::CREATED, Json(dog)).into_response(), Err(e) => { tracing::error!("Failed to create DOG: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("CREATE_FAILED", &e.to_string())), ) .into_response() } } } /// Get a DOG by ID. 
#[utoipa::path( get, path = "/api/v1/directives/{id}/dogs/{dog_id}", params( ("id" = Uuid, Path, description = "Directive ID"), ("dog_id" = Uuid, Path, description = "DOG ID"), ), responses( (status = 200, description = "DOG details", body = DirectiveOrderGroup), (status = 404, description = "Not found", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directive Order Groups" )] pub async fn get_dog( State(state): State, Authenticated(auth): Authenticated, Path((id, dog_id)): Path<(Uuid, Uuid)>, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; let _ = id; // directive_id is in the path for REST nesting but we scope by owner_id match repository::get_directive_order_group(pool, dog_id, auth.owner_id).await { Ok(Some(dog)) => Json(dog).into_response(), Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "DOG not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to get DOG: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GET_FAILED", &e.to_string())), ) .into_response() } } } /// Update a DOG. 
#[utoipa::path( patch, path = "/api/v1/directives/{id}/dogs/{dog_id}", params( ("id" = Uuid, Path, description = "Directive ID"), ("dog_id" = Uuid, Path, description = "DOG ID"), ), request_body = UpdateDirectiveOrderGroupRequest, responses( (status = 200, description = "DOG updated", body = DirectiveOrderGroup), (status = 404, description = "Not found", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directive Order Groups" )] pub async fn update_dog( State(state): State, Authenticated(auth): Authenticated, Path((id, dog_id)): Path<(Uuid, Uuid)>, Json(req): Json, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; let _ = id; // directive_id is in the path for REST nesting but we scope by owner_id // Validate status if provided if let Some(ref status) = req.status { if !["open", "in_progress", "done", "archived"].contains(&status.as_str()) { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( "VALIDATION_FAILED", "status must be one of: open, in_progress, done, archived", )), ) .into_response(); } } match repository::update_directive_order_group(pool, dog_id, auth.owner_id, req).await { Ok(Some(dog)) => Json(dog).into_response(), Ok(None) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "DOG not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to update DOG: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("UPDATE_FAILED", &e.to_string())), ) .into_response() } } } /// Delete a DOG. 
#[utoipa::path( delete, path = "/api/v1/directives/{id}/dogs/{dog_id}", params( ("id" = Uuid, Path, description = "Directive ID"), ("dog_id" = Uuid, Path, description = "DOG ID"), ), responses( (status = 204, description = "Deleted"), (status = 404, description = "Not found", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directive Order Groups" )] pub async fn delete_dog( State(state): State, Authenticated(auth): Authenticated, Path((id, dog_id)): Path<(Uuid, Uuid)>, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; let _ = id; // directive_id is in the path for REST nesting but we scope by owner_id match repository::delete_directive_order_group(pool, dog_id, auth.owner_id).await { Ok(true) => StatusCode::NO_CONTENT.into_response(), Ok(false) => ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "DOG not found")), ) .into_response(), Err(e) => { tracing::error!("Failed to delete DOG: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("DELETE_FAILED", &e.to_string())), ) .into_response() } } } /// List orders belonging to a specific DOG. 
#[utoipa::path( get, path = "/api/v1/directives/{id}/dogs/{dog_id}/orders", params( ("id" = Uuid, Path, description = "Directive ID"), ("dog_id" = Uuid, Path, description = "DOG ID"), ), responses( (status = 200, description = "List of orders in the DOG", body = OrderListResponse), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directive Order Groups" )] pub async fn list_dog_orders( State(state): State, Authenticated(auth): Authenticated, Path((id, dog_id)): Path<(Uuid, Uuid)>, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; let _ = id; // directive_id is in the path for REST nesting but we scope by owner_id match repository::list_orders_by_dog(pool, dog_id, auth.owner_id).await { Ok(orders) => { let total = orders.len() as i64; Json(OrderListResponse { orders, total }).into_response() } Err(e) => { tracing::error!("Failed to list orders for DOG: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("LIST_FAILED", &e.to_string())), ) .into_response() } } } /// Pick up orders for a specific DOG. Like the directive pick-up-orders /// endpoint but filtered to orders belonging to the specified DOG. 
#[utoipa::path( post, path = "/api/v1/directives/{id}/dogs/{dog_id}/pick-up-orders", params( ("id" = Uuid, Path, description = "Directive ID"), ("dog_id" = Uuid, Path, description = "DOG ID"), ), responses( (status = 200, description = "Orders picked up for planning", body = PickUpOrdersResponse), (status = 404, description = "Directive or DOG not found", body = ApiError), (status = 401, description = "Unauthorized", body = ApiError), (status = 503, description = "Database not configured", body = ApiError), ), security(("bearer_auth" = []), ("api_key" = [])), tag = "Directive Order Groups" )] pub async fn pick_up_dog_orders( State(state): State, Authenticated(auth): Authenticated, Path((id, dog_id)): Path<(Uuid, Uuid)>, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( StatusCode::SERVICE_UNAVAILABLE, Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), ) .into_response(); }; // Verify directive ownership and get directive with steps let (directive, mut steps) = match repository::get_directive_with_steps_for_owner(pool, auth.owner_id, id).await { Ok(Some((d, s))) => (d, s), Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "Directive not found")), ) .into_response(); } Err(e) => { tracing::error!("Failed to get directive: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GET_FAILED", &e.to_string())), ) .into_response(); } }; // Verify the DOG exists and belongs to this owner match repository::get_directive_order_group(pool, dog_id, auth.owner_id).await { Ok(Some(_)) => {} Ok(None) => { return ( StatusCode::NOT_FOUND, Json(ApiError::new("NOT_FOUND", "DOG not found")), ) .into_response(); } Err(e) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GET_FAILED", &e.to_string())), ) .into_response(); } } // Auto-remove completed steps that were already included in a PR if directive.pr_url.is_some() || directive.pr_branch.is_some() { match 
crate::orchestration::directive::remove_already_merged_steps(pool, id).await { Ok(count) if count > 0 => { tracing::info!("Auto-removed {} completed steps already in PR for directive {}", count, id); steps = match repository::list_directive_steps(pool, id).await { Ok(s) => s, Err(e) => { tracing::error!("Failed to re-fetch steps after cleanup: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("REFETCH_STEPS_FAILED", &e.to_string())), ).into_response(); } }; } Err(e) => { tracing::warn!("Failed to auto-remove merged steps for directive {}: {}", id, e); } _ => {} } } // Reconcile existing orders match repository::reconcile_directive_orders(pool, auth.owner_id, id).await { Ok(count) => { if count > 0 { tracing::info!("Reconciled {} orders for directive {}", count, id); } } Err(e) => { tracing::warn!("Failed to reconcile directive orders: {}", e); } } // Fetch available orders filtered to this DOG let orders = match repository::get_available_orders_for_dog_pickup(pool, auth.owner_id, id, dog_id).await { Ok(o) => o, Err(e) => { tracing::error!("Failed to fetch available orders for DOG: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("FETCH_ORDERS_FAILED", &e.to_string())), ) .into_response(); } }; // If no orders available, return early if orders.is_empty() { return Json(PickUpOrdersResponse { message: "No orders available to plan for this DOG".to_string(), order_count: 0, task_id: None, }) .into_response(); } let order_count = orders.len() as i64; let order_ids: Vec = orders.iter().map(|o| o.id).collect(); // Get generation and goal history for the planning prompt let generation = match repository::get_directive_max_generation(pool, id).await { Ok(g) => g + 1, Err(e) => { tracing::error!("Failed to get max generation: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("GENERATION_FAILED", &e.to_string())), ) .into_response(); } }; let goal_history = match repository::get_directive_goal_history(pool, id, 
3).await { Ok(h) => h, Err(e) => { tracing::warn!("Failed to get goal history: {}", e); vec![] } }; // Build the specialized planning prompt let plan = build_order_pickup_prompt(&directive, &steps, &orders, generation, &goal_history); // Link orders to the directive if let Err(e) = repository::bulk_link_orders_to_directive(pool, auth.owner_id, &order_ids, id).await { tracing::error!("Failed to link orders to directive: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("LINK_ORDERS_FAILED", &e.to_string())), ) .into_response(); } // Mark picked-up orders as in_progress if let Err(e) = repository::bulk_update_order_status(pool, auth.owner_id, &order_ids, "in_progress").await { tracing::warn!("Failed to update order status to in_progress: {}", e); } // Create the planning task let req = CreateTaskRequest { contract_id: None, name: format!("Pick up DOG orders: {}", directive.title), description: Some("Directive order group pickup planning task".to_string()), plan, parent_task_id: None, is_supervisor: false, priority: 0, repository_url: directive.repository_url.clone(), base_branch: directive.base_branch.clone(), target_branch: None, merge_mode: None, target_repo_path: None, completion_action: None, continue_from_task_id: None, copy_files: None, checkpoint_sha: None, branched_from_task_id: None, conversation_history: None, supervisor_worktree_task_id: None, directive_id: Some(directive.id), directive_step_id: None, }; let task = match repository::create_task_for_owner(pool, auth.owner_id, req).await { Ok(t) => t, Err(e) => { tracing::error!("Failed to create DOG pickup planning task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(ApiError::new("CREATE_TASK_FAILED", &e.to_string())), ) .into_response(); } }; // Assign as orchestrator task if let Err(e) = repository::assign_orchestrator_task(pool, id, task.id).await { tracing::error!("Failed to assign orchestrator task: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, 
Json(ApiError::new("ASSIGN_TASK_FAILED", &e.to_string())), ) .into_response(); } // Cancel old planning tasks let cancelled = repository::cancel_old_planning_tasks(pool, id, task.id).await; if let Ok(count) = cancelled { if count > 0 { tracing::info!( directive_id = %id, cancelled_count = count, "Cancelled old planning tasks superseded by DOG order pickup" ); } } // Set directive to active if draft/idle/paused match directive.status.as_str() { "draft" | "idle" | "paused" => { if let Err(e) = repository::set_directive_status(pool, auth.owner_id, id, "active").await { tracing::warn!("Failed to set directive status to active: {}", e); } } _ => {} } // Advance ready steps let _ = repository::advance_directive_ready_steps(pool, id).await; Json(PickUpOrdersResponse { message: format!("Planning {} orders from DOG", order_count), order_count, task_id: Some(task.id), }) .into_response() }