//! Makima CLI - unified CLI for server, daemon, and task management. use std::io::{self, Read}; use std::path::Path; use std::sync::Arc; use makima::daemon::api::ApiClient; use makima::daemon::cli::{ Cli, Commands, ContractCommand, SupervisorCommand, }; use makima::daemon::config::{DaemonConfig, RepoEntry}; use makima::daemon::db::LocalDb; use makima::daemon::error::DaemonError; use makima::daemon::task::{TaskConfig, TaskManager}; use makima::daemon::ws::{DaemonCommand, WsClient}; use tokio::process::Command; use tokio::sync::mpsc; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[tokio::main] async fn main() -> Result<(), Box> { let cli = Cli::parse_args(); match cli.command { Commands::Server(args) => run_server(args).await, Commands::Daemon(args) => run_daemon(args).await, Commands::Supervisor(cmd) => run_supervisor(cmd).await, Commands::Contract(cmd) => run_contract(cmd).await, } } /// Run the makima server. async fn run_server( args: makima::daemon::cli::ServerArgs, ) -> Result<(), Box> { // Initialize logging init_logging(&args.log_level, "text"); eprintln!("=== Makima Server Starting ==="); eprintln!("Port: {}", args.port); // Create app state let mut app_state = makima::server::state::AppState::new( &args.parakeet_model_dir, &args.parakeet_eou_dir, &args.sortformer_model_path, ); // Connect to database if URL provided if let Some(ref db_url) = args.database_url { eprintln!("Connecting to database..."); let pool = makima::db::create_pool(db_url).await?; app_state = app_state.with_db_pool(pool); eprintln!("Database connected"); } let state = Arc::new(app_state); let addr = format!("0.0.0.0:{}", args.port); eprintln!("Starting server on {}", addr); makima::server::run_server(state, &addr).await?; Ok(()) } /// Run the makima daemon. async fn run_daemon( args: makima::daemon::cli::DaemonArgs, ) -> Result<(), Box> { eprintln!("=== Makima Daemon Starting ==="); // Build a temporary CLI struct for config loading let cli = makima::daemon::cli::daemon::DaemonArgs { config: args.config, repos_dir: args.repos_dir, worktrees_dir: args.worktrees_dir, server_url: args.server_url, api_key: args.api_key, max_tasks: args.max_tasks, log_level: args.log_level, bubblewrap: args.bubblewrap, }; // Load configuration with CLI overrides eprintln!("[1/5] Loading configuration..."); let config = match DaemonConfig::load_with_daemon_args(&cli) { Ok(cfg) => { eprintln!(" Config loaded: server={}", cfg.server.url); cfg } Err(e) => { eprintln!("Failed to load configuration: {}", e); eprintln!(); eprintln!("Use CLI flags:"); eprintln!(" makima daemon --server-url ws://localhost:8080 --api-key your-api-key"); eprintln!(); eprintln!("Or set environment variables:"); eprintln!(" MAKIMA_DAEMON_SERVER_URL=ws://localhost:8080"); eprintln!(" MAKIMA_API_KEY=your-api-key"); eprintln!(); eprintln!("Or create a config file: makima-daemon.toml"); std::process::exit(1); } }; // Initialize logging init_logging(&config.logging.level, &config.logging.format); eprintln!("[2/5] Logging initialized"); // Initialize local database eprintln!( "[3/5] Opening local database: {}", config.local_db.path.display() ); let _local_db = LocalDb::open(&config.local_db.path)?; eprintln!(" Database opened"); // Initialize worktree directories eprintln!("[4/5] Setting up directories..."); tokio::fs::create_dir_all(&config.worktree.base_dir).await?; tokio::fs::create_dir_all(&config.worktree.repos_dir).await?; tokio::fs::create_dir_all(&config.repos.home_dir).await?; eprintln!( " Worktree base: {}", config.worktree.base_dir.display() ); eprintln!(" Repos cache: {}", config.worktree.repos_dir.display()); eprintln!(" Home dir: {}", config.repos.home_dir.display()); // Auto-clone repositories if configured if !config.repos.auto_clone.is_empty() { eprintln!( " Auto-cloning {} repositories...", config.repos.auto_clone.len() ); for repo_entry in &config.repos.auto_clone { if let Err(e) = auto_clone_repo(repo_entry, &config.repos.home_dir).await { eprintln!(" WARNING: Failed to clone {}: {}", repo_entry.url(), e); } } } // Create channels for communication let (command_tx, mut command_rx) = mpsc::channel::(64); // Get machine info let machine_id = get_machine_id(); let hostname = get_hostname(); eprintln!(" Machine ID: {}", machine_id); eprintln!(" Hostname: {}", hostname); // Create WebSocket client eprintln!("[5/5] Connecting to server: {}", config.server.url); let mut ws_client = WsClient::new( config.server.clone(), machine_id, hostname, config.process.max_concurrent_tasks as i32, command_tx, ); // Get sender for task manager let ws_tx = ws_client.sender(); // Create task configuration let bubblewrap_config = if config.process.bubblewrap.enabled { Some(config.process.bubblewrap.clone()) } else { None }; // Derive HTTP API URL from WebSocket server URL (wss://... -> https://...) let api_url = config .server .url .replace("wss://", "https://") .replace("ws://", "http://"); let task_config = TaskConfig { max_concurrent_tasks: config.process.max_concurrent_tasks, worktree_base_dir: config.worktree.base_dir.clone(), env_vars: config.process.env_vars.clone(), claude_command: config.process.claude_command.clone(), claude_args: config.process.claude_args.clone(), claude_pre_args: config.process.claude_pre_args.clone(), enable_permissions: config.process.enable_permissions, disable_verbose: config.process.disable_verbose, bubblewrap: bubblewrap_config, api_url, heartbeat_commit_interval_secs: config.process.heartbeat_commit_interval_secs, }; // Create task manager let task_manager = Arc::new(TaskManager::new(task_config, ws_tx.clone())); // Spawn command handler let task_manager_clone = task_manager.clone(); tokio::spawn(async move { tracing::info!("Command handler started, waiting for commands..."); while let Some(command) = command_rx.recv().await { tracing::info!("Received command from channel: {:?}", command); if let Err(e) = task_manager_clone.handle_command(command).await { tracing::error!("Failed to handle command: {}", e); } } tracing::info!("Command handler stopped"); }); // Handle shutdown signals let shutdown_signal = async { tokio::signal::ctrl_c() .await .expect("Failed to install Ctrl+C handler"); eprintln!("\nReceived shutdown signal"); }; eprintln!("=== Daemon running (Ctrl+C to stop) ==="); // Run WebSocket client with shutdown handling tokio::select! { result = ws_client.run() => { match result { Ok(()) => eprintln!("WebSocket client exited cleanly"), Err(DaemonError::AuthFailed(msg)) => { eprintln!("ERROR: Authentication failed: {}", msg); std::process::exit(1); } Err(e) => { eprintln!("ERROR: WebSocket client error: {}", e); std::process::exit(1); } } } _ = shutdown_signal => { eprintln!("Shutting down..."); } } // Gracefully shutdown all running Claude processes eprintln!("Terminating Claude processes..."); task_manager .shutdown_all_processes(std::time::Duration::from_secs(5)) .await; // Cleanup tracing::info!("Daemon stopped"); Ok(()) } /// Run supervisor commands. async fn run_supervisor( cmd: SupervisorCommand, ) -> Result<(), Box> { use makima::daemon::api::supervisor::*; match cmd { SupervisorCommand::Tasks(args) => { let client = ApiClient::new(args.api_url, args.api_key)?; let result = client.supervisor_tasks(args.contract_id).await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Tree(args) => { let client = ApiClient::new(args.api_url, args.api_key)?; let result = client.supervisor_tree(args.contract_id).await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Spawn(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; eprintln!("Creating task: {}...", args.name); let req = SpawnTaskRequest { name: args.name, plan: args.plan, contract_id: args.common.contract_id, parent_task_id: args.parent, checkpoint_sha: args.checkpoint, }; let result = client.supervisor_spawn(req).await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Wait(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; eprintln!( "Waiting for task {} (timeout: {}s)...", args.task_id, args.timeout ); let result = client.supervisor_wait(args.task_id, args.timeout).await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::ReadFile(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; let result = client .supervisor_read_file(args.task_id, &args.file_path) .await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Branch(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; eprintln!("Creating branch: {}...", args.name); let result = client.supervisor_branch(&args.name, args.from).await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Merge(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; eprintln!("Merging task {}...", args.task_id); let result = client .supervisor_merge(args.task_id, args.to, args.squash) .await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Pr(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; eprintln!("Creating PR for task {}...", args.task_id); let body = args.body.as_deref().unwrap_or(""); let result = client .supervisor_pr(args.task_id, &args.title, body, &args.base) .await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Diff(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; let result = client.supervisor_diff(args.task_id).await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Checkpoint(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; let task_id = args .common .self_task_id .ok_or("MAKIMA_TASK_ID is required for checkpoint")?; let result = client .supervisor_checkpoint(task_id, &args.message) .await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Checkpoints(args) => { let client = ApiClient::new(args.api_url, args.api_key)?; let task_id = args.self_task_id.ok_or("MAKIMA_TASK_ID is required")?; let result = client.supervisor_checkpoints(task_id).await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Status(args) => { let client = ApiClient::new(args.api_url, args.api_key)?; let result = client.supervisor_status(args.contract_id).await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Ask(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; eprintln!("Asking user: {}...", args.question); let choices = args .choices .map(|c| c.split(',').map(|s| s.trim().to_string()).collect()) .unwrap_or_default(); let result = client .supervisor_ask(&args.question, choices, args.context, args.timeout, args.phaseguard, args.multi_select) .await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::AdvancePhase(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; eprintln!("Advancing contract to phase: {}...", args.phase); let result = client .supervisor_advance_phase(args.common.contract_id, &args.phase) .await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Task(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; let result = client.supervisor_get_task(args.target_task_id).await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::Output(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; let result = client.supervisor_get_task_output(args.target_task_id).await?; println!("{}", serde_json::to_string(&result.0)?); } SupervisorCommand::TaskHistory(args) => { eprintln!( "Task history for {} (limit: {:?}, format: {})", args.task_id, args.limit, args.format ); eprintln!("CLI integration not yet implemented. Use the API directly:"); eprintln!(" GET /api/v1/mesh/tasks/{}/conversation", args.task_id); } SupervisorCommand::TaskCheckpoints(args) => { eprintln!( "Task checkpoints for {} (with_diff: {})", args.task_id, args.with_diff ); eprintln!("CLI integration not yet implemented. Use the API directly:"); eprintln!(" GET /api/v1/mesh/tasks/{}/checkpoints", args.task_id); } SupervisorCommand::Resume(args) => { eprintln!( "Resume supervisor for contract {} (mode: {}, checkpoint: {:?})", args.common.contract_id, args.mode, args.checkpoint ); eprintln!("CLI integration not yet implemented. Use the API directly:"); eprintln!( " POST /api/v1/contracts/{}/supervisor/resume", args.common.contract_id ); } SupervisorCommand::TaskResumeFrom(args) => { eprintln!( "Resume task {} from checkpoint {} with plan: {}", args.task_id, args.checkpoint, args.plan ); eprintln!("CLI integration not yet implemented. Use the API directly:"); eprintln!( " POST /api/v1/mesh/tasks/{}/checkpoints/{}/resume", args.task_id, args.checkpoint ); } SupervisorCommand::TaskRewind(args) => { eprintln!( "Rewind task {} to checkpoint {} (preserve: {}, branch: {:?})", args.task_id, args.checkpoint, args.preserve, args.branch_name ); eprintln!("CLI integration not yet implemented. Use the API directly:"); eprintln!(" POST /api/v1/mesh/tasks/{}/rewind", args.task_id); } SupervisorCommand::TaskFork(args) => { eprintln!( "Fork task {} from checkpoint {} as '{}' with plan: {}", args.task_id, args.checkpoint, args.name, args.plan ); eprintln!("CLI integration not yet implemented. Use the API directly:"); eprintln!(" POST /api/v1/mesh/tasks/{}/fork", args.task_id); } SupervisorCommand::RewindConversation(args) => { eprintln!( "Rewind conversation for contract {} (by: {:?}, to: {:?}, rewind_code: {})", args.common.contract_id, args.by_messages, args.to_message, args.rewind_code ); eprintln!("CLI integration not yet implemented. Use the API directly:"); eprintln!( " POST /api/v1/contracts/{}/supervisor/conversation/rewind", args.common.contract_id ); } } Ok(()) } /// Run contract commands. async fn run_contract( cmd: ContractCommand, ) -> Result<(), Box> { match cmd { ContractCommand::Status(args) => { let client = ApiClient::new(args.api_url, args.api_key)?; let result = client.contract_status(args.contract_id).await?; println!("{}", serde_json::to_string(&result.0)?); } ContractCommand::Checklist(args) => { let client = ApiClient::new(args.api_url, args.api_key)?; let result = client.contract_checklist(args.contract_id).await?; println!("{}", serde_json::to_string(&result.0)?); } ContractCommand::Goals(args) => { let client = ApiClient::new(args.api_url, args.api_key)?; let result = client.contract_goals(args.contract_id).await?; println!("{}", serde_json::to_string(&result.0)?); } ContractCommand::Files(args) => { let client = ApiClient::new(args.api_url, args.api_key)?; let result = client.contract_files(args.contract_id).await?; println!("{}", serde_json::to_string(&result.0)?); } ContractCommand::File(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; let result = client .contract_file(args.common.contract_id, args.file_id) .await?; println!("{}", serde_json::to_string(&result.0)?); } ContractCommand::Report(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; let result = client .contract_report(args.common.contract_id, &args.message, args.common.task_id) .await?; println!("{}", serde_json::to_string(&result.0)?); } ContractCommand::SuggestAction(args) => { let client = ApiClient::new(args.api_url, args.api_key)?; let result = client.contract_suggest_action(args.contract_id).await?; println!("{}", serde_json::to_string(&result.0)?); } ContractCommand::CompletionAction(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; let files = args.files.map(|f| { f.split(',') .map(|s| s.trim().to_string()) .collect::>() }); let result = client .contract_completion_action( args.common.contract_id, args.common.task_id, files, args.lines_added, args.lines_removed, args.code, ) .await?; println!("{}", serde_json::to_string(&result.0)?); } ContractCommand::UpdateFile(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; // Read content from stdin let mut content = String::new(); io::stdin().read_to_string(&mut content)?; let result = client .contract_update_file(args.common.contract_id, args.file_id, &content) .await?; println!("{}", serde_json::to_string(&result.0)?); } ContractCommand::CreateFile(args) => { let client = ApiClient::new(args.common.api_url, args.common.api_key)?; // Read content from stdin let mut content = String::new(); io::stdin().read_to_string(&mut content)?; let result = client .contract_create_file(args.common.contract_id, &args.name, &content) .await?; println!("{}", serde_json::to_string(&result.0)?); } } Ok(()) } fn init_logging(level: &str, format: &str) { let filter = EnvFilter::try_from_default_env() .or_else(|_| EnvFilter::try_new(level)) .unwrap_or_else(|_| EnvFilter::new("info")); let subscriber = tracing_subscriber::registry().with(filter); if format == "json" { subscriber.with(fmt::layer().json()).init(); } else { subscriber.with(fmt::layer()).init(); } } fn get_machine_id() -> String { // Try to read machine-id from standard locations #[cfg(target_os = "linux")] { if let Ok(id) = std::fs::read_to_string("/etc/machine-id") { return id.trim().to_string(); } if let Ok(id) = std::fs::read_to_string("/var/lib/dbus/machine-id") { return id.trim().to_string(); } } #[cfg(target_os = "macos")] { // Use IOPlatformSerialNumber if let Ok(output) = std::process::Command::new("ioreg") .args(["-rd1", "-c", "IOPlatformExpertDevice"]) .output() { let stdout = String::from_utf8_lossy(&output.stdout); for line in stdout.lines() { if line.contains("IOPlatformUUID") { if let Some(uuid) = line.split('"').nth(3) { return uuid.to_string(); } } } } } // Fallback: generate a random ID and persist it let state_dir = dirs_next::data_local_dir() .unwrap_or_else(|| std::path::PathBuf::from(".")) .join("makima"); let machine_id_file = state_dir.join("machine-id"); if let Ok(id) = std::fs::read_to_string(&machine_id_file) { return id.trim().to_string(); } // Generate new ID let new_id = uuid::Uuid::new_v4().to_string(); std::fs::create_dir_all(&state_dir).ok(); std::fs::write(&machine_id_file, &new_id).ok(); new_id } fn get_hostname() -> String { hostname::get() .map(|h| h.to_string_lossy().to_string()) .unwrap_or_else(|_| "unknown".to_string()) } /// Auto-clone a repository to the home directory if it doesn't exist. async fn auto_clone_repo( repo_entry: &RepoEntry, home_dir: &Path, ) -> Result<(), Box> { let dir_name = repo_entry .dir_name() .ok_or("Could not determine directory name from URL")?; let target_dir = home_dir.join(&dir_name); // Check if already cloned if target_dir.exists() { eprintln!(" [skip] {} (already exists)", dir_name); return Ok(()); } let url = repo_entry.expanded_url(); eprintln!(" [clone] {} -> {}", url, target_dir.display()); // Build git clone command let mut args = vec!["clone".to_string()]; // Add shallow clone if requested if repo_entry.shallow() { args.push("--depth".to_string()); args.push("1".to_string()); } // Add branch if specified if let Some(branch) = repo_entry.branch() { args.push("--branch".to_string()); args.push(branch.to_string()); } args.push(url.clone()); args.push(target_dir.to_string_lossy().to_string()); // Run git clone let output = Command::new("git").args(&args).output().await?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(format!("git clone failed: {}", stderr).into()); } eprintln!(" [done] {}", dir_name); Ok(()) } /// dirs_next minimal replacement mod dirs_next { use std::path::PathBuf; pub fn data_local_dir() -> Option { #[cfg(target_os = "macos")] { std::env::var("HOME") .ok() .map(|h| PathBuf::from(h).join("Library").join("Application Support")) } #[cfg(target_os = "linux")] { std::env::var("XDG_DATA_HOME") .ok() .map(PathBuf::from) .or_else(|| { std::env::var("HOME") .ok() .map(|h| PathBuf::from(h).join(".local").join("share")) }) } #[cfg(target_os = "windows")] { std::env::var("LOCALAPPDATA").ok().map(PathBuf::from) } #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))] { None } } }