summaryrefslogblamecommitdiff
path: root/makima/src/bin/makima.rs
blob: 9c9ac7782864ca256cba085fc86effacccefc4ac (plain) (tree)















































































                                                                         
                                    



















































































                                                                                                 




                                                                  








                                                                  
                                      















































                                                                              





                                                                  



















































































                                                                                   
                             







                                                                     
                                                                                 







                                                                           











                                                                                    







                                                                                   









                                                                                       
































































                                                                                            
















































































































































































































































                                                                                             
//! Makima CLI - unified CLI for server, daemon, and task management.

use std::io::{self, Read};
use std::path::Path;
use std::sync::Arc;

use makima::daemon::api::ApiClient;
use makima::daemon::cli::{
    Cli, Commands, ContractCommand, SupervisorCommand,
};
use makima::daemon::config::{DaemonConfig, RepoEntry};
use makima::daemon::db::LocalDb;
use makima::daemon::error::DaemonError;
use makima::daemon::task::{TaskConfig, TaskManager};
use makima::daemon::ws::{DaemonCommand, WsClient};
use tokio::process::Command;
use tokio::sync::mpsc;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let cli = Cli::parse_args();

    match cli.command {
        Commands::Server(args) => run_server(args).await,
        Commands::Daemon(args) => run_daemon(args).await,
        Commands::Supervisor(cmd) => run_supervisor(cmd).await,
        Commands::Contract(cmd) => run_contract(cmd).await,
    }
}

/// Run the makima server.
async fn run_server(
    args: makima::daemon::cli::ServerArgs,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Initialize logging
    init_logging(&args.log_level, "text");

    eprintln!("=== Makima Server Starting ===");
    eprintln!("Port: {}", args.port);

    // Create app state
    let mut app_state = makima::server::state::AppState::new(
        &args.parakeet_model_dir,
        &args.parakeet_eou_dir,
        &args.sortformer_model_path,
    );

    // Connect to database if URL provided
    if let Some(ref db_url) = args.database_url {
        eprintln!("Connecting to database...");
        let pool = makima::db::create_pool(db_url).await?;
        app_state = app_state.with_db_pool(pool);
        eprintln!("Database connected");
    }

    let state = Arc::new(app_state);
    let addr = format!("0.0.0.0:{}", args.port);

    eprintln!("Starting server on {}", addr);
    makima::server::run_server(state, &addr).await?;

    Ok(())
}

/// Run the makima daemon.
async fn run_daemon(
    args: makima::daemon::cli::DaemonArgs,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    eprintln!("=== Makima Daemon Starting ===");

    // Build a temporary CLI struct for config loading
    let cli = makima::daemon::cli::daemon::DaemonArgs {
        config: args.config,
        repos_dir: args.repos_dir,
        worktrees_dir: args.worktrees_dir,
        server_url: args.server_url,
        api_key: args.api_key,
        max_tasks: args.max_tasks,
        log_level: args.log_level,
        bubblewrap: args.bubblewrap,
    };

    // Load configuration with CLI overrides
    eprintln!("[1/5] Loading configuration...");
    let config = match DaemonConfig::load_with_daemon_args(&cli) {
        Ok(cfg) => {
            eprintln!("      Config loaded: server={}", cfg.server.url);
            cfg
        }
        Err(e) => {
            eprintln!("Failed to load configuration: {}", e);
            eprintln!();
            eprintln!("Use CLI flags:");
            eprintln!("  makima daemon --server-url ws://localhost:8080 --api-key your-api-key");
            eprintln!();
            eprintln!("Or set environment variables:");
            eprintln!("  MAKIMA_DAEMON_SERVER_URL=ws://localhost:8080");
            eprintln!("  MAKIMA_API_KEY=your-api-key");
            eprintln!();
            eprintln!("Or create a config file: makima-daemon.toml");
            std::process::exit(1);
        }
    };

    // Initialize logging
    init_logging(&config.logging.level, &config.logging.format);
    eprintln!("[2/5] Logging initialized");

    // Initialize local database
    eprintln!(
        "[3/5] Opening local database: {}",
        config.local_db.path.display()
    );
    let _local_db = LocalDb::open(&config.local_db.path)?;
    eprintln!("      Database opened");

    // Initialize worktree directories
    eprintln!("[4/5] Setting up directories...");
    tokio::fs::create_dir_all(&config.worktree.base_dir).await?;
    tokio::fs::create_dir_all(&config.worktree.repos_dir).await?;
    tokio::fs::create_dir_all(&config.repos.home_dir).await?;
    eprintln!(
        "      Worktree base: {}",
        config.worktree.base_dir.display()
    );
    eprintln!("      Repos cache: {}", config.worktree.repos_dir.display());
    eprintln!("      Home dir: {}", config.repos.home_dir.display());

    // Auto-clone repositories if configured
    if !config.repos.auto_clone.is_empty() {
        eprintln!(
            "      Auto-cloning {} repositories...",
            config.repos.auto_clone.len()
        );
        for repo_entry in &config.repos.auto_clone {
            if let Err(e) = auto_clone_repo(repo_entry, &config.repos.home_dir).await {
                eprintln!("      WARNING: Failed to clone {}: {}", repo_entry.url(), e);
            }
        }
    }

    // Create channels for communication
    let (command_tx, mut command_rx) = mpsc::channel::<DaemonCommand>(64);

    // Get machine info
    let machine_id = get_machine_id();
    let hostname = get_hostname();
    eprintln!("      Machine ID: {}", machine_id);
    eprintln!("      Hostname: {}", hostname);

    // Create WebSocket client
    eprintln!("[5/5] Connecting to server: {}", config.server.url);
    let mut ws_client = WsClient::new(
        config.server.clone(),
        machine_id,
        hostname,
        config.process.max_concurrent_tasks as i32,
        command_tx,
    );

    // Get sender for task manager
    let ws_tx = ws_client.sender();

    // Create task configuration
    let bubblewrap_config = if config.process.bubblewrap.enabled {
        Some(config.process.bubblewrap.clone())
    } else {
        None
    };
    let task_config = TaskConfig {
        max_concurrent_tasks: config.process.max_concurrent_tasks,
        worktree_base_dir: config.worktree.base_dir.clone(),
        env_vars: config.process.env_vars.clone(),
        claude_command: config.process.claude_command.clone(),
        claude_args: config.process.claude_args.clone(),
        claude_pre_args: config.process.claude_pre_args.clone(),
        enable_permissions: config.process.enable_permissions,
        disable_verbose: config.process.disable_verbose,
        bubblewrap: bubblewrap_config,
    };

    // Create task manager
    let task_manager = Arc::new(TaskManager::new(task_config, ws_tx.clone()));

    // Spawn command handler
    let task_manager_clone = task_manager.clone();
    tokio::spawn(async move {
        tracing::info!("Command handler started, waiting for commands...");
        while let Some(command) = command_rx.recv().await {
            tracing::info!("Received command from channel: {:?}", command);
            if let Err(e) = task_manager_clone.handle_command(command).await {
                tracing::error!("Failed to handle command: {}", e);
            }
        }
        tracing::info!("Command handler stopped");
    });

    // Handle shutdown signals
    let shutdown_signal = async {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
        eprintln!("\nReceived shutdown signal");
    };

    eprintln!("=== Daemon running (Ctrl+C to stop) ===");

    // Run WebSocket client with shutdown handling
    tokio::select! {
        result = ws_client.run() => {
            match result {
                Ok(()) => eprintln!("WebSocket client exited cleanly"),
                Err(DaemonError::AuthFailed(msg)) => {
                    eprintln!("ERROR: Authentication failed: {}", msg);
                    std::process::exit(1);
                }
                Err(e) => {
                    eprintln!("ERROR: WebSocket client error: {}", e);
                    std::process::exit(1);
                }
            }
        }
        _ = shutdown_signal => {
            eprintln!("Shutting down...");
        }
    }

    // Gracefully shutdown all running Claude processes
    eprintln!("Terminating Claude processes...");
    task_manager
        .shutdown_all_processes(std::time::Duration::from_secs(5))
        .await;

    // Cleanup
    tracing::info!("Daemon stopped");

    Ok(())
}

/// Run supervisor commands.
async fn run_supervisor(
    cmd: SupervisorCommand,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use makima::daemon::api::supervisor::*;

    match cmd {
        SupervisorCommand::Tasks(args) => {
            let client = ApiClient::new(args.api_url, args.api_key)?;
            let result = client.supervisor_tasks(args.contract_id).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Tree(args) => {
            let client = ApiClient::new(args.api_url, args.api_key)?;
            let result = client.supervisor_tree(args.contract_id).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Spawn(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            eprintln!("Creating task: {}...", args.name);
            let req = SpawnTaskRequest {
                name: args.name,
                plan: args.plan,
                contract_id: args.common.contract_id,
                parent_task_id: args.parent,
                checkpoint_sha: args.checkpoint,
            };
            let result = client.supervisor_spawn(req).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Wait(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            eprintln!(
                "Waiting for task {} (timeout: {}s)...",
                args.task_id, args.timeout
            );
            let result = client.supervisor_wait(args.task_id, args.timeout).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::ReadFile(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            let result = client
                .supervisor_read_file(args.task_id, &args.file_path)
                .await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Branch(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            eprintln!("Creating branch: {}...", args.name);
            let result = client.supervisor_branch(&args.name, args.from).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Merge(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            eprintln!("Merging task {}...", args.task_id);
            let result = client
                .supervisor_merge(args.task_id, args.to, args.squash)
                .await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Pr(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            eprintln!("Creating PR for task {}...", args.task_id);
            let body = args.body.as_deref().unwrap_or("");
            let result = client
                .supervisor_pr(args.task_id, &args.title, body, &args.base)
                .await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Diff(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            let result = client.supervisor_diff(args.task_id).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Checkpoint(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            let task_id = args
                .common
                .self_task_id
                .ok_or("MAKIMA_TASK_ID is required for checkpoint")?;
            let result = client
                .supervisor_checkpoint(task_id, &args.message)
                .await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Checkpoints(args) => {
            let client = ApiClient::new(args.api_url, args.api_key)?;
            let task_id = args.self_task_id.ok_or("MAKIMA_TASK_ID is required")?;
            let result = client.supervisor_checkpoints(task_id).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Status(args) => {
            let client = ApiClient::new(args.api_url, args.api_key)?;
            let result = client.supervisor_status(args.contract_id).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Ask(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            eprintln!("Asking user: {}...", args.question);
            let choices = args
                .choices
                .map(|c| c.split(',').map(|s| s.trim().to_string()).collect())
                .unwrap_or_default();
            let result = client
                .supervisor_ask(&args.question, choices, args.context, args.timeout)
                .await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::AdvancePhase(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            eprintln!("Advancing contract to phase: {}...", args.phase);
            let result = client
                .supervisor_advance_phase(args.common.contract_id, &args.phase)
                .await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Task(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            let result = client.supervisor_get_task(args.target_task_id).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::Output(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            let result = client.supervisor_get_task_output(args.target_task_id).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        SupervisorCommand::TaskHistory(args) => {
            eprintln!(
                "Task history for {} (limit: {:?}, format: {})",
                args.task_id, args.limit, args.format
            );
            eprintln!("CLI integration not yet implemented. Use the API directly:");
            eprintln!("  GET /api/v1/mesh/tasks/{}/conversation", args.task_id);
        }
        SupervisorCommand::TaskCheckpoints(args) => {
            eprintln!(
                "Task checkpoints for {} (with_diff: {})",
                args.task_id, args.with_diff
            );
            eprintln!("CLI integration not yet implemented. Use the API directly:");
            eprintln!("  GET /api/v1/mesh/tasks/{}/checkpoints", args.task_id);
        }
        SupervisorCommand::Resume(args) => {
            eprintln!(
                "Resume supervisor for contract {} (mode: {}, checkpoint: {:?})",
                args.common.contract_id, args.mode, args.checkpoint
            );
            eprintln!("CLI integration not yet implemented. Use the API directly:");
            eprintln!(
                "  POST /api/v1/contracts/{}/supervisor/resume",
                args.common.contract_id
            );
        }
        SupervisorCommand::TaskResumeFrom(args) => {
            eprintln!(
                "Resume task {} from checkpoint {} with plan: {}",
                args.task_id, args.checkpoint, args.plan
            );
            eprintln!("CLI integration not yet implemented. Use the API directly:");
            eprintln!(
                "  POST /api/v1/mesh/tasks/{}/checkpoints/{}/resume",
                args.task_id, args.checkpoint
            );
        }
        SupervisorCommand::TaskRewind(args) => {
            eprintln!(
                "Rewind task {} to checkpoint {} (preserve: {}, branch: {:?})",
                args.task_id, args.checkpoint, args.preserve, args.branch_name
            );
            eprintln!("CLI integration not yet implemented. Use the API directly:");
            eprintln!("  POST /api/v1/mesh/tasks/{}/rewind", args.task_id);
        }
        SupervisorCommand::TaskFork(args) => {
            eprintln!(
                "Fork task {} from checkpoint {} as '{}' with plan: {}",
                args.task_id, args.checkpoint, args.name, args.plan
            );
            eprintln!("CLI integration not yet implemented. Use the API directly:");
            eprintln!("  POST /api/v1/mesh/tasks/{}/fork", args.task_id);
        }
        SupervisorCommand::RewindConversation(args) => {
            eprintln!(
                "Rewind conversation for contract {} (by: {:?}, to: {:?}, rewind_code: {})",
                args.common.contract_id, args.by_messages, args.to_message, args.rewind_code
            );
            eprintln!("CLI integration not yet implemented. Use the API directly:");
            eprintln!(
                "  POST /api/v1/contracts/{}/supervisor/conversation/rewind",
                args.common.contract_id
            );
        }
    }

    Ok(())
}

/// Run contract commands.
async fn run_contract(
    cmd: ContractCommand,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    match cmd {
        ContractCommand::Status(args) => {
            let client = ApiClient::new(args.api_url, args.api_key)?;
            let result = client.contract_status(args.contract_id).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        ContractCommand::Checklist(args) => {
            let client = ApiClient::new(args.api_url, args.api_key)?;
            let result = client.contract_checklist(args.contract_id).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        ContractCommand::Goals(args) => {
            let client = ApiClient::new(args.api_url, args.api_key)?;
            let result = client.contract_goals(args.contract_id).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        ContractCommand::Files(args) => {
            let client = ApiClient::new(args.api_url, args.api_key)?;
            let result = client.contract_files(args.contract_id).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        ContractCommand::File(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            let result = client
                .contract_file(args.common.contract_id, args.file_id)
                .await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        ContractCommand::Report(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            let result = client
                .contract_report(args.common.contract_id, &args.message, args.common.task_id)
                .await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        ContractCommand::SuggestAction(args) => {
            let client = ApiClient::new(args.api_url, args.api_key)?;
            let result = client.contract_suggest_action(args.contract_id).await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        ContractCommand::CompletionAction(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            let files = args.files.map(|f| {
                f.split(',')
                    .map(|s| s.trim().to_string())
                    .collect::<Vec<_>>()
            });
            let result = client
                .contract_completion_action(
                    args.common.contract_id,
                    args.common.task_id,
                    files,
                    args.lines_added,
                    args.lines_removed,
                    args.code,
                )
                .await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        ContractCommand::UpdateFile(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            // Read content from stdin
            let mut content = String::new();
            io::stdin().read_to_string(&mut content)?;
            let result = client
                .contract_update_file(args.common.contract_id, args.file_id, &content)
                .await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
        ContractCommand::CreateFile(args) => {
            let client = ApiClient::new(args.common.api_url, args.common.api_key)?;
            // Read content from stdin
            let mut content = String::new();
            io::stdin().read_to_string(&mut content)?;
            let result = client
                .contract_create_file(args.common.contract_id, &args.name, &content)
                .await?;
            println!("{}", serde_json::to_string(&result.0)?);
        }
    }

    Ok(())
}

fn init_logging(level: &str, format: &str) {
    let filter = EnvFilter::try_from_default_env()
        .or_else(|_| EnvFilter::try_new(level))
        .unwrap_or_else(|_| EnvFilter::new("info"));

    let subscriber = tracing_subscriber::registry().with(filter);

    if format == "json" {
        subscriber.with(fmt::layer().json()).init();
    } else {
        subscriber.with(fmt::layer()).init();
    }
}

fn get_machine_id() -> String {
    // Try to read machine-id from standard locations
    #[cfg(target_os = "linux")]
    {
        if let Ok(id) = std::fs::read_to_string("/etc/machine-id") {
            return id.trim().to_string();
        }
        if let Ok(id) = std::fs::read_to_string("/var/lib/dbus/machine-id") {
            return id.trim().to_string();
        }
    }

    #[cfg(target_os = "macos")]
    {
        // Use IOPlatformSerialNumber
        if let Ok(output) = std::process::Command::new("ioreg")
            .args(["-rd1", "-c", "IOPlatformExpertDevice"])
            .output()
        {
            let stdout = String::from_utf8_lossy(&output.stdout);
            for line in stdout.lines() {
                if line.contains("IOPlatformUUID") {
                    if let Some(uuid) = line.split('"').nth(3) {
                        return uuid.to_string();
                    }
                }
            }
        }
    }

    // Fallback: generate a random ID and persist it
    let state_dir = dirs_next::data_local_dir()
        .unwrap_or_else(|| std::path::PathBuf::from("."))
        .join("makima");
    let machine_id_file = state_dir.join("machine-id");

    if let Ok(id) = std::fs::read_to_string(&machine_id_file) {
        return id.trim().to_string();
    }

    // Generate new ID
    let new_id = uuid::Uuid::new_v4().to_string();
    std::fs::create_dir_all(&state_dir).ok();
    std::fs::write(&machine_id_file, &new_id).ok();
    new_id
}

fn get_hostname() -> String {
    hostname::get()
        .map(|h| h.to_string_lossy().to_string())
        .unwrap_or_else(|_| "unknown".to_string())
}

/// Auto-clone a repository to the home directory if it doesn't exist.
async fn auto_clone_repo(
    repo_entry: &RepoEntry,
    home_dir: &Path,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let dir_name = repo_entry
        .dir_name()
        .ok_or("Could not determine directory name from URL")?;
    let target_dir = home_dir.join(&dir_name);

    // Check if already cloned
    if target_dir.exists() {
        eprintln!("      [skip] {} (already exists)", dir_name);
        return Ok(());
    }

    let url = repo_entry.expanded_url();
    eprintln!("      [clone] {} -> {}", url, target_dir.display());

    // Build git clone command
    let mut args = vec!["clone".to_string()];

    // Add shallow clone if requested
    if repo_entry.shallow() {
        args.push("--depth".to_string());
        args.push("1".to_string());
    }

    // Add branch if specified
    if let Some(branch) = repo_entry.branch() {
        args.push("--branch".to_string());
        args.push(branch.to_string());
    }

    args.push(url.clone());
    args.push(target_dir.to_string_lossy().to_string());

    // Run git clone
    let output = Command::new("git").args(&args).output().await?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(format!("git clone failed: {}", stderr).into());
    }

    eprintln!("      [done] {}", dir_name);
    Ok(())
}

/// dirs_next minimal replacement
mod dirs_next {
    use std::path::PathBuf;

    pub fn data_local_dir() -> Option<PathBuf> {
        #[cfg(target_os = "macos")]
        {
            std::env::var("HOME")
                .ok()
                .map(|h| PathBuf::from(h).join("Library").join("Application Support"))
        }
        #[cfg(target_os = "linux")]
        {
            std::env::var("XDG_DATA_HOME")
                .ok()
                .map(PathBuf::from)
                .or_else(|| {
                    std::env::var("HOME")
                        .ok()
                        .map(|h| PathBuf::from(h).join(".local").join("share"))
                })
        }
        #[cfg(target_os = "windows")]
        {
            std::env::var("LOCALAPPDATA").ok().map(PathBuf::from)
        }
        #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
        {
            None
        }
    }
}