summaryrefslogtreecommitdiff
path: root/makima/src
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-06 02:08:37 +0000
committersoryu <soryu@soryu.co>2026-02-06 02:08:37 +0000
commit25e1275af1b742cc7866fba91152d9a4734a6f94 (patch)
treee92c7e168f4e73c302fb63217ea20bf8cfa2ba7e /makima/src
parent8f725a7c64fbeb85ebeb59b54d2f774e9a0a59d6 (diff)
downloadsoryu-25e1275af1b742cc7866fba91152d9a4734a6f94.tar.gz
soryu-25e1275af1b742cc7866fba91152d9a4734a6f94.zip
Fix: Directives API
Diffstat (limited to 'makima/src')
-rw-r--r--makima/src/orchestration/engine.rs186
-rw-r--r--makima/src/server/auth.rs10
-rw-r--r--makima/src/server/handlers/directives.rs85
3 files changed, 251 insertions, 30 deletions
diff --git a/makima/src/orchestration/engine.rs b/makima/src/orchestration/engine.rs
index 5bbb99f..954b857 100644
--- a/makima/src/orchestration/engine.rs
+++ b/makima/src/orchestration/engine.rs
@@ -13,7 +13,10 @@ use thiserror::Error;
use tokio::sync::broadcast;
use uuid::Uuid;
-use crate::db::models::{AddStepRequest, ChainStep, Directive, DirectiveEvent, UpdateStepRequest};
+use crate::db::models::{
+ AddStepRequest, ChainStep, CreateContractRequest, CreateTaskRequest, Directive,
+ DirectiveEvent, UpdateStepRequest,
+};
use crate::db::repository::{self, RepositoryError};
use super::planner::{ChainPlanner, GeneratedChain, PlannerError};
@@ -262,24 +265,82 @@ impl DirectiveEngine {
// Chain Management
// ========================================================================
- /// Generate initial chain from directive.
+ /// Generate initial chain from directive using LLM.
async fn generate_initial_chain(
&self,
directive: &Directive,
) -> Result<GeneratedChain, EngineError> {
// Build planning prompt
- let _prompt = self.planner.build_planning_prompt(directive);
+ let prompt = self.planner.build_planning_prompt(directive);
+
+ // Try LLM chain generation, fall back to default if unavailable
+ let chain = match self.generate_chain_via_llm(&prompt).await {
+ Ok(chain) => {
+ tracing::info!(
+ "LLM generated chain with {} steps for directive {}",
+ chain.steps.len(),
+ directive.id
+ );
+ chain
+ }
+ Err(e) => {
+ tracing::warn!(
+ "LLM chain generation failed ({}), using default chain for directive {}",
+ e,
+ directive.id
+ );
+ self.build_default_chain(directive)
+ }
+ };
+
+ // Validate the chain
+ self.planner.validate_chain(&chain)?;
+
+ Ok(chain)
+ }
+
+ /// Call LLM to generate a chain from the planning prompt.
+ async fn generate_chain_via_llm(&self, prompt: &str) -> Result<GeneratedChain, EngineError> {
+ use crate::llm::claude::{ClaudeClient, ClaudeModel, Message, MessageContent};
+
+ let client = ClaudeClient::from_env(ClaudeModel::Sonnet)
+ .map_err(|e| EngineError::LlmError(format!("Failed to create LLM client: {}", e)))?;
+
+ let messages = vec![Message {
+ role: "user".to_string(),
+ content: MessageContent::Text(prompt.to_string()),
+ }];
+
+ let result = client
+ .chat_with_tools(messages, &[])
+ .await
+ .map_err(|e| EngineError::LlmError(format!("LLM call failed: {}", e)))?;
+
+ let response_text = result
+ .content
+ .ok_or_else(|| EngineError::LlmError("Empty LLM response".to_string()))?;
+
+ self.planner
+ .parse_plan_response(&response_text)
+ .map_err(|e| EngineError::Planner(e))
+ }
- // TODO: Call LLM to generate chain
- // For now, return a simple placeholder chain
- let chain = GeneratedChain {
- name: format!("{}-chain", directive.title.to_lowercase().replace(' ', "-")),
+ /// Build a default chain when LLM is unavailable.
+ fn build_default_chain(&self, directive: &Directive) -> GeneratedChain {
+ GeneratedChain {
+ name: format!(
+ "{}-chain",
+ directive.title.to_lowercase().replace(' ', "-")
+ ),
description: format!("Execution plan for: {}", directive.goal),
steps: vec![
super::planner::GeneratedStep {
name: "research".to_string(),
step_type: "research".to_string(),
- description: "Research and understand the requirements".to_string(),
+ description: format!(
+ "Research and understand the requirements for: {}",
+ directive.goal
+ ),
depends_on: vec![],
requirement_ids: vec![],
contract_template: None,
@@ -287,7 +348,7 @@ impl DirectiveEngine {
super::planner::GeneratedStep {
name: "implement".to_string(),
step_type: "implement".to_string(),
- description: "Implement the solution".to_string(),
+ description: format!("Implement the solution for: {}", directive.goal),
depends_on: vec!["research".to_string()],
requirement_ids: vec![],
contract_template: None,
@@ -301,12 +362,7 @@ impl DirectiveEngine {
contract_template: None,
},
],
- };
-
- // Validate the chain
- self.planner.validate_chain(&chain)?;
-
- Ok(chain)
+ }
}
/// Create database steps from a generated chain.
@@ -408,16 +464,32 @@ impl DirectiveEngine {
let failed_step = steps.iter().find(|s| s.status == "failed");
// Build replan prompt
- let _prompt = self.planner.build_replan_prompt(
+ let prompt = self.planner.build_replan_prompt(
&directive,
&completed_steps.iter().map(|s| (*s).clone()).collect::<Vec<_>>(),
failed_step.map(|s| &*s),
reason,
);
- // TODO: Call LLM to regenerate chain
- // For now, just create a new chain with similar structure
- let new_chain = self.generate_initial_chain(&directive).await?;
+ // Try LLM regeneration, fall back to default
+ let new_chain = match self.generate_chain_via_llm(&prompt).await {
+ Ok(chain) => {
+ tracing::info!(
+ "LLM regenerated chain with {} steps for directive {}",
+ chain.steps.len(),
+ directive.id
+ );
+ chain
+ }
+ Err(e) => {
+ tracing::warn!(
+ "LLM chain regeneration failed ({}), using default chain for directive {}",
+ e,
+ directive.id
+ );
+ self.build_default_chain(&directive)
+ }
+ };
// Supersede old chain
repository::supersede_chain(&self.pool, current_chain.id).await?;
@@ -514,18 +586,74 @@ impl DirectiveEngine {
});
// Get contract details from step template
- let (_name, _description, _contract_type, _initial_phase) =
+ let (name, description, contract_type, initial_phase) =
self.get_contract_details(directive, step);
- // TODO: Actually create the contract via the contracts handler
- // For now, just update the step status to running
- // In a full implementation, this would:
- // 1. Create contract via POST /api/v1/contracts
- // 2. Create supervisor task via POST /api/v1/tasks
- // 3. Link contract and task to step
- // 4. Update step status to running
+ // Create contract for this step
+ let contract = repository::create_contract_for_owner(
+ &self.pool,
+ directive.owner_id,
+ CreateContractRequest {
+ name: name.clone(),
+ description: description.clone(),
+ contract_type: Some(contract_type),
+ template_id: None,
+ initial_phase: Some(initial_phase),
+ autonomous_loop: Some(directive.autonomy_level == "full_auto"),
+ phase_guard: Some(true),
+ local_only: Some(false),
+ auto_merge_local: None,
+ },
+ )
+ .await
+ .map_err(|e| EngineError::ContractCreation(format!("Failed to create contract: {}", e)))?;
+
+ // Build task plan from step description and task_plan
+ let task_plan = step
+ .task_plan
+ .clone()
+ .unwrap_or_else(|| {
+ format!(
+ "## Step: {}\n\n{}\n\n## Directive Goal\n{}",
+ step.name,
+ description.as_deref().unwrap_or("Complete this step."),
+ directive.goal,
+ )
+ });
+
+ // Create supervisor task linked to the contract
+ let task = repository::create_task_for_owner(
+ &self.pool,
+ directive.owner_id,
+ CreateTaskRequest {
+ contract_id: Some(contract.id),
+ name: name.clone(),
+ description: description.clone(),
+ plan: task_plan,
+ parent_task_id: None,
+ is_supervisor: true,
+ priority: 5,
+ repository_url: directive.repository_url.clone(),
+ base_branch: directive.base_branch.clone(),
+ target_branch: None,
+ merge_mode: Some("pr".to_string()),
+ target_repo_path: None,
+ completion_action: Some("pr".to_string()),
+ continue_from_task_id: None,
+ copy_files: None,
+ checkpoint_sha: None,
+ branched_from_task_id: None,
+ conversation_history: None,
+ supervisor_worktree_task_id: None,
+ },
+ )
+ .await
+ .map_err(|e| EngineError::ContractCreation(format!("Failed to create task: {}", e)))?;
+
+ // Link contract and task to step
+ repository::update_step_contract(&self.pool, step.id, contract.id, Some(task.id)).await?;
- // Placeholder: mark step as running
+ // Update step status to running
repository::update_step_status(&self.pool, step.id, "running").await?;
self.emit_event(EngineEvent::StepStatusChanged {
directive_id: directive.id,
@@ -541,6 +669,8 @@ impl DirectiveEngine {
serde_json::json!({
"step_id": step.id,
"step_name": step.name,
+ "contract_id": contract.id,
+ "task_id": task.id,
}),
"system",
)
diff --git a/makima/src/server/auth.rs b/makima/src/server/auth.rs
index b694df6..90b7bda 100644
--- a/makima/src/server/auth.rs
+++ b/makima/src/server/auth.rs
@@ -837,6 +837,11 @@ pub async fn log_api_key_event(
// Internal Helper Functions
// =============================================================================
+/// Public wrapper for resolve_owner_id, used by SSE endpoints that authenticate via query params.
+pub async fn resolve_owner_id_public(pool: &PgPool, user_id: Uuid, email: Option<&str>) -> Result<Uuid, AuthError> {
+ resolve_owner_id(pool, user_id, email).await
+}
+
/// Resolve owner_id from user_id by looking up the users table.
/// If the user doesn't exist, auto-creates them on first login.
/// Uses ON CONFLICT to handle race conditions when multiple requests arrive simultaneously.
@@ -894,6 +899,11 @@ async fn resolve_owner_id(pool: &PgPool, user_id: Uuid, email: Option<&str>) ->
}
}
+/// Public wrapper for validate_api_key, used by SSE endpoints that authenticate via query params.
+pub async fn validate_api_key_public(pool: &PgPool, key: &str) -> Result<(Uuid, Uuid), AuthError> {
+ validate_api_key(pool, key).await
+}
+
/// Validate an API key and return (user_id, owner_id).
async fn validate_api_key(pool: &PgPool, key: &str) -> Result<(Uuid, Uuid), AuthError> {
let key_hash = hash_api_key(key);
diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs
index 6f6c3f1..4a78ab5 100644
--- a/makima/src/server/handlers/directives.rs
+++ b/makima/src/server/handlers/directives.rs
@@ -39,6 +39,14 @@ pub struct ListEventsQuery {
pub limit: Option<i64>,
}
+/// Query parameters for SSE stream authentication
+/// EventSource API cannot set custom headers, so auth is passed via query params
+#[derive(Debug, Deserialize)]
+pub struct StreamAuthQuery {
+ pub token: Option<String>,
+ pub api_key: Option<String>,
+}
+
/// Query parameters for listing evaluations
#[derive(Debug, Deserialize)]
pub struct ListEvaluationsQuery {
@@ -117,7 +125,14 @@ pub async fn list_directives(
match repository::list_directives_for_owner(pool, auth.owner_id, params.status.as_deref()).await
{
- Ok(directives) => Json(directives).into_response(),
+ Ok(directives) => {
+ let total = directives.len() as i64;
+ Json(serde_json::json!({
+ "directives": directives,
+ "total": total,
+ }))
+ .into_response()
+ }
Err(e) => {
tracing::error!("Failed to list directives: {}", e);
(
@@ -1052,10 +1067,13 @@ pub async fn list_events(
/// SSE stream of events for a directive
/// GET /api/v1/directives/:id/events/stream
+///
+/// EventSource API cannot set custom headers, so authentication is accepted
+/// via query parameters: ?token=<jwt> or ?api_key=<key>
pub async fn stream_events(
State(state): State<SharedState>,
- Authenticated(auth): Authenticated,
Path(id): Path<Uuid>,
+ Query(auth_params): Query<StreamAuthQuery>,
) -> impl IntoResponse {
let Some(ref pool) = state.db_pool else {
return (
@@ -1065,6 +1083,69 @@ pub async fn stream_events(
.into_response();
};
+ // Authenticate via query params (EventSource cannot set headers)
+ let auth = if let Some(ref token) = auth_params.token {
+ // JWT token
+ let verifier = match state.jwt_verifier.as_ref() {
+ Some(v) => v,
+ None => {
+ return (
+ StatusCode::UNAUTHORIZED,
+ Json(ApiError::new("AUTH_NOT_CONFIGURED", "Authentication not configured")),
+ )
+ .into_response()
+ }
+ };
+ let claims = match verifier.verify(token) {
+ Ok(c) => c,
+ Err(_) => {
+ return (
+ StatusCode::UNAUTHORIZED,
+ Json(ApiError::new("INVALID_TOKEN", "Invalid authentication token")),
+ )
+ .into_response()
+ }
+ };
+ match crate::server::auth::resolve_owner_id_public(pool, claims.sub, claims.email.as_deref()).await {
+ Ok(owner_id) => crate::server::auth::AuthenticatedUser {
+ user_id: claims.sub,
+ owner_id,
+ auth_source: crate::server::auth::AuthSource::Jwt,
+ email: claims.email,
+ },
+ Err(_) => {
+ return (
+ StatusCode::UNAUTHORIZED,
+ Json(ApiError::new("USER_NOT_FOUND", "User not found")),
+ )
+ .into_response()
+ }
+ }
+ } else if let Some(ref api_key) = auth_params.api_key {
+ // API key
+ match crate::server::auth::validate_api_key_public(pool, api_key).await {
+ Ok((user_id, owner_id)) => crate::server::auth::AuthenticatedUser {
+ user_id,
+ owner_id,
+ auth_source: crate::server::auth::AuthSource::ApiKey,
+ email: None,
+ },
+ Err(_) => {
+ return (
+ StatusCode::UNAUTHORIZED,
+ Json(ApiError::new("INVALID_API_KEY", "Invalid or revoked API key")),
+ )
+ .into_response()
+ }
+ }
+ } else {
+ return (
+ StatusCode::UNAUTHORIZED,
+ Json(ApiError::new("MISSING_TOKEN", "Authentication required via ?token= or ?api_key= query parameter")),
+ )
+ .into_response();
+ };
+
// Verify ownership
match repository::get_directive_for_owner(pool, id, auth.owner_id).await {
Ok(Some(_)) => {}