diff options
Diffstat (limited to 'makima/src')
| -rw-r--r-- | makima/src/bin/makima.rs | 88 | ||||
| -rw-r--r-- | makima/src/daemon/api/chain.rs | 26 | ||||
| -rw-r--r-- | makima/src/daemon/cli/mod.rs | 14 | ||||
| -rw-r--r-- | makima/src/daemon/cli/supervisor.rs | 64 | ||||
| -rw-r--r-- | makima/src/db/models.rs | 67 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 432 | ||||
| -rw-r--r-- | makima/src/server/handlers/chains.rs | 150 | ||||
| -rw-r--r-- | makima/src/server/handlers/contracts.rs | 29 |
8 files changed, 856 insertions, 14 deletions
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index 2037b47..3215bfd 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -612,6 +612,94 @@ async fn run_supervisor( .await?; println!("{}", serde_json::to_string(&result.0)?); } + // Chain supervisor commands + SupervisorCommand::ChainStatus(args) => { + let client = ApiClient::new(args.common.api_url, args.common.api_key)?; + eprintln!("Getting chain status for {}...", args.common.chain_id); + let result = client.get_chain(args.common.chain_id).await?; + if args.verbose { + // Get contracts as well + let contracts = client.get_chain_contracts(args.common.chain_id).await?; + println!("{}", serde_json::to_string(&serde_json::json!({ + "chain": result.0, + "contracts": contracts.0 + }))?); + } else { + println!("{}", serde_json::to_string(&result.0)?); + } + } + SupervisorCommand::ChainContracts(args) => { + let client = ApiClient::new(args.common.api_url, args.common.api_key)?; + eprintln!("Listing contracts in chain {}...", args.common.chain_id); + let result = client.get_chain_contracts(args.common.chain_id).await?; + if let Some(status) = &args.status { + // Filter by status client-side + let contracts: Vec<_> = result.0.as_array() + .unwrap_or(&vec![]) + .iter() + .filter(|c| c.get("status").and_then(|s| s.as_str()) == Some(status.as_str())) + .collect(); + println!("{}", serde_json::to_string(&contracts)?); + } else { + println!("{}", serde_json::to_string(&result.0)?); + } + } + SupervisorCommand::ChainProgress(args) => { + let client = ApiClient::new(args.common.api_url, args.common.api_key)?; + eprintln!("Triggering chain progression for {}...", args.common.chain_id); + // Use the start endpoint to progress (it handles already-active chains) + let result = client.start_chain(args.common.chain_id).await; + match result { + Ok(r) => println!("{}", serde_json::to_string(&r.0)?), + Err(e) => { + // Check if already active - in that case, just get status + if e.to_string().contains("ALREADY_ACTIVE") { + eprintln!("Chain is already active, checking current status..."); + let status = client.get_chain(args.common.chain_id).await?; + println!("{}", serde_json::to_string(&status.0)?); + } else { + return Err(e.into()); + } + } + } + } + SupervisorCommand::ChainGraph(args) => { + let client = ApiClient::new(args.common.api_url, args.common.api_key)?; + let result = client.get_chain_graph(args.common.chain_id).await?; + if args.format == "json" { + println!("{}", serde_json::to_string(&result.0)?); + } else { + // ASCII visualization (similar to chain graph command) + if let Some(nodes) = result.0.get("nodes").and_then(|n| n.as_array()) { + // Group by depth + let mut by_depth: std::collections::BTreeMap<i64, Vec<(&str, &str)>> = + std::collections::BTreeMap::new(); + for node in nodes { + let name = node.get("name").and_then(|n| n.as_str()).unwrap_or("?"); + let status = node.get("status").and_then(|s| s.as_str()).unwrap_or("pending"); + let depth = node.get("depth").and_then(|d| d.as_i64()).unwrap_or(0); + by_depth.entry(depth).or_default().push((name, status)); + } + let chain_name = result.0.get("name").and_then(|v| v.as_str()).unwrap_or("Chain"); + println!("Chain: {}", chain_name); + println!(); + for (depth, contracts) in by_depth { + let prefix = " ".repeat(depth as usize); + for (name, status) in contracts { + let icon = match status { + "completed" => "\u{2713}", + "active" => "\u{21bb}", + "failed" => "\u{2717}", + _ => "\u{25cb}", + }; + println!("{}[{}] {} ({})", prefix, icon, name, status); + } + } + } else { + println!("{}", serde_json::to_string(&result.0)?); + } + } + } } Ok(()) diff --git a/makima/src/daemon/api/chain.rs b/makima/src/daemon/api/chain.rs index 7f7826f..c37c980 100644 --- a/makima/src/daemon/api/chain.rs +++ b/makima/src/daemon/api/chain.rs @@ -49,4 +49,30 @@ impl ApiClient { self.delete_with_response(&format!("/api/v1/chains/{}", chain_id)) .await } + + /// Start a chain (creates root contracts and optionally a supervisor). + pub async fn start_chain(&self, chain_id: Uuid) -> Result<JsonValue, ApiError> { + self.post_empty(&format!("/api/v1/chains/{}/start", chain_id)) + .await + } + + /// Start a chain with supervisor enabled. + pub async fn start_chain_with_supervisor( + &self, + chain_id: Uuid, + repository_url: Option<&str>, + ) -> Result<JsonValue, ApiError> { + #[derive(serde::Serialize)] + #[serde(rename_all = "camelCase")] + struct StartRequest<'a> { + with_supervisor: bool, + repository_url: Option<&'a str>, + } + let req = StartRequest { + with_supervisor: true, + repository_url, + }; + self.post(&format!("/api/v1/chains/{}/start", chain_id), &req) + .await + } } diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs index 035a784..25163c2 100644 --- a/makima/src/daemon/cli/mod.rs +++ b/makima/src/daemon/cli/mod.rs @@ -170,6 +170,20 @@ pub enum SupervisorCommand { /// Mark a deliverable as complete MarkDeliverable(supervisor::MarkDeliverableArgs), + + // Chain supervisor commands (for chain-level orchestration) + + /// Get chain status with progress info + ChainStatus(supervisor::ChainStatusArgs), + + /// List contracts in the chain + ChainContracts(supervisor::ChainContractsArgs), + + /// Manually trigger chain progression + ChainProgress(supervisor::ChainProgressArgs), + + /// Get chain DAG visualization + ChainGraph(supervisor::ChainGraphArgs), } /// Contract subcommands for task-contract interaction. diff --git a/makima/src/daemon/cli/supervisor.rs b/makima/src/daemon/cli/supervisor.rs index 6f19697..0b52c9c 100644 --- a/makima/src/daemon/cli/supervisor.rs +++ b/makima/src/daemon/cli/supervisor.rs @@ -445,3 +445,67 @@ pub struct ResumeContractArgs { #[arg(index = 1)] pub contract_id: Uuid, } + +// ============================================================================ +// Chain Supervisor Command Args +// ============================================================================ + +/// Common arguments for chain supervisor commands. +#[derive(Args, Debug, Clone)] +pub struct ChainSupervisorArgs { + /// API URL + #[arg(long, env = "MAKIMA_API_URL", default_value = "https://api.makima.jp")] + pub api_url: String, + + /// API key for authentication + #[arg(long, env = "MAKIMA_API_KEY")] + pub api_key: String, + + /// Current task ID (the chain supervisor's own task ID) + #[arg(long, env = "MAKIMA_TASK_ID")] + pub self_task_id: Option<Uuid>, + + /// Chain ID + #[arg(long, env = "MAKIMA_CHAIN_ID")] + pub chain_id: Uuid, +} + +/// Arguments for chain-status command (get chain status with progress info). +#[derive(Args, Debug)] +pub struct ChainStatusArgs { + #[command(flatten)] + pub common: ChainSupervisorArgs, + + /// Include contract details + #[arg(long)] + pub verbose: bool, +} + +/// Arguments for chain-contracts command (list contracts in the chain). +#[derive(Args, Debug)] +pub struct ChainContractsArgs { + #[command(flatten)] + pub common: ChainSupervisorArgs, + + /// Filter by status (active, completed, failed) + #[arg(long)] + pub status: Option<String>, +} + +/// Arguments for chain-progress command (manually trigger chain progression). +#[derive(Args, Debug)] +pub struct ChainProgressArgs { + #[command(flatten)] + pub common: ChainSupervisorArgs, +} + +/// Arguments for chain-graph command (get chain DAG visualization). +#[derive(Args, Debug)] +pub struct ChainGraphArgs { + #[command(flatten)] + pub common: ChainSupervisorArgs, + + /// Output format (ascii, json) + #[arg(long, default_value = "ascii")] + pub format: String, +} diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index eeb30e4..0b8ef43 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -2848,6 +2848,56 @@ pub struct CreateChainDeliverableRequest { pub priority: Option<String>, } +/// Validation configuration for checkpoint contracts. +/// Checkpoint contracts validate the outputs of their upstream dependencies +/// before allowing downstream contracts to proceed. +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointValidation { + /// Check that all required deliverables from upstream contracts exist + #[serde(default)] + pub check_deliverables: bool, + + /// Run tests in the repository (requires repository to be configured) + #[serde(default)] + pub run_tests: bool, + + /// Custom validation instructions for Claude to execute. + /// Claude will review the outputs of upstream contracts and verify they meet these criteria. + pub check_content: Option<String>, + + /// Action to take on validation failure: "block" (default), "retry", "warn" + /// - block: Fail the checkpoint and block downstream contracts + /// - retry: Mark upstream contracts for retry (up to max_retries) + /// - warn: Log warning but allow downstream to proceed + #[serde(default = "default_checkpoint_on_failure")] + pub on_failure: String, + + /// Maximum retry attempts for upstream contracts (when on_failure = "retry") + #[serde(default = "default_checkpoint_max_retries")] + pub max_retries: i32, +} + +fn default_checkpoint_on_failure() -> String { + "block".to_string() +} + +fn default_checkpoint_max_retries() -> i32 { + 3 +} + +impl Default for CheckpointValidation { + fn default() -> Self { + Self { + check_deliverables: false, + run_tests: false, + check_content: None, + on_failure: default_checkpoint_on_failure(), + max_retries: default_checkpoint_max_retries(), + } + } +} + /// Request to update an existing chain #[derive(Debug, Clone, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] @@ -2962,6 +3012,8 @@ pub struct ChainContractDefinition { pub tasks: Option<serde_json::Value>, /// Deliverable definitions as JSON: [{id, name, priority}, ...] pub deliverables: Option<serde_json::Value>, + /// Validation configuration for checkpoint contracts (JSON) + pub validation: Option<serde_json::Value>, /// Position in GUI editor pub editor_x: Option<f64>, pub editor_y: Option<f64>, @@ -2984,6 +3036,8 @@ pub struct AddContractDefinitionRequest { pub tasks: Option<Vec<CreateChainTaskRequest>>, /// Deliverable definitions pub deliverables: Option<Vec<CreateChainDeliverableRequest>>, + /// Validation configuration (for checkpoint contracts) + pub validation: Option<CheckpointValidation>, /// Position in GUI editor pub editor_x: Option<f64>, pub editor_y: Option<f64>, @@ -3004,10 +3058,23 @@ pub struct UpdateContractDefinitionRequest { pub depends_on: Option<Vec<String>>, pub tasks: Option<Vec<CreateChainTaskRequest>>, pub deliverables: Option<Vec<CreateChainDeliverableRequest>>, + /// Validation configuration (for checkpoint contracts) + pub validation: Option<CheckpointValidation>, pub editor_x: Option<f64>, pub editor_y: Option<f64>, } +/// Request to start a chain +#[derive(Debug, Clone, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct StartChainRequest { + /// Whether to create a supervisor task that monitors chain progress + #[serde(default)] + pub with_supervisor: bool, + /// Repository URL for the supervisor task to work with + pub repository_url: Option<String>, +} + /// Response when starting a chain #[derive(Debug, Clone, Serialize, ToSchema)] #[serde(rename_all = "camelCase")] diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 85af178..fb430ab 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -5448,19 +5448,23 @@ pub async fn create_chain_contract_definition( let order_index = max_order.unwrap_or(-1) + 1; - // Convert tasks and deliverables to JSON + // Convert tasks, deliverables, and validation to JSON let tasks_json = req.tasks.as_ref().map(|t| serde_json::to_value(t).unwrap()); let deliverables_json = req .deliverables .as_ref() .map(|d| serde_json::to_value(d).unwrap()); + let validation_json = req + .validation + .as_ref() + .map(|v| serde_json::to_value(v).unwrap()); let depends_on_names: Vec<String> = req.depends_on.unwrap_or_default(); sqlx::query_as::<_, ChainContractDefinition>( r#" INSERT INTO chain_contract_definitions - (chain_id, name, description, contract_type, initial_phase, depends_on_names, tasks, deliverables, editor_x, editor_y, order_index) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + (chain_id, name, description, contract_type, initial_phase, depends_on_names, tasks, deliverables, validation, editor_x, editor_y, order_index) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING * "#, ) @@ -5472,6 +5476,7 @@ pub async fn create_chain_contract_definition( .bind(&depends_on_names) .bind(&tasks_json) .bind(&deliverables_json) + .bind(&validation_json) .bind(req.editor_x) .bind(req.editor_y) .bind(order_index) @@ -5520,6 +5525,10 @@ pub async fn update_chain_contract_definition( .deliverables .as_ref() .map(|d| serde_json::to_value(d).unwrap()); + let validation_json = req + .validation + .as_ref() + .map(|v| serde_json::to_value(v).unwrap()); sqlx::query_as::<_, ChainContractDefinition>( r#" @@ -5531,8 +5540,9 @@ pub async fn update_chain_contract_definition( depends_on_names = COALESCE($6, depends_on_names), tasks = COALESCE($7, tasks), deliverables = COALESCE($8, deliverables), - editor_x = COALESCE($9, editor_x), - editor_y = COALESCE($10, editor_y) + validation = COALESCE($9, validation), + editor_x = COALESCE($10, editor_x), + editor_y = COALESCE($11, editor_y) WHERE id = $1 RETURNING * "#, @@ -5545,6 +5555,7 @@ pub async fn update_chain_contract_definition( .bind(&req.depends_on) .bind(&tasks_json) .bind(&deliverables_json) + .bind(&validation_json) .bind(req.editor_x) .bind(req.editor_y) .fetch_one(pool) @@ -5705,3 +5716,414 @@ pub async fn update_chain_status( .await?; Ok(()) } + +// ============================================================================= +// Chain Progression +// ============================================================================= + +/// Result of chain progression check +#[derive(Debug)] +pub struct ChainProgressionResult { + /// Contracts created from ready definitions + pub contracts_created: Vec<Uuid>, + /// Whether all definitions are instantiated and completed (chain is done) + pub chain_completed: bool, +} + +/// Progress a chain by creating contracts from ready definitions. +/// +/// This is called when a contract in the chain completes. It: +/// 1. Finds definitions whose dependencies are all satisfied (completed) +/// 2. Creates contracts from those definitions +/// 3. Links them to the chain +/// 4. Checks if chain is complete (all definitions instantiated and completed) +pub async fn progress_chain( + pool: &PgPool, + chain_id: Uuid, + owner_id: Uuid, +) -> Result<ChainProgressionResult, sqlx::Error> { + let mut contracts_created = Vec::new(); + + // Get all definitions for this chain + let definitions = list_chain_contract_definitions(pool, chain_id).await?; + if definitions.is_empty() { + return Ok(ChainProgressionResult { + contracts_created: vec![], + chain_completed: true, + }); + } + + // Get existing chain contracts to know what's already instantiated + let chain_contracts = list_chain_contracts(pool, chain_id).await?; + + // Build a map of definition name -> instantiated contract status + let instantiated: std::collections::HashMap<String, Option<String>> = chain_contracts + .iter() + .map(|cc| (cc.contract_name.clone(), Some(cc.contract_status.clone()))) + .collect(); + + // Find definitions that are ready to be instantiated: + // - Not yet instantiated + // - All dependencies are instantiated AND completed + for def in &definitions { + // Skip if already instantiated + if instantiated.contains_key(&def.name) { + continue; + } + + // Check if all dependencies are completed + let deps_satisfied = def.depends_on_names.iter().all(|dep_name| { + instantiated + .get(dep_name) + .map(|status| status.as_deref() == Some("completed")) + .unwrap_or(false) + }); + + // Root definitions (no dependencies) are always ready + let is_root = def.depends_on_names.is_empty(); + + if is_root || deps_satisfied { + // Create contract from definition + match create_contract_from_definition(pool, chain_id, owner_id, def).await { + Ok(contract_id) => { + contracts_created.push(contract_id); + tracing::info!( + chain_id = %chain_id, + definition_name = %def.name, + contract_id = %contract_id, + "Created contract from chain definition" + ); + } + Err(e) => { + tracing::error!( + chain_id = %chain_id, + definition_name = %def.name, + error = %e, + "Failed to create contract from chain definition" + ); + } + } + } + } + + // Check if chain is complete (all definitions instantiated and completed) + let updated_contracts = list_chain_contracts(pool, chain_id).await?; + let all_instantiated = definitions.len() == updated_contracts.len(); + let all_completed = updated_contracts + .iter() + .all(|cc| cc.contract_status == "completed"); + let chain_completed = all_instantiated && all_completed; + + if chain_completed { + update_chain_status(pool, chain_id, "completed").await?; + tracing::info!(chain_id = %chain_id, "Chain completed - all contracts done"); + } + + Ok(ChainProgressionResult { + contracts_created, + chain_completed, + }) +} + +/// Task definition parsed from JSON (matches chain YAML format) +#[derive(Debug, Clone, serde::Deserialize)] +struct ChainTaskDef { + name: String, + plan: String, +} + +/// Validation config parsed from definition JSON +#[derive(Debug, Clone, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct ValidationConfig { + #[serde(default)] + check_deliverables: bool, + #[serde(default)] + run_tests: bool, + check_content: Option<String>, + #[serde(default = "default_on_failure_str")] + on_failure: String, + #[serde(default = "default_max_retries_val")] + max_retries: i32, +} + +fn default_on_failure_str() -> String { + "block".to_string() +} + +fn default_max_retries_val() -> i32 { + 3 +} + +/// Generate a validation plan for a checkpoint contract. +fn generate_checkpoint_plan( + def: &ChainContractDefinition, + upstream_contracts: &[&ChainContractDetail], + validation: &ValidationConfig, +) -> String { + let upstream_names: Vec<&str> = upstream_contracts.iter().map(|c| c.contract_name.as_str()).collect(); + + let mut plan = format!( + r#"# Checkpoint Validation: {} + +You are validating the outputs of upstream contracts before allowing downstream work to proceed. + +## Upstream Contracts to Validate +{} + +"#, + def.name, + upstream_names.iter().map(|n| format!("- {}", n)).collect::<Vec<_>>().join("\n") + ); + + // Add deliverables check section + if validation.check_deliverables { + plan.push_str(r#"## Deliverables Check +Verify that all required deliverables from upstream contracts exist and are properly completed. + +Use the makima CLI to check contract status: +```bash +makima contract status <contract_id> +``` + +For each upstream contract, verify: +1. Contract status is "completed" +2. All required deliverables are marked as complete +3. Deliverable content exists and is not empty + +"#); + } + + // Add tests check section + if validation.run_tests { + plan.push_str(r#"## Tests Check +Run the test suite to verify the codebase is in a good state. + +```bash +# Run tests appropriate for the project type +npm test # for Node.js projects +cargo test # for Rust projects +pytest # for Python projects +go test ./... # for Go projects +``` + +Verify: +1. All tests pass +2. No new test failures introduced +3. Test coverage is acceptable + +"#); + } + + // Add custom content check section + if let Some(content_check) = &validation.check_content { + plan.push_str(&format!(r#"## Custom Validation Criteria +{} + +"#, content_check)); + } + + // Add validation result section + plan.push_str(&format!(r#"## Reporting Results + +After completing all validation checks, you must report the result: + +**If ALL checks pass:** +Mark this checkpoint contract as completed using: +```bash +makima supervisor complete +``` + +**If ANY check fails (on_failure: "{}"):** +"#, validation.on_failure)); + + match validation.on_failure.as_str() { + "block" => plan.push_str(r#" +- Document the failure reason clearly +- Do NOT mark the contract as complete +- The chain will be blocked until issues are resolved manually +"#), + "retry" => plan.push_str(&format!(r#" +- Document the failure reason +- Request retry of the failed upstream contract (max {} retries) +- Use: `makima supervisor ask "Upstream validation failed. Retry?" --choices "Yes,No"` +"#, validation.max_retries)), + "warn" => plan.push_str(r#" +- Document the warning/issue found +- Mark the contract as complete anyway (downstream will proceed) +- Log the warning for visibility +"#), + _ => plan.push_str(r#" +- Document the failure reason +- Do NOT mark the contract as complete +"#), + } + + plan.push_str(r#" +## Begin Validation + +Start by checking the status of each upstream contract, then proceed with the validation criteria above. +"#); + + plan +} + +/// Create a contract from a chain definition. +async fn create_contract_from_definition( + pool: &PgPool, + chain_id: Uuid, + owner_id: Uuid, + def: &ChainContractDefinition, +) -> Result<Uuid, sqlx::Error> { + // Get the existing contracts to find dependency info + let existing_contracts = list_chain_contracts(pool, chain_id).await?; + let name_to_contract: std::collections::HashMap<&str, &ChainContractDetail> = existing_contracts + .iter() + .map(|cc| (cc.contract_name.as_str(), cc)) + .collect(); + + // Resolve dependency names to contract details + let upstream_contracts: Vec<&ChainContractDetail> = def + .depends_on_names + .iter() + .filter_map(|name| name_to_contract.get(name.as_str()).copied()) + .collect(); + + // Create the contract request with basic fields + let req = CreateContractRequest { + name: def.name.clone(), + description: def.description.clone(), + contract_type: Some(def.contract_type.clone()), + initial_phase: def.initial_phase.clone(), + template_id: None, + autonomous_loop: None, + phase_guard: None, + local_only: None, + auto_merge_local: None, + }; + + // Create the contract + let contract = create_contract_for_owner(pool, owner_id, req).await?; + + // For checkpoint contracts, generate a validation plan + if def.contract_type == "checkpoint" { + // Parse validation config + let validation: ValidationConfig = def + .validation + .as_ref() + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or(ValidationConfig { + check_deliverables: true, + run_tests: false, + check_content: None, + on_failure: default_on_failure_str(), + max_retries: default_max_retries_val(), + }); + + // Generate validation plan + let validation_plan = generate_checkpoint_plan(def, &upstream_contracts, &validation); + + // Create a supervisor task with the validation plan + let task_req = CreateTaskRequest { + contract_id: Some(contract.id), + name: format!("Validate: {}", def.name), + description: Some("Checkpoint validation task".to_string()), + plan: validation_plan, + parent_task_id: None, + is_supervisor: true, // Checkpoint uses supervisor task for validation + priority: 0, + repository_url: None, + base_branch: None, + target_branch: None, + merge_mode: None, + target_repo_path: None, + completion_action: None, + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + }; + + if let Err(e) = create_task_for_owner(pool, owner_id, task_req).await { + tracing::warn!( + contract_id = %contract.id, + error = %e, + "Failed to create validation task for checkpoint contract" + ); + } + + // Set initial validation status + sqlx::query( + "UPDATE chain_contracts SET validation_status = 'pending' WHERE chain_id = $1 AND contract_id = $2", + ) + .bind(chain_id) + .bind(contract.id) + .execute(pool) + .await?; + } else { + // Parse and create tasks from definition for regular contracts + if let Some(tasks_json) = &def.tasks { + if let Ok(tasks) = serde_json::from_value::<Vec<ChainTaskDef>>(tasks_json.clone()) { + for task_def in tasks { + let task_req = CreateTaskRequest { + contract_id: Some(contract.id), + name: task_def.name, + description: None, + plan: task_def.plan, + parent_task_id: None, + is_supervisor: false, + priority: 0, + repository_url: None, + base_branch: None, + target_branch: None, + merge_mode: None, + target_repo_path: None, + completion_action: None, + continue_from_task_id: None, + copy_files: None, + checkpoint_sha: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + }; + if let Err(e) = create_task_for_owner(pool, owner_id, task_req).await { + tracing::warn!( + contract_id = %contract.id, + error = %e, + "Failed to create task from chain definition" + ); + } + } + } + } + } + + // Resolve dependency names to contract IDs + let depends_on: Vec<Uuid> = upstream_contracts.iter().map(|c| c.contract_id).collect(); + + // Link contract to chain + add_contract_to_chain( + pool, + chain_id, + contract.id, + depends_on, + def.order_index, + def.editor_x, + def.editor_y, + ) + .await?; + + // Update chain_contracts with definition_id link + sqlx::query( + "UPDATE chain_contracts SET definition_id = $1 WHERE chain_id = $2 AND contract_id = $3", + ) + .bind(def.id) + .bind(chain_id) + .bind(contract.id) + .execute(pool) + .await?; + + Ok(contract.id) +} diff --git a/makima/src/server/handlers/chains.rs b/makima/src/server/handlers/chains.rs index 5d26e6a..ae19ca0 100644 --- a/makima/src/server/handlers/chains.rs +++ b/makima/src/server/handlers/chains.rs @@ -15,8 +15,8 @@ use uuid::Uuid; use crate::db::models::{ AddContractDefinitionRequest, ChainContractDefinition, ChainContractDetail, ChainDefinitionGraphResponse, ChainEditorData, ChainEvent, ChainGraphResponse, ChainSummary, - ChainWithContracts, CreateChainRequest, StartChainResponse, UpdateChainRequest, - UpdateContractDefinitionRequest, + ChainWithContracts, CreateChainRequest, CreateTaskRequest, StartChainRequest, + StartChainResponse, UpdateChainRequest, UpdateContractDefinitionRequest, }; use crate::db::repository::{self, RepositoryError}; use crate::server::auth::Authenticated; @@ -1048,6 +1048,7 @@ pub async fn get_chain_definition_graph( params( ("id" = Uuid, Path, description = "Chain ID") ), + request_body(content = Option<StartChainRequest>, description = "Optional start options"), responses( (status = 200, description = "Chain started", body = StartChainResponse), (status = 400, description = "Chain cannot be started", body = ApiError), @@ -1066,6 +1067,7 @@ pub async fn start_chain( State(state): State<SharedState>, Authenticated(auth): Authenticated, Path(chain_id): Path<Uuid>, + body: Option<Json<StartChainRequest>>, ) -> impl IntoResponse { let Some(ref pool) = state.db_pool else { return ( @@ -1075,6 +1077,11 @@ pub async fn start_chain( .into_response(); }; + let req = body.map(|b| b.0).unwrap_or(StartChainRequest { + with_supervisor: false, + repository_url: None, + }); + // Verify ownership and get chain let chain = match repository::get_chain_for_owner(pool, chain_id, auth.owner_id).await { Ok(Some(c)) => c, @@ -1132,8 +1139,7 @@ pub async fn start_chain( .into_response(); } - // TODO: Implement chain supervisor spawning - // For now, just update the chain status to active + // Update chain status to active match repository::update_chain_status(pool, chain_id, "active").await { Ok(_) => {} Err(e) => { @@ -1146,13 +1152,139 @@ pub async fn start_chain( } } - // Return response indicating chain has started - // supervisor_task_id is None until we implement the supervisor daemon + // Create supervisor task if requested + let mut supervisor_task_id: Option<Uuid> = None; + if req.with_supervisor { + let supervisor_name = format!("Chain Supervisor: {}", chain.name); + let supervisor_plan = format!( + r#"You are the supervisor for chain "{}". + +## Environment Variables +- MAKIMA_CHAIN_ID={} +- MAKIMA_API_URL (configured) +- MAKIMA_API_KEY (configured) + +## Your Responsibilities +1. Monitor chain progress by periodically checking chain status +2. Validate that contracts are completing successfully +3. Identify and report any issues or blockers +4. Track the overall chain progress through the DAG + +## Available Commands +Use these makima CLI commands to monitor the chain: + +```bash +# Check chain status +makima chain status {} + +# List contracts in the chain +makima chain contracts {} + +# View the chain DAG with current status +makima chain graph {} --with-status +``` + +## Monitoring Loop +1. Check chain status every few minutes +2. If a contract fails, investigate the issue +3. Report progress to the user when milestones are reached +4. Mark the chain as complete when all contracts finish + +## Current Chain Info +- Chain ID: {} +- Chain Name: {} +- Total definitions: {} + +Begin monitoring the chain. Check the initial status and report what you find."#, + chain.name, + chain_id, + chain_id, + chain_id, + chain_id, + chain_id, + chain.name, + definitions.len() + ); + + let supervisor_req = CreateTaskRequest { + name: supervisor_name, + description: Some(format!("Supervisor task for chain: {}", chain.name)), + plan: supervisor_plan, + repository_url: req.repository_url.clone(), + base_branch: None, + target_branch: None, + parent_task_id: None, + contract_id: None, // Chain supervisor is not tied to a specific contract + target_repo_path: None, + completion_action: None, + continue_from_task_id: None, + copy_files: None, + is_supervisor: true, + checkpoint_sha: None, + priority: 0, + merge_mode: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + }; + + match repository::create_task_for_owner(pool, auth.owner_id, supervisor_req).await { + Ok(supervisor_task) => { + tracing::info!( + chain_id = %chain_id, + supervisor_task_id = %supervisor_task.id, + "Created supervisor task for chain" + ); + + // Update chain with supervisor_task_id + if let Err(e) = + repository::set_chain_supervisor_task(pool, chain_id, Some(supervisor_task.id)) + .await + { + tracing::warn!( + chain_id = %chain_id, + error = %e, + "Failed to link supervisor task to chain" + ); + } + + supervisor_task_id = Some(supervisor_task.id); + } + Err(e) => { + tracing::warn!( + chain_id = %chain_id, + error = %e, + "Failed to create supervisor task for chain" + ); + } + } + } + + // Progress the chain - this creates root contracts (definitions with no dependencies) + let progression = match repository::progress_chain(pool, chain_id, auth.owner_id).await { + Ok(p) => p, + Err(e) => { + tracing::error!("Failed to progress chain: {}", e); + // Chain is active but no contracts created - return partial success + return Json(StartChainResponse { + chain_id, + supervisor_task_id, + contracts_created: vec![], + status: "active".to_string(), + }) + .into_response(); + } + }; + Json(StartChainResponse { chain_id, - supervisor_task_id: None, - contracts_created: vec![], - status: "started".to_string(), + supervisor_task_id, + contracts_created: progression.contracts_created, + status: if progression.chain_completed { + "completed".to_string() + } else { + "active".to_string() + }, }) .into_response() } diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs index 8c8cabf..54bae71 100644 --- a/makima/src/server/handlers/contracts.rs +++ b/makima/src/server/handlers/contracts.rs @@ -574,6 +574,35 @@ pub async fn update_contract( "status": &contract.status, }), ).await; + + // If contract is part of a chain, progress the chain + if let Some(chain_id) = contract.chain_id { + let pool_clone = pool.clone(); + let owner_id = auth.owner_id; + tokio::spawn(async move { + match repository::progress_chain(&pool_clone, chain_id, owner_id).await { + Ok(result) => { + if !result.contracts_created.is_empty() { + tracing::info!( + chain_id = %chain_id, + contracts_created = ?result.contracts_created, + "Chain progressed - created new contracts" + ); + } + if result.chain_completed { + tracing::info!(chain_id = %chain_id, "Chain completed"); + } + } + Err(e) => { + tracing::error!( + chain_id = %chain_id, + error = %e, + "Failed to progress chain after contract completion" + ); + } + } + }); + } } // Get summary with counts |
