diff options
| author | soryu <soryu@soryu.co> | 2026-02-06 02:08:37 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-06 02:08:37 +0000 |
| commit | 25e1275af1b742cc7866fba91152d9a4734a6f94 (patch) | |
| tree | e92c7e168f4e73c302fb63217ea20bf8cfa2ba7e /makima | |
| parent | 8f725a7c64fbeb85ebeb59b54d2f774e9a0a59d6 (diff) | |
| download | soryu-25e1275af1b742cc7866fba91152d9a4734a6f94.tar.gz soryu-25e1275af1b742cc7866fba91152d9a4734a6f94.zip | |
Fix: Directives API
Diffstat (limited to 'makima')
| -rw-r--r-- | makima/src/orchestration/engine.rs | 186 | ||||
| -rw-r--r-- | makima/src/server/auth.rs | 10 | ||||
| -rw-r--r-- | makima/src/server/handlers/directives.rs | 85 |
3 files changed, 251 insertions, 30 deletions
diff --git a/makima/src/orchestration/engine.rs b/makima/src/orchestration/engine.rs index 5bbb99f..954b857 100644 --- a/makima/src/orchestration/engine.rs +++ b/makima/src/orchestration/engine.rs @@ -13,7 +13,10 @@ use thiserror::Error; use tokio::sync::broadcast; use uuid::Uuid; -use crate::db::models::{AddStepRequest, ChainStep, Directive, DirectiveEvent, UpdateStepRequest}; +use crate::db::models::{ + AddStepRequest, ChainStep, CreateContractRequest, CreateTaskRequest, Directive, + DirectiveEvent, UpdateStepRequest, +}; use crate::db::repository::{self, RepositoryError}; use super::planner::{ChainPlanner, GeneratedChain, PlannerError}; @@ -262,24 +265,82 @@ impl DirectiveEngine { // Chain Management // ======================================================================== - /// Generate initial chain from directive. + /// Generate initial chain from directive using LLM. async fn generate_initial_chain( &self, directive: &Directive, ) -> Result<GeneratedChain, EngineError> { // Build planning prompt - let _prompt = self.planner.build_planning_prompt(directive); + let prompt = self.planner.build_planning_prompt(directive); + + // Try LLM chain generation, fall back to default if unavailable + let chain = match self.generate_chain_via_llm(&prompt).await { + Ok(chain) => { + tracing::info!( + "LLM generated chain with {} steps for directive {}", + chain.steps.len(), + directive.id + ); + chain + } + Err(e) => { + tracing::warn!( + "LLM chain generation failed ({}), using default chain for directive {}", + e, + directive.id + ); + self.build_default_chain(directive) + } + }; + + // Validate the chain + self.planner.validate_chain(&chain)?; + + Ok(chain) + } + + /// Call LLM to generate a chain from the planning prompt. + async fn generate_chain_via_llm(&self, prompt: &str) -> Result<GeneratedChain, EngineError> { + use crate::llm::claude::{ClaudeClient, ClaudeModel, Message, MessageContent}; + + let client = ClaudeClient::from_env(ClaudeModel::Sonnet) + .map_err(|e| EngineError::LlmError(format!("Failed to create LLM client: {}", e)))?; + + let messages = vec![Message { + role: "user".to_string(), + content: MessageContent::Text(prompt.to_string()), + }]; + + let result = client + .chat_with_tools(messages, &[]) + .await + .map_err(|e| EngineError::LlmError(format!("LLM call failed: {}", e)))?; + + let response_text = result + .content + .ok_or_else(|| EngineError::LlmError("Empty LLM response".to_string()))?; + + self.planner + .parse_plan_response(&response_text) + .map_err(|e| EngineError::Planner(e)) + } - // TODO: Call LLM to generate chain - // For now, return a simple placeholder chain - let chain = GeneratedChain { - name: format!("{}-chain", directive.title.to_lowercase().replace(' ', "-")), + /// Build a default chain when LLM is unavailable. + fn build_default_chain(&self, directive: &Directive) -> GeneratedChain { + GeneratedChain { + name: format!( + "{}-chain", + directive.title.to_lowercase().replace(' ', "-") + ), description: format!("Execution plan for: {}", directive.goal), steps: vec![ super::planner::GeneratedStep { name: "research".to_string(), step_type: "research".to_string(), - description: "Research and understand the requirements".to_string(), + description: format!( + "Research and understand the requirements for: {}", + directive.goal + ), depends_on: vec![], requirement_ids: vec![], contract_template: None, @@ -287,7 +348,7 @@ impl DirectiveEngine { super::planner::GeneratedStep { name: "implement".to_string(), step_type: "implement".to_string(), - description: "Implement the solution".to_string(), + description: format!("Implement the solution for: {}", directive.goal), depends_on: vec!["research".to_string()], requirement_ids: vec![], contract_template: None, @@ -301,12 +362,7 @@ impl DirectiveEngine { contract_template: None, }, ], - }; - - // Validate the chain - self.planner.validate_chain(&chain)?; - - Ok(chain) + } } /// Create database steps from a generated chain. @@ -408,16 +464,32 @@ impl DirectiveEngine { let failed_step = steps.iter().find(|s| s.status == "failed"); // Build replan prompt - let _prompt = self.planner.build_replan_prompt( + let prompt = self.planner.build_replan_prompt( &directive, &completed_steps.iter().map(|s| (*s).clone()).collect::<Vec<_>>(), failed_step.map(|s| &*s), reason, ); - // TODO: Call LLM to regenerate chain - // For now, just create a new chain with similar structure - let new_chain = self.generate_initial_chain(&directive).await?; + // Try LLM regeneration, fall back to default + let new_chain = match self.generate_chain_via_llm(&prompt).await { + Ok(chain) => { + tracing::info!( + "LLM regenerated chain with {} steps for directive {}", + chain.steps.len(), + directive.id + ); + chain + } + Err(e) => { + tracing::warn!( + "LLM chain regeneration failed ({}), using default chain for directive {}", + e, + directive.id + ); + self.build_default_chain(&directive) + } + }; // Supersede old chain repository::supersede_chain(&self.pool, current_chain.id).await?; @@ -514,18 +586,74 @@ impl DirectiveEngine { }); // Get contract details from step template - let (_name, _description, _contract_type, _initial_phase) = + let (name, description, contract_type, initial_phase) = self.get_contract_details(directive, step); - // TODO: Actually create the contract via the contracts handler - // For now, just update the step status to running - // In a full implementation, this would: - // 1. Create contract via POST /api/v1/contracts - // 2. Create supervisor task via POST /api/v1/tasks - // 3. Link contract and task to step - // 4. Update step status to running + // Create contract for this step + let contract = repository::create_contract_for_owner( + &self.pool, + directive.owner_id, + CreateContractRequest { + name: name.clone(), + description: description.clone(), + contract_type: Some(contract_type), + template_id: None, + initial_phase: Some(initial_phase), + autonomous_loop: Some(directive.autonomy_level == "full_auto"), + phase_guard: Some(true), + local_only: Some(false), + auto_merge_local: None, + }, + ) + .await + .map_err(|e| EngineError::ContractCreation(format!("Failed to create contract: {}", e)))?; + + // Build task plan from step description and task_plan + let task_plan = step + .task_plan + .clone() + .unwrap_or_else(|| { + format!( + "## Step: {}\n\n{}\n\n## Directive Goal\n{}", + step.name, + description.as_deref().unwrap_or("Complete this step."), + directive.goal, + ) + }); + + // Create supervisor task linked to the contract + let task = repository::create_task_for_owner( + &self.pool, + directive.owner_id, + CreateTaskRequest { + contract_id: Some(contract.id), + name: name.clone(), + description: description.clone(), + plan: task_plan, + parent_task_id: None, + is_supervisor: true, + priority: 5, + repository_url: directive.repository_url.clone(), + base_branch: directive.base_branch.clone(), + target_branch: None, + merge_mode: Some("pr".to_string()), + target_repo_path: None, + completion_action: Some("pr".to_string()), + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + }, + ) + .await + .map_err(|e| EngineError::ContractCreation(format!("Failed to create task: {}", e)))?; + + // Link contract and task to step + repository::update_step_contract(&self.pool, step.id, contract.id, Some(task.id)).await?; - // Placeholder: mark step as running + // Update step status to running repository::update_step_status(&self.pool, step.id, "running").await?; self.emit_event(EngineEvent::StepStatusChanged { directive_id: directive.id, @@ -541,6 +669,8 @@ impl DirectiveEngine { serde_json::json!({ "step_id": step.id, "step_name": step.name, + "contract_id": contract.id, + "task_id": task.id, }), "system", ) diff --git a/makima/src/server/auth.rs b/makima/src/server/auth.rs index b694df6..90b7bda 100644 --- a/makima/src/server/auth.rs +++ b/makima/src/server/auth.rs @@ -837,6 +837,11 @@ pub async fn log_api_key_event( // Internal Helper Functions // ============================================================================= +/// Public wrapper for resolve_owner_id, used by SSE endpoints that authenticate via query params. +pub async fn resolve_owner_id_public(pool: &PgPool, user_id: Uuid, email: Option<&str>) -> Result<Uuid, AuthError> { + resolve_owner_id(pool, user_id, email).await +} + /// Resolve owner_id from user_id by looking up the users table. /// If the user doesn't exist, auto-creates them on first login. /// Uses ON CONFLICT to handle race conditions when multiple requests arrive simultaneously. @@ -894,6 +899,11 @@ async fn resolve_owner_id(pool: &PgPool, user_id: Uuid, email: Option<&str>) -> } } +/// Public wrapper for validate_api_key, used by SSE endpoints that authenticate via query params. +pub async fn validate_api_key_public(pool: &PgPool, key: &str) -> Result<(Uuid, Uuid), AuthError> { + validate_api_key(pool, key).await +} + /// Validate an API key and return (user_id, owner_id). async fn validate_api_key(pool: &PgPool, key: &str) -> Result<(Uuid, Uuid), AuthError> { let key_hash = hash_api_key(key); diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs index 6f6c3f1..4a78ab5 100644 --- a/makima/src/server/handlers/directives.rs +++ b/makima/src/server/handlers/directives.rs @@ -39,6 +39,14 @@ pub struct ListEventsQuery { pub limit: Option<i64>, } +/// Query parameters for SSE stream authentication +/// EventSource API cannot set custom headers, so auth is passed via query params +#[derive(Debug, Deserialize)] +pub struct StreamAuthQuery { + pub token: Option<String>, + pub api_key: Option<String>, +} + /// Query parameters for listing evaluations #[derive(Debug, Deserialize)] pub struct ListEvaluationsQuery { @@ -117,7 +125,14 @@ pub async fn list_directives( match repository::list_directives_for_owner(pool, auth.owner_id, params.status.as_deref()).await { - Ok(directives) => Json(directives).into_response(), + Ok(directives) => { + let total = directives.len() as i64; + Json(serde_json::json!({ + "directives": directives, + "total": total, + })) + .into_response() + } Err(e) => { tracing::error!("Failed to list directives: {}", e); ( @@ -1052,10 +1067,13 @@ pub async fn list_events( /// SSE stream of events for a directive /// GET /api/v1/directives/:id/events/stream +/// +/// EventSource API cannot set custom headers, so authentication is accepted +/// via query parameters: ?token=<jwt> or ?api_key=<key> pub async fn stream_events( State(state): State<SharedState>, - Authenticated(auth): Authenticated, Path(id): Path<Uuid>, + Query(auth_params): Query<StreamAuthQuery>, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( @@ -1065,6 +1083,69 @@ pub async fn stream_events( .into_response(); }; + // Authenticate via query params (EventSource cannot set headers) + let auth = if let Some(ref token) = auth_params.token { + // JWT token + let verifier = match state.jwt_verifier.as_ref() { + Some(v) => v, + None => { + return ( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("AUTH_NOT_CONFIGURED", "Authentication not configured")), + ) + .into_response() + } + }; + let claims = match verifier.verify(token) { + Ok(c) => c, + Err(_) => { + return ( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("INVALID_TOKEN", "Invalid authentication token")), + ) + .into_response() + } + }; + match crate::server::auth::resolve_owner_id_public(pool, claims.sub, claims.email.as_deref()).await { + Ok(owner_id) => crate::server::auth::AuthenticatedUser { + user_id: claims.sub, + owner_id, + auth_source: crate::server::auth::AuthSource::Jwt, + email: claims.email, + }, + Err(_) => { + return ( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("USER_NOT_FOUND", "User not found")), + ) + .into_response() + } + } + } else if let Some(ref api_key) = auth_params.api_key { + // API key + match crate::server::auth::validate_api_key_public(pool, api_key).await { + Ok((user_id, owner_id)) => crate::server::auth::AuthenticatedUser { + user_id, + owner_id, + auth_source: crate::server::auth::AuthSource::ApiKey, + email: None, + }, + Err(_) => { + return ( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("INVALID_API_KEY", "Invalid or revoked API key")), + ) + .into_response() + } + } + } else { + return ( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("MISSING_TOKEN", "Authentication required via ?token= or ?api_key= query parameter")), + ) + .into_response(); + }; + // Verify ownership match repository::get_directive_for_owner(pool, id, auth.owner_id).await { Ok(Some(_)) => {} |
