From 8b17a175c3e7e27b789812eba4e3cd760beadb10 Mon Sep 17 00:00:00 2001 From: soryu Date: Tue, 6 Jan 2026 04:08:11 +0000 Subject: Initial Control system --- makima/daemon/Cargo.toml | 48 + makima/daemon/README.md | 353 ++++ makima/daemon/src/cli.rs | 45 + makima/daemon/src/config.rs | 536 ++++++ makima/daemon/src/db/local.rs | 391 +++++ makima/daemon/src/db/mod.rs | 5 + makima/daemon/src/error.rs | 75 + makima/daemon/src/lib.rs | 21 + makima/daemon/src/main.rs | 313 ++++ makima/daemon/src/process/claude.rs | 481 ++++++ makima/daemon/src/process/claude_protocol.rs | 59 + makima/daemon/src/process/mod.rs | 10 + makima/daemon/src/task/manager.rs | 2248 ++++++++++++++++++++++++++ makima/daemon/src/task/mod.rs | 7 + makima/daemon/src/task/state.rs | 161 ++ makima/daemon/src/temp.rs | 224 +++ makima/daemon/src/worktree/manager.rs | 1623 +++++++++++++++++++ makima/daemon/src/worktree/mod.rs | 11 + makima/daemon/src/ws/client.rs | 290 ++++ makima/daemon/src/ws/mod.rs | 7 + makima/daemon/src/ws/protocol.rs | 511 ++++++ 21 files changed, 7419 insertions(+) create mode 100644 makima/daemon/Cargo.toml create mode 100644 makima/daemon/README.md create mode 100644 makima/daemon/src/cli.rs create mode 100644 makima/daemon/src/config.rs create mode 100644 makima/daemon/src/db/local.rs create mode 100644 makima/daemon/src/db/mod.rs create mode 100644 makima/daemon/src/error.rs create mode 100644 makima/daemon/src/lib.rs create mode 100644 makima/daemon/src/main.rs create mode 100644 makima/daemon/src/process/claude.rs create mode 100644 makima/daemon/src/process/claude_protocol.rs create mode 100644 makima/daemon/src/process/mod.rs create mode 100644 makima/daemon/src/task/manager.rs create mode 100644 makima/daemon/src/task/mod.rs create mode 100644 makima/daemon/src/task/state.rs create mode 100644 makima/daemon/src/temp.rs create mode 100644 makima/daemon/src/worktree/manager.rs create mode 100644 makima/daemon/src/worktree/mod.rs create mode 100644 makima/daemon/src/ws/client.rs create mode 100644 makima/daemon/src/ws/mod.rs create mode 100644 makima/daemon/src/ws/protocol.rs (limited to 'makima/daemon') diff --git a/makima/daemon/Cargo.toml b/makima/daemon/Cargo.toml new file mode 100644 index 0000000..02ecbb3 --- /dev/null +++ b/makima/daemon/Cargo.toml @@ -0,0 +1,48 @@ +[package] +name = "makima-daemon" +version = "0.1.0" +edition = "2024" + +[[bin]] +name = "makima-daemon" +path = "src/main.rs" + +[dependencies] +# Async runtime +tokio = { version = "1.0", features = ["full", "signal", "process"] } +futures = "0.3" +async-trait = "0.1" + +# WebSocket client +tokio-tungstenite = { version = "0.24", features = ["native-tls"] } + +# Serialization +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" + +# Configuration +config = "0.14" +clap = { version = "4.4", features = ["derive", "env"] } + +# Database (local state) +rusqlite = { version = "0.32", features = ["bundled"] } + +# Error handling +thiserror = "2.0" +anyhow = "1.0" + +# Logging +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } + +# Utilities +uuid = { version = "1.0", features = ["v4", "serde"] } +chrono = { version = "0.4", features = ["serde"] } +dashmap = "6.0" +backoff = { version = "0.4", features = ["tokio"] } +hostname = "0.4" +sha2 = "0.10" +hex = "0.4" +shell-escape = "0.1" +dirs = "5.0" +rand = "0.9" diff --git a/makima/daemon/README.md b/makima/daemon/README.md new file mode 100644 index 0000000..7c577c5 --- /dev/null +++ b/makima/daemon/README.md @@ -0,0 +1,353 @@ +# Makima Daemon + +The Makima daemon connects to the Makima server and executes tasks using Claude Code in isolated git worktrees. + +## Installation + +```bash +cd makima/daemon +cargo build --release +``` + +The binary will be at `target/release/makima-daemon`. + +## Quick Start + +```bash +# Set required environment variables +export MAKIMA_API_KEY="your-api-key" +export MAKIMA_DAEMON_SERVER_URL="ws://localhost:8080" + +# Run the daemon +makima-daemon +``` + +## Configuration + +Configuration is loaded from multiple sources in order of precedence (highest first): + +1. CLI arguments +2. Environment variables +3. `./makima-daemon.toml` (current directory) +4. `~/.config/makima-daemon/config.toml` (user config) +5. `/etc/makima-daemon/config.toml` (system config, Linux only) + +### Environment Variables + +| Variable | Description | +|----------|-------------| +| `MAKIMA_API_KEY` | API key for authentication (preferred) | +| `MAKIMA_DAEMON_SERVER_URL` | WebSocket URL of the Makima server | +| `MAKIMA_DAEMON_SERVER_APIKEY` | Alternative to MAKIMA_API_KEY | +| `MAKIMA_DAEMON_PROCESS_MAXCONCURRENTTASKS` | Max concurrent tasks | + +### CLI Arguments + +``` +makima-daemon [OPTIONS] + +Options: + -c, --config Path to config file + -s, --server-url WebSocket URL of makima server + -k, --api-key API key for authentication + -m, --max-tasks Maximum concurrent tasks + -l, --log-level Log level (trace, debug, info, warn, error) + --repos-dir Directory for cloned repositories + --worktrees-dir Directory for worktrees + -h, --help Print help + -V, --version Print version +``` + +--- + +## Configuration File Reference + +Below is a complete configuration file with all options and their defaults. + +```toml +# ============================================================================= +# Server Connection +# ============================================================================= +[server] +# WebSocket URL of the Makima server (required) +url = "ws://localhost:8080" + +# API key for authentication (required) +# Can also be set via MAKIMA_API_KEY environment variable +api_key = "your-api-key" + +# Heartbeat interval in seconds (default: 30) +heartbeat_interval_secs = 30 + +# Reconnect interval after connection loss in seconds (default: 5) +reconnect_interval_secs = 5 + +# Maximum reconnect attempts before giving up (default: 0 = infinite) +max_reconnect_attempts = 0 + +# ============================================================================= +# Worktree Settings +# ============================================================================= +[worktree] +# Base directory for task worktrees (default: ~/.makima/worktrees) +base_dir = "/home/user/.makima/worktrees" + +# Directory for cached repository clones (default: ~/.makima/repos) +repos_dir = "/home/user/.makima/repos" + +# Branch prefix for task branches (default: "makima/task-") +branch_prefix = "makima/task-" + +# Clean up old worktrees on daemon start (default: false) +cleanup_on_start = false + +# Default target repository for pushing completed branches +# Used when task.target_repo_path is not set +default_target_repo = "/home/user/projects/my-repo" + +# ============================================================================= +# Process Settings (Claude Code) +# ============================================================================= +[process] +# Path or command for Claude Code CLI (default: "claude") +claude_command = "claude" + +# Additional arguments to pass to Claude Code (after default arguments) +# Default arguments are: --output-format=stream-json --input-format=stream-json +# --verbose --dangerously-skip-permissions +claude_args = ["--model", "opus"] + +# Arguments to pass before default arguments (for overriding defaults) +claude_pre_args = [] + +# Enable Claude's permission system (default: false) +# When true, removes --dangerously-skip-permissions flag +enable_permissions = false + +# Disable verbose output (default: false) +# When true, removes --verbose flag +disable_verbose = false + +# Maximum concurrent tasks (default: 4) +max_concurrent_tasks = 4 + +# Default timeout for tasks in seconds (default: 0 = no timeout) +default_timeout_secs = 0 + +# Additional environment variables to pass to Claude Code +[process.env_vars] +ANTHROPIC_API_KEY = "sk-ant-..." +CUSTOM_VAR = "value" + +# ============================================================================= +# Repository Auto-Clone +# ============================================================================= +[repos] +# Directory to clone repositories into (default: ~/.makima/home) +home_dir = "/home/user/.makima/home" + +# List of repositories to auto-clone on startup +# Repositories that already exist are skipped + +# Simple format - just URLs +auto_clone = [ + "https://github.com/user/repo1.git", + "https://github.com/user/repo2.git", +] + +# Shorthand format supported: +# github:user/repo -> https://github.com/user/repo.git +# gitlab:user/repo -> https://gitlab.com/user/repo.git +auto_clone = [ + "github:anthropics/claude-code", + "gitlab:company/project", +] + +# Detailed format with options (use [[repos.auto_clone]] for each entry) +[[repos.auto_clone]] +url = "github:user/repo" +name = "custom-directory-name" # Optional: override directory name +branch = "develop" # Optional: checkout specific branch +shallow = true # Optional: shallow clone (--depth 1) + +[[repos.auto_clone]] +url = "https://github.com/org/large-repo.git" +shallow = true # Faster clone for large repos + +# ============================================================================= +# Local Database +# ============================================================================= +[local_db] +# Path to local SQLite database (default: ~/.makima/daemon.db) +path = "/home/user/.makima/daemon.db" + +# ============================================================================= +# Logging +# ============================================================================= +[logging] +# Log level: trace, debug, info, warn, error (default: "info") +level = "info" + +# Log format: "pretty" or "json" (default: "pretty") +format = "pretty" +``` + +--- + +## Examples + +### Minimal Configuration + +```toml +[server] +url = "ws://localhost:8080" +api_key = "your-api-key" +``` + +### Production Configuration + +```toml +[server] +url = "wss://api.makima.example.com/daemon" +api_key = "prod-api-key" +heartbeat_interval_secs = 30 +reconnect_interval_secs = 10 +max_reconnect_attempts = 0 + +[worktree] +base_dir = "/var/lib/makima/worktrees" +repos_dir = "/var/lib/makima/repos" +cleanup_on_start = true + +[process] +max_concurrent_tasks = 8 +default_timeout_secs = 3600 # 1 hour timeout + +[logging] +level = "info" +format = "json" +``` + +### Development with Custom Claude + +```toml +[server] +url = "ws://localhost:8080" +api_key = "dev-key" + +[process] +# Use a specific claude binary +claude_command = "/usr/local/bin/claude-dev" + +# Add custom arguments +claude_args = ["--model", "sonnet", "--max-turns", "50"] + +# Enable permission prompts for testing +enable_permissions = true + +[process.env_vars] +ANTHROPIC_API_KEY = "sk-ant-dev-..." +DEBUG = "1" + +[logging] +level = "debug" +``` + +### Auto-Clone Team Repositories + +```toml +[server] +url = "ws://localhost:8080" +api_key = "team-key" + +[repos] +home_dir = "/home/dev/.makima/projects" + +# Clone all team repos on startup +[[repos.auto_clone]] +url = "github:myteam/frontend" +branch = "main" + +[[repos.auto_clone]] +url = "github:myteam/backend" +branch = "main" + +[[repos.auto_clone]] +url = "github:myteam/shared-libs" +shallow = true # Only need latest commit +``` + +--- + +## Directory Structure + +After running, the daemon creates the following directories: + +``` +~/.makima/ +├── daemon.db # Local state database +├── worktrees/ # Task worktrees (temporary) +│ └── task-abc123/ # Individual task worktree +├── repos/ # Cached repository clones +│ └── repo-name/ # Bare clone for worktree creation +└── home/ # Auto-cloned repositories + └── my-repo/ # Full repository clone +``` + +--- + +## Troubleshooting + +### Connection Issues + +```bash +# Check server connectivity +curl -I http://localhost:8080/health + +# Run with debug logging +makima-daemon --log-level debug +``` + +### Claude Code Not Found + +```bash +# Verify claude is installed and in PATH +which claude +claude --version + +# Or specify full path in config +[process] +claude_command = "/full/path/to/claude" +``` + +### Permission Errors + +If Claude Code requires permissions, either: + +1. Use `--dangerously-skip-permissions` (default behavior) +2. Set `enable_permissions = true` and handle permission prompts +3. Pre-configure Claude Code permissions in `~/.claude/` + +### Task Timeouts + +Set an appropriate timeout for long-running tasks: + +```toml +[process] +default_timeout_secs = 7200 # 2 hours +``` + +--- + +## Environment Variable Reference + +All configuration options can be set via environment variables using the pattern: +`MAKIMA_DAEMON_
_` + +Examples: +- `MAKIMA_DAEMON_SERVER_URL` -> `server.url` +- `MAKIMA_DAEMON_PROCESS_MAXCONCURRENTTASKS` -> `process.max_concurrent_tasks` +- `MAKIMA_DAEMON_LOGGING_LEVEL` -> `logging.level` + +Special case: +- `MAKIMA_API_KEY` -> `server.api_key` (preferred method) diff --git a/makima/daemon/src/cli.rs b/makima/daemon/src/cli.rs new file mode 100644 index 0000000..ca84017 --- /dev/null +++ b/makima/daemon/src/cli.rs @@ -0,0 +1,45 @@ +//! Command-line argument parsing for makima-daemon. + +use clap::Parser; +use std::path::PathBuf; + +/// Makima daemon for managing Claude Code instances in isolated worktrees. +#[derive(Parser, Debug)] +#[command(name = "makima-daemon")] +#[command(version, about, long_about = None)] +pub struct Cli { + /// Path to custom config file + #[arg(short, long)] + pub config: Option, + + /// Directory where repositories are cloned + #[arg(long, env = "MAKIMA_DAEMON_REPOS_DIR")] + pub repos_dir: Option, + + /// Directory where worktrees are created + #[arg(long, env = "MAKIMA_DAEMON_WORKTREES_DIR")] + pub worktrees_dir: Option, + + /// WebSocket server URL to connect to + #[arg(long, env = "MAKIMA_DAEMON_SERVER_URL")] + pub server_url: Option, + + /// API key for server authentication + #[arg(long, env = "MAKIMA_DAEMON_SERVER_APIKEY")] + pub api_key: Option, + + /// Maximum number of concurrent tasks + #[arg(long)] + pub max_tasks: Option, + + /// Log level (trace, debug, info, warn, error) + #[arg(short, long, default_value = "info")] + pub log_level: String, +} + +impl Cli { + /// Parse command-line arguments + pub fn parse_args() -> Self { + Self::parse() + } +} diff --git a/makima/daemon/src/config.rs b/makima/daemon/src/config.rs new file mode 100644 index 0000000..28c7fea --- /dev/null +++ b/makima/daemon/src/config.rs @@ -0,0 +1,536 @@ +//! Configuration management for the makima daemon. + +use config::{Config, Environment, File}; +use serde::Deserialize; +use std::collections::HashMap; +use std::path::PathBuf; + +/// Root daemon configuration. +#[derive(Debug, Clone, Deserialize)] +pub struct DaemonConfig { + /// Server connection settings. + pub server: ServerConfig, + + /// Worktree settings. + #[serde(default)] + pub worktree: WorktreeConfig, + + /// Process settings. + #[serde(default)] + pub process: ProcessConfig, + + /// Local database settings. + #[serde(default)] + pub local_db: LocalDbConfig, + + /// Logging settings. + #[serde(default)] + pub logging: LoggingConfig, + + /// Repositories to auto-clone on startup. + #[serde(default)] + pub repos: ReposConfig, +} + +/// Server connection configuration. +#[derive(Debug, Clone, Deserialize)] +pub struct ServerConfig { + /// WebSocket URL of makima server (e.g., ws://localhost:8080 or wss://makima.example.com). + #[serde(default)] + pub url: String, + + /// API key for authentication. + #[serde(default, alias = "apikey")] + pub api_key: String, + + /// Heartbeat interval in seconds. + #[serde(default = "default_heartbeat_interval", alias = "heartbeatintervalsecs")] + pub heartbeat_interval_secs: u64, + + /// Reconnect interval in seconds after connection loss. + #[serde(default = "default_reconnect_interval", alias = "reconnectintervalsecs")] + pub reconnect_interval_secs: u64, + + /// Maximum reconnect attempts before giving up (0 = infinite). + #[serde(default, alias = "maxreconnectattempts")] + pub max_reconnect_attempts: u32, +} + +fn default_heartbeat_interval() -> u64 { + 30 +} + +fn default_reconnect_interval() -> u64 { + 5 +} + +/// Worktree configuration for task isolation. +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct WorktreeConfig { + /// Base directory for worktrees (~/.makima/worktrees). + #[serde(default = "default_worktree_base_dir", alias = "basedir")] + pub base_dir: PathBuf, + + /// Base directory for cloned repositories (~/.makima/repos). + #[serde(default = "default_repos_base_dir", alias = "reposdir")] + pub repos_dir: PathBuf, + + /// Branch prefix for task branches. + #[serde(default = "default_branch_prefix", alias = "branchprefix")] + pub branch_prefix: String, + + /// Clean up worktrees on daemon start. + #[serde(default, alias = "cleanuponstart")] + pub cleanup_on_start: bool, + + /// Default target repository path for pushing completed branches. + /// Used when task.target_repo_path is not set. + #[serde(default, alias = "defaulttargetrepo")] + pub default_target_repo: Option, +} + +fn default_worktree_base_dir() -> PathBuf { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".makima") + .join("worktrees") +} + +fn default_repos_base_dir() -> PathBuf { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".makima") + .join("repos") +} + +fn default_branch_prefix() -> String { + "makima/task-".to_string() +} + +impl Default for WorktreeConfig { + fn default() -> Self { + Self { + base_dir: default_worktree_base_dir(), + repos_dir: default_repos_base_dir(), + branch_prefix: default_branch_prefix(), + cleanup_on_start: false, + default_target_repo: None, + } + } +} + +/// Process configuration for Claude Code subprocess execution. +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct ProcessConfig { + /// Path or command for Claude Code CLI. + #[serde(default = "default_claude_command", alias = "claudecommand")] + pub claude_command: String, + + /// Additional arguments to pass to Claude Code. + /// These are added after the default arguments. + #[serde(default, alias = "claudeargs")] + pub claude_args: Vec, + + /// Arguments to pass before the default arguments. + /// Useful for overriding defaults. + #[serde(default, alias = "claudepreargs")] + pub claude_pre_args: Vec, + + /// Skip the --dangerously-skip-permissions flag (default: false). + /// Set to true if you want to use Claude's permission system. + #[serde(default, alias = "enablepermissions")] + pub enable_permissions: bool, + + /// Skip the --verbose flag (default: false). + #[serde(default, alias = "disableverbose")] + pub disable_verbose: bool, + + /// Maximum concurrent tasks. + #[serde(default = "default_max_tasks", alias = "maxconcurrenttasks")] + pub max_concurrent_tasks: u32, + + /// Default timeout for tasks in seconds (0 = no timeout). + #[serde(default, alias = "defaulttimeoutsecs")] + pub default_timeout_secs: u64, + + /// Additional environment variables to pass to Claude Code. + #[serde(default, alias = "envvars")] + pub env_vars: HashMap, +} + +fn default_claude_command() -> String { + "claude".to_string() +} + +fn default_max_tasks() -> u32 { + 4 +} + +impl Default for ProcessConfig { + fn default() -> Self { + Self { + claude_command: default_claude_command(), + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + max_concurrent_tasks: default_max_tasks(), + default_timeout_secs: 0, + env_vars: HashMap::new(), + } + } +} + +/// Local database configuration. +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct LocalDbConfig { + /// Path to local SQLite database. + #[serde(default = "default_db_path")] + pub path: PathBuf, +} + +impl Default for LocalDbConfig { + fn default() -> Self { + Self { + path: default_db_path(), + } + } +} + +fn default_db_path() -> PathBuf { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".makima") + .join("daemon.db") +} + +/// Logging configuration. +#[derive(Debug, Clone, Deserialize, Default)] +pub struct LoggingConfig { + /// Log level: "trace", "debug", "info", "warn", "error". + #[serde(default = "default_log_level")] + pub level: String, + + /// Log format: "pretty" or "json". + #[serde(default = "default_log_format")] + pub format: String, +} + +fn default_log_level() -> String { + "info".to_string() +} + +fn default_log_format() -> String { + "pretty".to_string() +} + +/// Repository auto-clone configuration. +#[derive(Debug, Clone, Deserialize, Default)] +pub struct ReposConfig { + /// Directory to clone repositories into (default: ~/.makima/home). + #[serde(default = "default_home_dir")] + pub home_dir: PathBuf, + + /// List of repositories to auto-clone on startup. + /// Each entry can be a URL (e.g., "https://github.com/user/repo.git") + /// or a shorthand (e.g., "github:user/repo"). + #[serde(default, alias = "autoclone")] + pub auto_clone: Vec, +} + +/// A repository entry for auto-cloning. +#[derive(Debug, Clone, Deserialize)] +#[serde(untagged)] +pub enum RepoEntry { + /// Simple URL string. + Url(String), + /// Detailed configuration. + Config { + /// Repository URL. + url: String, + /// Custom directory name (defaults to repo name from URL). + #[serde(default)] + name: Option, + /// Branch to checkout after cloning (defaults to default branch). + #[serde(default)] + branch: Option, + /// Whether to do a shallow clone (default: false). + #[serde(default)] + shallow: bool, + }, +} + +impl RepoEntry { + /// Get the URL for this repo entry. + pub fn url(&self) -> &str { + match self { + RepoEntry::Url(url) => url, + RepoEntry::Config { url, .. } => url, + } + } + + /// Get the custom name, if any. + pub fn name(&self) -> Option<&str> { + match self { + RepoEntry::Url(_) => None, + RepoEntry::Config { name, .. } => name.as_deref(), + } + } + + /// Get the branch to checkout, if any. + pub fn branch(&self) -> Option<&str> { + match self { + RepoEntry::Url(_) => None, + RepoEntry::Config { branch, .. } => branch.as_deref(), + } + } + + /// Whether to do a shallow clone. + pub fn shallow(&self) -> bool { + match self { + RepoEntry::Url(_) => false, + RepoEntry::Config { shallow, .. } => *shallow, + } + } + + /// Get the directory name to use (either custom name or derived from URL). + pub fn dir_name(&self) -> Option { + if let Some(name) = self.name() { + return Some(name.to_string()); + } + + // Derive from URL + let url = self.url(); + + // Handle shorthand formats + let url = if url.starts_with("github:") { + url.strip_prefix("github:").unwrap_or(url) + } else if url.starts_with("gitlab:") { + url.strip_prefix("gitlab:").unwrap_or(url) + } else { + url + }; + + // Extract repo name from URL + url.trim_end_matches('/') + .trim_end_matches(".git") + .rsplit('/') + .next() + .map(|s| s.to_string()) + } + + /// Expand the URL (e.g., convert shorthand to full URL). + pub fn expanded_url(&self) -> String { + let url = self.url(); + + if url.starts_with("github:") { + format!("https://github.com/{}.git", url.strip_prefix("github:").unwrap_or("")) + } else if url.starts_with("gitlab:") { + format!("https://gitlab.com/{}.git", url.strip_prefix("gitlab:").unwrap_or("")) + } else { + url.to_string() + } + } +} + +fn default_home_dir() -> PathBuf { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".makima") + .join("home") +} + +impl DaemonConfig { + /// Load configuration from files and environment variables. + /// + /// Configuration sources (in order of precedence): + /// 1. Environment variables (MAKIMA_API_KEY, MAKIMA_DAEMON_SERVER_URL, etc.) + /// 2. ./makima-daemon.toml (current directory) + /// 3. ~/.config/makima-daemon/config.toml + /// 4. /etc/makima-daemon/config.toml (Linux only) + /// + /// Environment variable examples: + /// - MAKIMA_API_KEY=your-api-key (preferred) + /// - MAKIMA_DAEMON_SERVER_URL=ws://localhost:8080 + /// - MAKIMA_DAEMON_PROCESS_MAXCONCURRENTTASKS=4 + pub fn load() -> Result { + Self::load_from_path(None) + } + + /// Load configuration from a specific path plus standard sources. + fn load_from_path(config_path: Option<&std::path::Path>) -> Result { + let mut builder = Config::builder(); + + // System-wide config (Linux only) + #[cfg(target_os = "linux")] + { + builder = builder.add_source( + File::with_name("/etc/makima-daemon/config").required(false), + ); + } + + // User config + if let Some(config_dir) = dirs::config_dir() { + let user_config = config_dir.join("makima-daemon").join("config"); + builder = builder.add_source( + File::with_name(user_config.to_str().unwrap_or("")).required(false), + ); + } + + // Local config + builder = builder.add_source(File::with_name("makima-daemon").required(false)); + + // Custom config file (if provided) + if let Some(path) = config_path { + builder = builder.add_source( + File::with_name(path.to_str().unwrap_or("")).required(true), + ); + } + + // Environment variables with underscore separator for nesting + // e.g., MAKIMA_DAEMON_SERVER_URL -> server.url + // MAKIMA_DAEMON_SERVER_APIKEY -> server.api_key + builder = builder.add_source( + Environment::with_prefix("MAKIMA_DAEMON") + .separator("_") + .try_parsing(true), + ); + + let config = builder.build()?; + let mut config: DaemonConfig = config.try_deserialize()?; + + // Check for MAKIMA_API_KEY environment variable (preferred over MAKIMA_DAEMON_SERVER_APIKEY) + if let Ok(api_key) = std::env::var("MAKIMA_API_KEY") { + config.server.api_key = api_key; + } + + // Validate required fields (don't validate here - let load_with_cli do final validation) + Ok(config) + } + + /// Validate that required configuration fields are set. + pub fn validate(&self) -> Result<(), config::ConfigError> { + if self.server.url.is_empty() { + return Err(config::ConfigError::Message( + "server.url is required. Set via config file, MAKIMA_DAEMON_SERVER_URL, or --server-url".to_string() + )); + } + if self.server.api_key.is_empty() { + return Err(config::ConfigError::Message( + "API key is required. Set via MAKIMA_API_KEY, config file, or --api-key".to_string() + )); + } + Ok(()) + } + + /// Load configuration with CLI argument overrides. + /// + /// Configuration sources (in order of precedence, highest first): + /// 1. CLI arguments + /// 2. Environment variables + /// 3. Custom config file (if --config specified) + /// 4. ./makima-daemon.toml (current directory) + /// 5. ~/.config/makima-daemon/config.toml + /// 6. /etc/makima-daemon/config.toml (Linux only) + /// 7. Default values + pub fn load_with_cli(cli: &crate::cli::Cli) -> Result { + // Load base config (with optional custom config file) + let mut config = Self::load_from_path(cli.config.as_deref())?; + + // Apply CLI overrides (highest priority) + if let Some(ref repos_dir) = cli.repos_dir { + config.worktree.repos_dir = repos_dir.clone(); + } + if let Some(ref worktrees_dir) = cli.worktrees_dir { + config.worktree.base_dir = worktrees_dir.clone(); + } + if let Some(ref server_url) = cli.server_url { + config.server.url = server_url.clone(); + } + if let Some(ref api_key) = cli.api_key { + config.server.api_key = api_key.clone(); + } + if let Some(max_tasks) = cli.max_tasks { + config.process.max_concurrent_tasks = max_tasks; + } + // Log level is always set (has default) + config.logging.level = cli.log_level.clone(); + + // Validate required fields after all sources are merged + config.validate()?; + + Ok(config) + } + + /// Create a minimal config for testing. + #[cfg(test)] + pub fn test_config() -> Self { + Self { + server: ServerConfig { + url: "ws://localhost:8080".to_string(), + api_key: "test-key".to_string(), + heartbeat_interval_secs: 30, + reconnect_interval_secs: 5, + max_reconnect_attempts: 0, + }, + worktree: WorktreeConfig { + base_dir: PathBuf::from("/tmp/makima-daemon-test/worktrees"), + repos_dir: PathBuf::from("/tmp/makima-daemon-test/repos"), + branch_prefix: "makima/task-".to_string(), + cleanup_on_start: true, + default_target_repo: None, + }, + process: ProcessConfig { + claude_command: "claude".to_string(), + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + max_concurrent_tasks: 2, + default_timeout_secs: 0, + env_vars: HashMap::new(), + }, + local_db: LocalDbConfig { + path: PathBuf::from("/tmp/makima-daemon-test/state.db"), + }, + logging: LoggingConfig::default(), + repos: ReposConfig::default(), + } + } +} + +/// Helper module for dirs crate (minimal subset). +mod dirs { + use std::path::PathBuf; + + pub fn home_dir() -> Option { + std::env::var("HOME").ok().map(PathBuf::from) + } + + pub fn config_dir() -> Option { + #[cfg(target_os = "macos")] + { + std::env::var("HOME") + .ok() + .map(|h| PathBuf::from(h).join("Library").join("Application Support")) + } + #[cfg(target_os = "linux")] + { + std::env::var("XDG_CONFIG_HOME") + .ok() + .map(PathBuf::from) + .or_else(|| std::env::var("HOME").ok().map(|h| PathBuf::from(h).join(".config"))) + } + #[cfg(target_os = "windows")] + { + std::env::var("APPDATA").ok().map(PathBuf::from) + } + #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))] + { + None + } + } +} diff --git a/makima/daemon/src/db/local.rs b/makima/daemon/src/db/local.rs new file mode 100644 index 0000000..5adbf98 --- /dev/null +++ b/makima/daemon/src/db/local.rs @@ -0,0 +1,391 @@ +//! Local SQLite database for crash recovery and state persistence. + +use std::path::Path; + +use chrono::{DateTime, Utc}; +use rusqlite::{params, Connection, Result as SqliteResult}; +use uuid::Uuid; + +use crate::task::TaskState; + +/// Local task record for persistence. +#[derive(Debug, Clone)] +pub struct LocalTask { + pub id: Uuid, + pub server_task_id: Uuid, + pub state: TaskState, + pub container_id: Option, + pub overlay_path: Option, + pub repo_url: Option, + pub base_branch: Option, + pub plan: String, + pub created_at: DateTime, + pub started_at: Option>, + pub completed_at: Option>, + pub error_message: Option, +} + +/// Buffered output for reliable delivery. +#[derive(Debug, Clone)] +pub struct BufferedOutput { + pub id: i64, + pub task_id: Uuid, + pub output: String, + pub is_partial: bool, + pub timestamp: DateTime, +} + +/// Local database for daemon state persistence. +pub struct LocalDb { + conn: Connection, +} + +impl LocalDb { + /// Open or create the local database. + pub fn open(path: &Path) -> SqliteResult { + // Create parent directory if needed + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).ok(); + } + + let conn = Connection::open(path)?; + + // Initialize schema + conn.execute_batch(Self::schema())?; + + Ok(Self { conn }) + } + + /// Open an in-memory database (for testing). + #[cfg(test)] + pub fn open_memory() -> SqliteResult { + let conn = Connection::open_in_memory()?; + conn.execute_batch(Self::schema())?; + Ok(Self { conn }) + } + + /// Database schema. + fn schema() -> &'static str { + r#" + -- Local task state for crash recovery + CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, + server_task_id TEXT NOT NULL, + state TEXT NOT NULL, + container_id TEXT, + overlay_path TEXT, + repo_url TEXT, + base_branch TEXT, + plan TEXT NOT NULL, + created_at TEXT NOT NULL, + started_at TEXT, + completed_at TEXT, + error_message TEXT + ); + + -- Buffered output for reliable delivery + CREATE TABLE IF NOT EXISTS output_buffer ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL, + output TEXT NOT NULL, + is_partial INTEGER NOT NULL, + timestamp TEXT NOT NULL, + sent INTEGER NOT NULL DEFAULT 0 + ); + + -- Daemon state key-value store + CREATE TABLE IF NOT EXISTS daemon_state ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + -- Indexes + CREATE INDEX IF NOT EXISTS idx_tasks_state ON tasks(state); + CREATE INDEX IF NOT EXISTS idx_output_buffer_sent ON output_buffer(sent, id); + CREATE INDEX IF NOT EXISTS idx_output_buffer_task ON output_buffer(task_id); + "# + } + + /// Save a task. + pub fn save_task(&self, task: &LocalTask) -> SqliteResult<()> { + self.conn.execute( + r#" + INSERT OR REPLACE INTO tasks + (id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12) + "#, + params![ + task.id.to_string(), + task.server_task_id.to_string(), + task.state.as_str(), + task.container_id, + task.overlay_path, + task.repo_url, + task.base_branch, + task.plan, + task.created_at.to_rfc3339(), + task.started_at.map(|t| t.to_rfc3339()), + task.completed_at.map(|t| t.to_rfc3339()), + task.error_message, + ], + )?; + Ok(()) + } + + /// Get a task by ID. + pub fn get_task(&self, id: Uuid) -> SqliteResult> { + let mut stmt = self.conn.prepare( + "SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message FROM tasks WHERE id = ?1", + )?; + + let mut rows = stmt.query(params![id.to_string()])?; + + if let Some(row) = rows.next()? { + Ok(Some(Self::task_from_row(row)?)) + } else { + Ok(None) + } + } + + /// Get all running/active tasks (for recovery). + pub fn get_active_tasks(&self) -> SqliteResult> { + let mut stmt = self.conn.prepare( + r#" + SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message + FROM tasks + WHERE state IN ('initializing', 'starting', 'running', 'paused', 'blocked') + "#, + )?; + + let rows = stmt.query_map([], |row| Self::task_from_row(row))?; + + rows.collect() + } + + /// Delete a task. + pub fn delete_task(&self, id: Uuid) -> SqliteResult<()> { + self.conn.execute( + "DELETE FROM tasks WHERE id = ?1", + params![id.to_string()], + )?; + Ok(()) + } + + /// Update task state. + pub fn update_task_state(&self, id: Uuid, state: TaskState) -> SqliteResult<()> { + self.conn.execute( + "UPDATE tasks SET state = ?2 WHERE id = ?1", + params![id.to_string(), state.as_str()], + )?; + Ok(()) + } + + /// Buffer output for reliable delivery. + pub fn buffer_output(&self, task_id: Uuid, output: &str, is_partial: bool) -> SqliteResult { + self.conn.execute( + r#" + INSERT INTO output_buffer (task_id, output, is_partial, timestamp, sent) + VALUES (?1, ?2, ?3, datetime('now'), 0) + "#, + params![task_id.to_string(), output, is_partial as i32], + )?; + Ok(self.conn.last_insert_rowid()) + } + + /// Get unsent outputs. + pub fn get_unsent_outputs(&self, limit: i64) -> SqliteResult> { + let mut stmt = self.conn.prepare( + r#" + SELECT id, task_id, output, is_partial, timestamp + FROM output_buffer + WHERE sent = 0 + ORDER BY id + LIMIT ?1 + "#, + )?; + + let rows = stmt.query_map(params![limit], |row| { + let id: i64 = row.get(0)?; + let task_id_str: String = row.get(1)?; + let task_id = Uuid::parse_str(&task_id_str).unwrap_or_default(); + let output: String = row.get(2)?; + let is_partial: i32 = row.get(3)?; + let timestamp_str: String = row.get(4)?; + let timestamp = DateTime::parse_from_rfc3339(×tamp_str) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(|_| Utc::now()); + + Ok(BufferedOutput { + id, + task_id, + output, + is_partial: is_partial != 0, + timestamp, + }) + })?; + + rows.collect() + } + + /// Mark outputs as sent. + pub fn mark_outputs_sent(&self, ids: &[i64]) -> SqliteResult<()> { + if ids.is_empty() { + return Ok(()); + } + + let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect(); + let sql = format!( + "UPDATE output_buffer SET sent = 1 WHERE id IN ({})", + placeholders.join(",") + ); + + let params: Vec = ids + .iter() + .map(|id| rusqlite::types::Value::Integer(*id)) + .collect(); + + self.conn.execute(&sql, rusqlite::params_from_iter(params))?; + Ok(()) + } + + /// Clean up old sent outputs. + pub fn cleanup_sent_outputs(&self, older_than_hours: i64) -> SqliteResult { + let result = self.conn.execute( + r#" + DELETE FROM output_buffer + WHERE sent = 1 AND timestamp < datetime('now', ?1 || ' hours') + "#, + params![format!("-{}", older_than_hours)], + )?; + Ok(result) + } + + /// Get daemon state value. + pub fn get_state(&self, key: &str) -> SqliteResult> { + let mut stmt = self.conn.prepare( + "SELECT value FROM daemon_state WHERE key = ?1", + )?; + + let mut rows = stmt.query(params![key])?; + + if let Some(row) = rows.next()? { + let value: String = row.get(0)?; + Ok(Some(value)) + } else { + Ok(None) + } + } + + /// Set daemon state value. + pub fn set_state(&self, key: &str, value: &str) -> SqliteResult<()> { + self.conn.execute( + r#" + INSERT OR REPLACE INTO daemon_state (key, value, updated_at) + VALUES (?1, ?2, datetime('now')) + "#, + params![key, value], + )?; + Ok(()) + } + + /// Parse a task from a database row. + fn task_from_row(row: &rusqlite::Row) -> SqliteResult { + let id_str: String = row.get(0)?; + let server_task_id_str: String = row.get(1)?; + let state_str: String = row.get(2)?; + let container_id: Option = row.get(3)?; + let overlay_path: Option = row.get(4)?; + let repo_url: Option = row.get(5)?; + let base_branch: Option = row.get(6)?; + let plan: String = row.get(7)?; + let created_at_str: String = row.get(8)?; + let started_at_str: Option = row.get(9)?; + let completed_at_str: Option = row.get(10)?; + let error_message: Option = row.get(11)?; + + let id = Uuid::parse_str(&id_str).unwrap_or_default(); + let server_task_id = Uuid::parse_str(&server_task_id_str).unwrap_or_default(); + let state = TaskState::from_str(&state_str).unwrap_or_default(); + let created_at = DateTime::parse_from_rfc3339(&created_at_str) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(|_| Utc::now()); + let started_at = started_at_str + .and_then(|s| DateTime::parse_from_rfc3339(&s).ok()) + .map(|dt| dt.with_timezone(&Utc)); + let completed_at = completed_at_str + .and_then(|s| DateTime::parse_from_rfc3339(&s).ok()) + .map(|dt| dt.with_timezone(&Utc)); + + Ok(LocalTask { + id, + server_task_id, + state, + container_id, + overlay_path, + repo_url, + base_branch, + plan, + created_at, + started_at, + completed_at, + error_message, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_open_memory() { + let db = LocalDb::open_memory().unwrap(); + assert!(db.get_active_tasks().unwrap().is_empty()); + } + + #[test] + fn test_save_and_get_task() { + let db = LocalDb::open_memory().unwrap(); + + let task = LocalTask { + id: Uuid::new_v4(), + server_task_id: Uuid::new_v4(), + state: TaskState::Running, + container_id: Some("abc123".to_string()), + overlay_path: Some("/tmp/overlay".to_string()), + repo_url: Some("https://github.com/test/repo".to_string()), + base_branch: Some("main".to_string()), + plan: "Build the feature".to_string(), + created_at: Utc::now(), + started_at: Some(Utc::now()), + completed_at: None, + error_message: None, + }; + + db.save_task(&task).unwrap(); + + let loaded = db.get_task(task.id).unwrap().unwrap(); + assert_eq!(loaded.id, task.id); + assert_eq!(loaded.state, TaskState::Running); + assert_eq!(loaded.plan, "Build the feature"); + } + + #[test] + fn test_output_buffer() { + let db = LocalDb::open_memory().unwrap(); + let task_id = Uuid::new_v4(); + + db.buffer_output(task_id, "line 1", false).unwrap(); + db.buffer_output(task_id, "line 2", false).unwrap(); + + let unsent = db.get_unsent_outputs(10).unwrap(); + assert_eq!(unsent.len(), 2); + + let ids: Vec = unsent.iter().map(|o| o.id).collect(); + db.mark_outputs_sent(&ids).unwrap(); + + let unsent = db.get_unsent_outputs(10).unwrap(); + assert!(unsent.is_empty()); + } +} diff --git a/makima/daemon/src/db/mod.rs b/makima/daemon/src/db/mod.rs new file mode 100644 index 0000000..2c6e0f3 --- /dev/null +++ b/makima/daemon/src/db/mod.rs @@ -0,0 +1,5 @@ +//! Local database for daemon state persistence. + +pub mod local; + +pub use local::{BufferedOutput, LocalDb, LocalTask}; diff --git a/makima/daemon/src/error.rs b/makima/daemon/src/error.rs new file mode 100644 index 0000000..00e5140 --- /dev/null +++ b/makima/daemon/src/error.rs @@ -0,0 +1,75 @@ +//! Error types for the makima daemon. + +use thiserror::Error; +use uuid::Uuid; + +/// Top-level daemon error type. +#[derive(Error, Debug)] +pub enum DaemonError { + #[error("WebSocket error: {0}")] + WebSocket(#[from] tokio_tungstenite::tungstenite::Error), + + #[error("Worktree error: {0}")] + Worktree(#[from] crate::worktree::WorktreeError), + + #[error("Process error: {0}")] + Process(#[from] crate::process::ClaudeProcessError), + + #[error("Task error: {0}")] + Task(#[from] TaskError), + + #[error("Configuration error: {0}")] + Config(#[from] config::ConfigError), + + #[error("Database error: {0}")] + Database(#[from] rusqlite::Error), + + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), + + #[error("Authentication failed: {0}")] + AuthFailed(String), + + #[error("Connection lost")] + ConnectionLost, + + #[error("Server error: {code} - {message}")] + ServerError { code: String, message: String }, +} + +/// Task management errors. +#[derive(Error, Debug)] +pub enum TaskError { + #[error("Task not found: {0}")] + NotFound(Uuid), + + #[error("Invalid state transition from {from} to {to}")] + InvalidStateTransition { from: String, to: String }, + + #[error("Concurrency limit reached")] + ConcurrencyLimit, + + #[error("Task already exists: {0}")] + AlreadyExists(Uuid), + + #[error("Task not running: {0}")] + NotRunning(Uuid), + + #[error("Failed to send message to task: {0}")] + MessageFailed(String), + + #[error("Task setup failed: {0}")] + SetupFailed(String), + + #[error("Task execution failed: {0}")] + ExecutionFailed(String), +} + +/// Result type alias for daemon operations. +pub type Result = std::result::Result; + +/// Result type alias for task operations. +pub type TaskResult = std::result::Result; diff --git a/makima/daemon/src/lib.rs b/makima/daemon/src/lib.rs new file mode 100644 index 0000000..9555681 --- /dev/null +++ b/makima/daemon/src/lib.rs @@ -0,0 +1,21 @@ +//! Makima Daemon - Git worktree orchestration for Claude Code. +//! +//! This daemon runs on worker machines and: +//! - Connects to the makima server via WebSocket +//! - Creates git worktrees for task isolation +//! - Runs Claude Code CLI as subprocesses in worktrees +//! - Streams JSON output back to server + +pub mod cli; +pub mod config; +pub mod db; +pub mod error; +pub mod process; +pub mod task; +pub mod temp; +pub mod worktree; +pub mod ws; + +pub use cli::Cli; +pub use config::DaemonConfig; +pub use error::{DaemonError, Result}; diff --git a/makima/daemon/src/main.rs b/makima/daemon/src/main.rs new file mode 100644 index 0000000..e4ca5d4 --- /dev/null +++ b/makima/daemon/src/main.rs @@ -0,0 +1,313 @@ +//! Makima Daemon - Git worktree orchestration for Claude Code. + +use std::sync::Arc; + +use std::path::Path; + +use clap::Parser; +use makima_daemon::cli::Cli; +use makima_daemon::config::{DaemonConfig, RepoEntry}; +use makima_daemon::db::LocalDb; +use makima_daemon::error::DaemonError; +use makima_daemon::task::{TaskConfig, TaskManager}; +use makima_daemon::ws::{DaemonCommand, WsClient}; +use tokio::process::Command; +use tokio::sync::mpsc; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + eprintln!("=== Makima Daemon Starting ==="); + + // Parse command-line arguments + let cli = Cli::parse(); + + // Load configuration with CLI overrides + eprintln!("[1/5] Loading configuration..."); + let config = match DaemonConfig::load_with_cli(&cli) { + Ok(cfg) => { + eprintln!(" Config loaded: server={}", cfg.server.url); + cfg + } + Err(e) => { + eprintln!("Failed to load configuration: {}", e); + eprintln!(); + eprintln!("Use CLI flags:"); + eprintln!(" makima-daemon --server-url ws://localhost:8080 --api-key your-api-key"); + eprintln!(); + eprintln!("Or set environment variables:"); + eprintln!(" MAKIMA_DAEMON_SERVER_URL=ws://localhost:8080"); + eprintln!(" MAKIMA_API_KEY=your-api-key"); + eprintln!(); + eprintln!("Or create a config file: makima-daemon.toml"); + eprintln!(); + eprintln!("Run 'makima-daemon --help' for all options."); + std::process::exit(1); + } + }; + + // Initialize logging + init_logging(&config.logging.level, &config.logging.format); + eprintln!("[2/5] Logging initialized"); + + // Initialize local database + eprintln!("[3/5] Opening local database: {}", config.local_db.path.display()); + let _local_db = LocalDb::open(&config.local_db.path)?; + eprintln!(" Database opened"); + + // Initialize worktree directories + eprintln!("[4/5] Setting up directories..."); + tokio::fs::create_dir_all(&config.worktree.base_dir).await?; + tokio::fs::create_dir_all(&config.worktree.repos_dir).await?; + tokio::fs::create_dir_all(&config.repos.home_dir).await?; + eprintln!(" Worktree base: {}", config.worktree.base_dir.display()); + eprintln!(" Repos cache: {}", config.worktree.repos_dir.display()); + eprintln!(" Home dir: {}", config.repos.home_dir.display()); + + // Auto-clone repositories if configured + if !config.repos.auto_clone.is_empty() { + eprintln!(" Auto-cloning {} repositories...", config.repos.auto_clone.len()); + for repo_entry in &config.repos.auto_clone { + if let Err(e) = auto_clone_repo(repo_entry, &config.repos.home_dir).await { + eprintln!(" WARNING: Failed to clone {}: {}", repo_entry.url(), e); + } + } + } + + // Create channels for communication + let (command_tx, mut command_rx) = mpsc::channel::(64); + + // Get machine info + let machine_id = get_machine_id(); + let hostname = get_hostname(); + eprintln!(" Machine ID: {}", machine_id); + eprintln!(" Hostname: {}", hostname); + + // Create WebSocket client + eprintln!("[5/5] Connecting to server: {}", config.server.url); + let mut ws_client = WsClient::new( + config.server.clone(), + machine_id, + hostname, + config.process.max_concurrent_tasks as i32, + command_tx, + ); + + // Get sender for task manager + let ws_tx = ws_client.sender(); + + // Create task configuration + let task_config = TaskConfig { + max_concurrent_tasks: config.process.max_concurrent_tasks, + worktree_base_dir: config.worktree.base_dir.clone(), + env_vars: config.process.env_vars.clone(), + claude_command: config.process.claude_command.clone(), + claude_args: config.process.claude_args.clone(), + claude_pre_args: config.process.claude_pre_args.clone(), + enable_permissions: config.process.enable_permissions, + disable_verbose: config.process.disable_verbose, + }; + + // Create task manager + let task_manager = Arc::new(TaskManager::new(task_config, ws_tx.clone())); + + // Spawn command handler + let task_manager_clone = task_manager.clone(); + tokio::spawn(async move { + tracing::info!("Command handler started, waiting for commands..."); + while let Some(command) = command_rx.recv().await { + tracing::info!("Received command from channel: {:?}", command); + if let Err(e) = task_manager_clone.handle_command(command).await { + tracing::error!("Failed to handle command: {}", e); + } + } + tracing::info!("Command handler stopped"); + }); + + // Handle shutdown signals + let shutdown_signal = async { + tokio::signal::ctrl_c() + .await + .expect("Failed to install Ctrl+C handler"); + eprintln!("\nReceived shutdown signal"); + }; + + eprintln!("=== Daemon running (Ctrl+C to stop) ==="); + + // Run WebSocket client with shutdown handling + tokio::select! { + result = ws_client.run() => { + match result { + Ok(()) => eprintln!("WebSocket client exited cleanly"), + Err(DaemonError::AuthFailed(msg)) => { + eprintln!("ERROR: Authentication failed: {}", msg); + std::process::exit(1); + } + Err(e) => { + eprintln!("ERROR: WebSocket client error: {}", e); + std::process::exit(1); + } + } + } + _ = shutdown_signal => { + eprintln!("Shutting down..."); + } + } + + // Cleanup + tracing::info!("Daemon stopped"); + + Ok(()) +} + +fn init_logging(level: &str, format: &str) { + let filter = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new(level)) + .unwrap_or_else(|_| EnvFilter::new("info")); + + let subscriber = tracing_subscriber::registry().with(filter); + + if format == "json" { + subscriber.with(fmt::layer().json()).init(); + } else { + subscriber.with(fmt::layer()).init(); + } +} + +fn get_machine_id() -> String { + // Try to read machine-id from standard locations + #[cfg(target_os = "linux")] + { + if let Ok(id) = std::fs::read_to_string("/etc/machine-id") { + return id.trim().to_string(); + } + if let Ok(id) = std::fs::read_to_string("/var/lib/dbus/machine-id") { + return id.trim().to_string(); + } + } + + #[cfg(target_os = "macos")] + { + // Use IOPlatformSerialNumber + if let Ok(output) = std::process::Command::new("ioreg") + .args(["-rd1", "-c", "IOPlatformExpertDevice"]) + .output() + { + let stdout = String::from_utf8_lossy(&output.stdout); + for line in stdout.lines() { + if line.contains("IOPlatformUUID") { + if let Some(uuid) = line.split('"').nth(3) { + return uuid.to_string(); + } + } + } + } + } + + // Fallback: generate a random ID and persist it + let state_dir = dirs_next::data_local_dir() + .unwrap_or_else(|| std::path::PathBuf::from(".")) + .join("makima-daemon"); + let machine_id_file = state_dir.join("machine-id"); + + if let Ok(id) = std::fs::read_to_string(&machine_id_file) { + return id.trim().to_string(); + } + + // Generate new ID + let new_id = uuid::Uuid::new_v4().to_string(); + std::fs::create_dir_all(&state_dir).ok(); + std::fs::write(&machine_id_file, &new_id).ok(); + new_id +} + +fn get_hostname() -> String { + hostname::get() + .map(|h| h.to_string_lossy().to_string()) + .unwrap_or_else(|_| "unknown".to_string()) +} + +/// Auto-clone a repository to the home directory if it doesn't exist. +async fn auto_clone_repo( + repo_entry: &RepoEntry, + home_dir: &Path, +) -> Result<(), Box> { + let dir_name = repo_entry + .dir_name() + .ok_or("Could not determine directory name from URL")?; + let target_dir = home_dir.join(&dir_name); + + // Check if already cloned + if target_dir.exists() { + eprintln!(" [skip] {} (already exists)", dir_name); + return Ok(()); + } + + let url = repo_entry.expanded_url(); + eprintln!(" [clone] {} -> {}", url, target_dir.display()); + + // Build git clone command + let mut args = vec!["clone".to_string()]; + + // Add shallow clone if requested + if repo_entry.shallow() { + args.push("--depth".to_string()); + args.push("1".to_string()); + } + + // Add branch if specified + if let Some(branch) = repo_entry.branch() { + args.push("--branch".to_string()); + args.push(branch.to_string()); + } + + args.push(url.clone()); + args.push(target_dir.to_string_lossy().to_string()); + + // Run git clone + let output = Command::new("git") + .args(&args) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!("git clone failed: {}", stderr).into()); + } + + eprintln!(" [done] {}", dir_name); + Ok(()) +} + +/// dirs_next minimal replacement +mod dirs_next { + use std::path::PathBuf; + + pub fn data_local_dir() -> Option { + #[cfg(target_os = "macos")] + { + std::env::var("HOME") + .ok() + .map(|h| PathBuf::from(h).join("Library").join("Application Support")) + } + #[cfg(target_os = "linux")] + { + std::env::var("XDG_DATA_HOME") + .ok() + .map(PathBuf::from) + .or_else(|| { + std::env::var("HOME") + .ok() + .map(|h| PathBuf::from(h).join(".local").join("share")) + }) + } + #[cfg(target_os = "windows")] + { + std::env::var("LOCALAPPDATA").ok().map(PathBuf::from) + } + #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))] + { + None + } + } +} diff --git a/makima/daemon/src/process/claude.rs b/makima/daemon/src/process/claude.rs new file mode 100644 index 0000000..e06ee09 --- /dev/null +++ b/makima/daemon/src/process/claude.rs @@ -0,0 +1,481 @@ +//! Claude Code process management. + +use std::collections::HashMap; +use std::path::Path; +use std::process::Stdio; +use std::sync::Arc; + +use futures::Stream; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{Child, ChildStdin, Command}; +use tokio::sync::{mpsc, Mutex}; + +use super::claude_protocol::ClaudeInputMessage; + +/// Errors that can occur during Claude process management. +#[derive(Debug, thiserror::Error)] +pub enum ClaudeProcessError { + #[error("Failed to spawn Claude process: {0}")] + SpawnFailed(#[from] std::io::Error), + + #[error("Claude command not found: {0}")] + CommandNotFound(String), + + #[error("Process already exited")] + AlreadyExited, + + #[error("Failed to read output: {0}")] + OutputRead(String), +} + +/// A line of output from Claude Code. +#[derive(Debug, Clone)] +pub struct OutputLine { + /// The raw content of the line. + pub content: String, + /// Whether this is from stdout (true) or stderr (false). + pub is_stdout: bool, + /// Parsed JSON type if available (e.g., "system", "assistant", "result"). + pub json_type: Option, +} + +impl OutputLine { + /// Create a new stdout output line. + pub fn stdout(content: String) -> Self { + let json_type = extract_json_type(&content); + Self { + content, + is_stdout: true, + json_type, + } + } + + /// Create a new stderr output line. + pub fn stderr(content: String) -> Self { + Self { + content, + is_stdout: false, + json_type: None, + } + } +} + +/// Extract the "type" field from a JSON line if present. +fn extract_json_type(line: &str) -> Option { + // Quick check for JSON + if !line.starts_with('{') { + return None; + } + + // Try to parse and extract type + if let Ok(json) = serde_json::from_str::(line) { + json.get("type") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + } else { + None + } +} + +/// Handle to a running Claude Code process. +pub struct ClaudeProcess { + /// The child process. + child: Child, + /// Receiver for output lines. + output_rx: mpsc::Receiver, + /// Stdin handle for sending input to the process (thread-safe). + stdin: Arc>>, +} + +impl ClaudeProcess { + /// Wait for the process to exit and return the exit code. + pub async fn wait(&mut self) -> Result { + let status = self.child.wait().await?; + Ok(status.code().unwrap_or(-1) as i64) + } + + /// Check if the process has exited. + pub fn try_wait(&mut self) -> Result, ClaudeProcessError> { + match self.child.try_wait()? { + Some(status) => Ok(Some(status.code().unwrap_or(-1) as i64)), + None => Ok(None), + } + } + + /// Kill the process. + pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> { + self.child.kill().await?; + Ok(()) + } + + /// Get the next output line, if available. + pub async fn next_output(&mut self) -> Option { + self.output_rx.recv().await + } + + /// Send a raw message to the process via stdin. + /// + /// This can be used to provide input when Claude Code is waiting for user input. + pub async fn send_input(&self, message: &str) -> Result<(), ClaudeProcessError> { + let mut stdin_guard = self.stdin.lock().await; + if let Some(ref mut stdin) = *stdin_guard { + stdin + .write_all(message.as_bytes()) + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?; + stdin + .write_all(b"\n") + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write newline: {}", e)))?; + stdin + .flush() + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?; + Ok(()) + } else { + Err(ClaudeProcessError::OutputRead("Stdin not available".to_string())) + } + } + + /// Send a user message to the process via stdin using JSON protocol. + /// + /// This is the preferred method when using `--input-format=stream-json`. + /// The message is serialized as JSON and sent as a single line. + pub async fn send_user_message(&self, content: &str) -> Result<(), ClaudeProcessError> { + let message = ClaudeInputMessage::user(content); + let json_line = message.to_json_line().map_err(|e| { + ClaudeProcessError::OutputRead(format!("Failed to serialize message: {}", e)) + })?; + + tracing::debug!(content_len = content.len(), "Sending user message to Claude process"); + + let mut stdin_guard = self.stdin.lock().await; + if let Some(ref mut stdin) = *stdin_guard { + stdin + .write_all(json_line.as_bytes()) + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?; + stdin + .flush() + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?; + Ok(()) + } else { + Err(ClaudeProcessError::OutputRead("Stdin not available".to_string())) + } + } + + /// Get a clone of the stdin handle for external use. + pub fn stdin_handle(&self) -> Arc>> { + Arc::clone(&self.stdin) + } + + /// Close stdin, signaling EOF to the process. + pub async fn close_stdin(&self) -> Result<(), ClaudeProcessError> { + let mut stdin_guard = self.stdin.lock().await; + if let Some(mut stdin) = stdin_guard.take() { + let _ = stdin.shutdown().await; + } + Ok(()) + } + + /// Convert to a stream of output lines. + pub fn into_stream(self) -> impl Stream { + futures::stream::unfold(self.output_rx, |mut rx| async move { + rx.recv().await.map(|line| (line, rx)) + }) + } +} + +/// Manages Claude Code process spawning. +pub struct ProcessManager { + /// Path to the claude command. + claude_command: String, + /// Additional arguments to pass to Claude Code (after defaults). + claude_args: Vec, + /// Arguments to pass before defaults. + claude_pre_args: Vec, + /// Whether to enable Claude's permission system (skip --dangerously-skip-permissions). + enable_permissions: bool, + /// Whether to disable verbose output. + disable_verbose: bool, + /// Default environment variables to pass. + default_env: HashMap, +} + +impl Default for ProcessManager { + fn default() -> Self { + Self::new() + } +} + +impl ProcessManager { + /// Create a new ProcessManager with default settings. + pub fn new() -> Self { + Self { + claude_command: "claude".to_string(), + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + default_env: HashMap::new(), + } + } + + /// Create a ProcessManager with a custom claude command path. + pub fn with_command(command: String) -> Self { + Self { + claude_command: command, + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + default_env: HashMap::new(), + } + } + + /// Set additional arguments to pass after default arguments. + pub fn with_args(mut self, args: Vec) -> Self { + self.claude_args = args; + self + } + + /// Set arguments to pass before default arguments. + pub fn with_pre_args(mut self, args: Vec) -> Self { + self.claude_pre_args = args; + self + } + + /// Enable Claude's permission system (don't pass --dangerously-skip-permissions). + pub fn with_permissions_enabled(mut self, enabled: bool) -> Self { + self.enable_permissions = enabled; + self + } + + /// Disable verbose output. + pub fn with_verbose_disabled(mut self, disabled: bool) -> Self { + self.disable_verbose = disabled; + self + } + + /// Add default environment variables. + pub fn with_env(mut self, env: HashMap) -> Self { + self.default_env = env; + self + } + + /// Spawn a Claude Code process to execute a plan. + /// + /// The process runs in the specified working directory with stream-json output format. + pub async fn spawn( + &self, + working_dir: &Path, + plan: &str, + extra_env: Option>, + ) -> Result { + tracing::info!( + working_dir = %working_dir.display(), + plan_len = plan.len(), + plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan }, + "Spawning Claude Code process" + ); + + // Verify working directory exists + if !working_dir.exists() { + tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!"); + return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("Working directory does not exist: {}", working_dir.display()), + ))); + } + + // Build environment + let mut env = self.default_env.clone(); + if let Some(extra) = extra_env { + env.extend(extra); + } + + // Build arguments list + let mut args = Vec::new(); + + // Pre-args (before defaults) + args.extend(self.claude_pre_args.clone()); + + // Required arguments for stream-json protocol + args.push("--output-format=stream-json".to_string()); + args.push("--input-format=stream-json".to_string()); + + // Optional default arguments + if !self.disable_verbose { + args.push("--verbose".to_string()); + } + if !self.enable_permissions { + args.push("--dangerously-skip-permissions".to_string()); + } + + // Additional user-configured arguments + args.extend(self.claude_args.clone()); + + tracing::debug!(args = ?args, "Claude command arguments"); + + // Spawn the process + let mut child = Command::new(&self.claude_command) + .args(&args) + .current_dir(working_dir) + .envs(env) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ClaudeProcessError::CommandNotFound(self.claude_command.clone()) + } else { + ClaudeProcessError::SpawnFailed(e) + } + })?; + + // Create output channel + let (tx, rx) = mpsc::channel(1000); + + // Take stdout, stderr, and stdin + // With --input-format=stream-json, we keep stdin open for sending messages + let stdin = child.stdin.take(); + let stdin = Arc::new(Mutex::new(stdin)); + + let stdout = child.stdout.take().expect("stdout should be piped"); + let stderr = child.stderr.take().expect("stderr should be piped"); + + // Spawn task to read stdout + let tx_stdout = tx.clone(); + tokio::spawn(async move { + use tokio::io::AsyncReadExt; + let mut reader = BufReader::new(stdout); + let mut buffer = vec![0u8; 4096]; + let mut line_buffer = String::new(); + + loop { + // Try to read with a timeout to detect if we're stuck + match tokio::time::timeout( + tokio::time::Duration::from_secs(5), + reader.read(&mut buffer) + ).await { + Ok(Ok(0)) => { + // EOF + tracing::debug!("Claude stdout EOF"); + // Send any remaining content + if !line_buffer.is_empty() { + let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await; + } + break; + } + Ok(Ok(n)) => { + let chunk = String::from_utf8_lossy(&buffer[..n]); + tracing::debug!(bytes = n, chunk_preview = %if chunk.len() > 100 { &chunk[..100] } else { &chunk }, "Got stdout chunk from Claude"); + + // Accumulate into line buffer and emit complete lines + line_buffer.push_str(&chunk); + while let Some(newline_pos) = line_buffer.find('\n') { + let line = line_buffer[..newline_pos].to_string(); + line_buffer = line_buffer[newline_pos + 1..].to_string(); + if tx_stdout.send(OutputLine::stdout(line)).await.is_err() { + return; + } + } + } + Ok(Err(e)) => { + tracing::error!(error = %e, "Error reading Claude stdout"); + break; + } + Err(_) => { + // Timeout - no data for 5 seconds + tracing::warn!("No stdout data from Claude for 5 seconds"); + } + } + } + tracing::debug!("Claude stdout reader task ended"); + }); + + // Spawn task to read stderr + let tx_stderr = tx; + tokio::spawn(async move { + let reader = BufReader::new(stderr); + let mut lines = reader.lines(); + while let Ok(Some(line)) = lines.next_line().await { + tracing::debug!(line = %line, "Claude stderr"); + if tx_stderr.send(OutputLine::stderr(line)).await.is_err() { + break; + } + } + tracing::debug!("Claude stderr reader task ended"); + }); + + tracing::info!("Claude Code process spawned successfully"); + + let process = ClaudeProcess { + child, + output_rx: rx, + stdin, + }; + + // Send the initial plan as the first user message + tracing::info!(plan_len = plan.len(), "Sending initial plan to Claude via stdin"); + process.send_user_message(plan).await?; + + Ok(process) + } + + /// Check if the claude command is available. + pub async fn check_claude_available(&self) -> Result { + let output = Command::new(&self.claude_command) + .arg("--version") + .output() + .await + .map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ClaudeProcessError::CommandNotFound(self.claude_command.clone()) + } else { + ClaudeProcessError::SpawnFailed(e) + } + })?; + + if output.status.success() { + Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) + } else { + Err(ClaudeProcessError::CommandNotFound( + self.claude_command.clone(), + )) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_json_type() { + assert_eq!( + extract_json_type(r#"{"type":"system","subtype":"init"}"#), + Some("system".to_string()) + ); + assert_eq!( + extract_json_type(r#"{"type":"assistant","message":{}}"#), + Some("assistant".to_string()) + ); + assert_eq!(extract_json_type("not json"), None); + assert_eq!(extract_json_type(r#"{"no_type": true}"#), None); + } + + #[test] + fn test_output_line_creation() { + let line = OutputLine::stdout(r#"{"type":"result"}"#.to_string()); + assert!(line.is_stdout); + assert_eq!(line.json_type, Some("result".to_string())); + + let line = OutputLine::stderr("error message".to_string()); + assert!(!line.is_stdout); + assert_eq!(line.json_type, None); + } +} diff --git a/makima/daemon/src/process/claude_protocol.rs b/makima/daemon/src/process/claude_protocol.rs new file mode 100644 index 0000000..930152b --- /dev/null +++ b/makima/daemon/src/process/claude_protocol.rs @@ -0,0 +1,59 @@ +//! Claude Code JSON protocol types for stdin communication. +//! +//! When using `--input-format=stream-json`, Claude Code expects +//! newline-delimited JSON messages on stdin. + +use serde::Serialize; + +/// Message sent to Claude Code via stdin. +/// +/// Format based on Claude Code's stream-json input protocol. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ClaudeInputMessage { + /// A user message to send to Claude. + User { message: UserMessage }, +} + +/// The inner user message structure. +#[derive(Debug, Clone, Serialize)] +pub struct UserMessage { + /// Always "user" for user messages. + pub role: String, + /// The message content. + pub content: String, +} + +impl ClaudeInputMessage { + /// Create a new user message. + pub fn user(content: impl Into) -> Self { + Self::User { + message: UserMessage { + role: "user".to_string(), + content: content.into(), + }, + } + } + + /// Serialize to a JSON string with trailing newline (NDJSON format). + pub fn to_json_line(&self) -> Result { + let mut json = serde_json::to_string(self)?; + json.push('\n'); + Ok(json) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_user_message_serialization() { + let msg = ClaudeInputMessage::user("Hello, Claude!"); + let json = msg.to_json_line().unwrap(); + + // Should produce: {"type":"user","message":{"role":"user","content":"Hello, Claude!"}}\n + assert!(json.starts_with(r#"{"type":"user","message":{"role":"user","content":"Hello, Claude!"}}"#)); + assert!(json.ends_with('\n')); + } +} diff --git a/makima/daemon/src/process/mod.rs b/makima/daemon/src/process/mod.rs new file mode 100644 index 0000000..814a3c5 --- /dev/null +++ b/makima/daemon/src/process/mod.rs @@ -0,0 +1,10 @@ +//! Process management for Claude Code subprocess execution. +//! +//! Spawns and manages Claude Code processes in worktree directories, +//! streaming JSON output back to the daemon. + +mod claude; +mod claude_protocol; + +pub use claude::{ClaudeProcess, ClaudeProcessError, OutputLine, ProcessManager}; +pub use claude_protocol::ClaudeInputMessage; diff --git a/makima/daemon/src/task/manager.rs b/makima/daemon/src/task/manager.rs new file mode 100644 index 0000000..4979ce7 --- /dev/null +++ b/makima/daemon/src/task/manager.rs @@ -0,0 +1,2248 @@ +//! Task lifecycle manager using git worktrees and Claude Code subprocesses. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +use rand::Rng; +use tokio::io::AsyncWriteExt; +use tokio::sync::{mpsc, RwLock, Semaphore}; +use uuid::Uuid; + +use std::collections::HashSet; + +use super::state::TaskState; +use crate::error::{DaemonError, TaskError, TaskResult}; +use crate::process::{ClaudeInputMessage, ProcessManager}; +use crate::temp::TempManager; +use crate::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager}; +use crate::ws::{BranchInfo, DaemonCommand, DaemonMessage}; + +/// Generate a secure random API key for orchestrator tool access. +fn generate_tool_key() -> String { + let mut rng = rand::rng(); + let bytes: [u8; 32] = rng.random(); + hex::encode(bytes) +} + +/// System prompt for regular (non-orchestrator) subtasks. +/// This ensures subtasks work only within their isolated worktree directory. +const SUBTASK_SYSTEM_PROMPT: &str = r#"You are working in an isolated worktree directory that contains a snapshot of the codebase. + +## IMPORTANT: Directory Restrictions + +**You MUST only work within the current working directory (your worktree).** + +- DO NOT use `cd` to navigate to directories outside your worktree +- DO NOT use absolute paths that point outside your worktree (e.g., don't write to ~/some/path, /tmp, or the original repository) +- DO NOT modify files in parent directories or sibling directories +- All your file operations should be relative to the current directory + +Your working directory is your sandboxed workspace. When you complete your task, your changes will be reviewed and integrated by the orchestrator. + +**Why?** Your worktree is isolated so that: +1. Your changes don't affect other running tasks +2. Changes can be reviewed before integration +3. Multiple tasks can work on the codebase in parallel without conflicts + +--- + +"#; + +/// The orchestrator system prompt that tells Claude how to use the helper script. +const ORCHESTRATOR_SYSTEM_PROMPT: &str = r#"You are an orchestrator task. Your job is to coordinate subtasks and integrate their work, NOT to write code directly. + +## FIRST STEP + +Start by checking if you have existing subtasks: + +```bash +# List all subtasks to see what work needs to be done +./.makima/orchestrate.sh list +``` + +If subtasks exist, start them. If you need additional subtasks or no subtasks exist yet, you can create them. + +--- + +## Creating Subtasks + +You can create new subtasks to break down work: + +```bash +# Create a new subtask with a name and plan +./.makima/orchestrate.sh create "Subtask Name" "Detailed plan for what the subtask should do..." + +# The command returns the new subtask ID - use it to start the subtask +./.makima/orchestrate.sh start +``` + +Create subtasks when you need to: +- Break down complex work into smaller pieces +- Run multiple tasks in parallel on different parts of the codebase +- Delegate specific implementation work + +## Task Continuation (Sequential Dependencies) + +When subtasks need to build on each other's work (e.g., Task B depends on Task A's changes), use `--continue-from`: + +```bash +# Create Task B that continues from Task A's worktree +./.makima/orchestrate.sh create "Task B" "Build on Task A's work..." --continue-from +``` + +This copies all files from Task A's worktree into Task B's worktree, so Task B starts with Task A's changes. + +**When to use continuation:** +- Sequential work: Task B needs Task A's output files +- Staged implementation: Building features incrementally +- Fix-and-extend: One task fixes issues, another adds features on top + +**When NOT to use continuation:** +- Parallel tasks working on different files +- Independent subtasks that can be merged separately + +**Important for merging:** When tasks continue from each other, only merge the FINAL task in the chain. Earlier tasks' changes are already included in later tasks' worktrees. + +## Sharing Files with Subtasks + +Use `--files` to copy specific files from your orchestrator worktree to subtasks. This is useful for sharing plans, configs, or data files: + +```bash +# Create subtask with specific files copied from orchestrator +./.makima/orchestrate.sh create "Implement Feature" "Follow the plan in PLAN.md" --files "PLAN.md" + +# Copy multiple files (comma-separated) +./.makima/orchestrate.sh create "API Work" "Use the spec..." --files "PLAN.md,api-spec.yaml,types.ts" + +# Combine with --continue-from to share files AND continue from another task +./.makima/orchestrate.sh create "Step 2" "Continue..." --continue-from --files "requirements.md" +``` + +**Use cases for --files:** +- Share a PLAN.md with detailed implementation steps +- Distribute configuration or spec files +- Pass generated data or intermediate results + +## How Subtasks Work + +Each subtask runs in its own **worktree** - a separate directory with a copy of the codebase. When subtasks complete: +- Their work remains in the worktree files (NOT committed to git) +- **Subtasks do NOT auto-merge** - YOU must integrate their work into your worktree +- You can view and copy files from subtask worktrees using their paths +- The worktree path is returned when you get subtask status + +**IMPORTANT:** Subtasks never create PRs or merge to the target repository. Only the orchestrator (you) can trigger completion actions like PR creation or merging after integrating all subtask work. + +## Subtask Commands +```bash +# List all subtasks and their current status +./.makima/orchestrate.sh list + +# Create a new subtask (returns the subtask ID) +./.makima/orchestrate.sh create "Name" "Plan/description" + +# Create a subtask that continues from another task's worktree +./.makima/orchestrate.sh create "Name" "Plan" --continue-from + +# Create a subtask with specific files copied from orchestrator worktree +./.makima/orchestrate.sh create "Name" "Plan" --files "file1.md,file2.yaml" + +# Start a specific subtask (it will run in its own Claude instance) +./.makima/orchestrate.sh start + +# Stop a running subtask +./.makima/orchestrate.sh stop + +# Get detailed status of a subtask (includes worktree_path when available) +./.makima/orchestrate.sh status + +# Get the output/logs of a subtask +./.makima/orchestrate.sh output + +# Get the worktree path for a subtask +./.makima/orchestrate.sh worktree +``` + +## Integrating Subtask Work + +When subtasks complete, their changes exist as files in their worktree directories: +- Files are NOT committed to git branches +- You must copy/integrate files from subtask worktrees into your worktree +- Use standard file operations (cp, cat, etc.) to review and integrate changes + +### Handling Continuation Chains + +**CRITICAL:** When subtasks use `--continue-from`, they form a chain where each task includes all changes from previous tasks. You must ONLY integrate the FINAL task in each chain. + +Example chain: Task A → Task B (continues from A) → Task C (continues from B) +- Task C's worktree contains ALL changes from A, B, and C +- You should ONLY integrate Task C's worktree +- DO NOT integrate Task A or Task B separately (their changes are already in C) + +**How to track continuation chains:** +1. When you create tasks with `--continue-from`, note which task continues from which +2. Build a mental model: Independent tasks (no continuation) + Continuation chains +3. For each chain, only integrate the LAST task in the chain + +**Example with mixed independent and chained tasks:** +``` +Independent tasks (integrate all): +- Task X: API endpoints +- Task Y: Database models + +Continuation chain (integrate ONLY the last one): +- Task A: Core feature → Task B: Tests (continues from A) → Task C: Docs (continues from B) + Only integrate Task C! +``` + +### Integration Examples + +For independent subtasks (no continuation): +```bash +# Get the worktree path for a completed subtask +SUBTASK_PATH=$(./.makima/orchestrate.sh worktree ) + +# View what files were changed +ls -la "$SUBTASK_PATH" +diff -r . "$SUBTASK_PATH" --exclude=.git --exclude=.makima + +# Copy specific files from subtask +cp "$SUBTASK_PATH/src/new_file.rs" ./src/ +cp "$SUBTASK_PATH/src/modified_file.rs" ./src/ + +# Or use diff/patch for more control +diff -u ./src/file.rs "$SUBTASK_PATH/src/file.rs" > changes.patch +patch -p0 < changes.patch +``` + +For continuation chains (only integrate the final task): +```bash +# If you have: Task A → Task B → Task C (each continues from previous) +# ONLY get and integrate Task C's worktree - it has everything! + +FINAL_TASK_PATH=$(./.makima/orchestrate.sh worktree ) + +# Copy all changes from the final task +rsync -av --exclude='.git' --exclude='.makima' "$FINAL_TASK_PATH/" ./ +``` + +## Completion +```bash +# Mark yourself as complete after integrating all subtask work +./.makima/orchestrate.sh done "Summary of what was accomplished" +``` + +## Workflow +1. **List existing subtasks**: Run `list` to see current subtasks +2. **Create subtasks if needed**: Use `create` to add new subtasks for the work + - For independent parallel work: create without `--continue-from` + - For sequential dependencies: use `--continue-from ` + - Track which tasks continue from which (continuation chains) +3. **Start subtasks**: Run `start` for each subtask +4. **Monitor progress**: Check status and output as subtasks run +5. **Integrate work**: When subtasks complete: + - For independent tasks: integrate each one's worktree + - For continuation chains: ONLY integrate the FINAL task (it has all changes) + - Get worktree path with `worktree ` + - Copy or merge files into your worktree +6. **Complete**: Call `done` once all work is integrated + +## Important Notes +- Subtask files are in worktrees, NOT committed git branches +- **Subtasks do NOT auto-merge or create PRs** - you must integrate their work +- You can read files from subtask worktrees using their paths +- Use standard file tools (cp, diff, cat, rsync) to integrate changes +- You should NOT edit files directly - that's what subtasks are for +- DO NOT DO THE SUBTASKS' WORK! Your only job is to coordinate, not implement. +- When you call `done`, YOUR worktree may be used for the final PR/merge +"#; + +/// Content of the helper bash script that orchestrators use to call the API. +const ORCHESTRATE_SCRIPT: &str = r#"#!/bin/bash +# Makima Orchestrator Helper Script +# Usage: ./orchestrate.sh [args...] + +API_URL="${MAKIMA_API_URL:-http://localhost:8080}" +API_KEY="${MAKIMA_API_KEY}" +TASK_ID="${MAKIMA_TASK_ID}" + +if [ -z "$API_KEY" ]; then + echo "Error: MAKIMA_API_KEY not set" >&2 + exit 1 +fi + +if [ -z "$TASK_ID" ]; then + echo "Error: MAKIMA_TASK_ID not set" >&2 + exit 1 +fi + +# Helper function to make API calls and check for errors +api_call() { + local method="$1" + local url="$2" + local data="$3" + local response + local http_code + + if [ -n "$data" ]; then + response=$(curl -s -w "\n%{http_code}" -X "$method" \ + -H "X-Makima-Tool-Key: $API_KEY" \ + -H "Content-Type: application/json" \ + -d "$data" \ + "$url") + else + response=$(curl -s -w "\n%{http_code}" -X "$method" \ + -H "X-Makima-Tool-Key: $API_KEY" \ + "$url") + fi + + # Extract HTTP code (last line) and body (everything else) + http_code=$(echo "$response" | tail -n1) + body=$(echo "$response" | sed '$d') + + # Check for curl errors or non-2xx status + if [ "$http_code" -lt 200 ] || [ "$http_code" -ge 300 ]; then + echo "Error: API request failed with HTTP $http_code" >&2 + echo "URL: $url" >&2 + echo "Response: $body" >&2 + echo "$body" + return 1 + fi + + echo "$body" + return 0 +} + +case "$1" in + list) + api_call GET "$API_URL/api/v1/mesh/tasks/$TASK_ID/subtasks" + ;; + create) + # Parse arguments: create "name" "plan" [--continue-from ] [--files "file1,file2"] + if [ -z "$2" ] || [ -z "$3" ]; then + echo "Usage: $0 create \"\" \"\" [--continue-from ] [--files \"file1,file2\"]" >&2 + exit 1 + fi + NAME="$2" + PLAN="$3" + CONTINUE_FROM="" + COPY_FILES="" + + # Parse optional flags (can be in any order after name and plan) + shift 3 + while [ $# -gt 0 ]; do + case "$1" in + --continue-from) + CONTINUE_FROM="$2" + shift 2 + ;; + --files) + COPY_FILES="$2" + shift 2 + ;; + *) + echo "Unknown option: $1" >&2 + exit 1 + ;; + esac + done + + # Escape quotes in name and plan for JSON + NAME_ESCAPED=$(echo "$NAME" | sed 's/"/\\"/g' | sed 's/\\/\\\\/g') + PLAN_ESCAPED=$(echo "$PLAN" | sed 's/"/\\"/g' | sed 's/\\/\\\\/g') + + # Build JSON body + JSON_BODY="{\"name\":\"$NAME_ESCAPED\",\"plan\":\"$PLAN_ESCAPED\",\"parentTaskId\":\"$TASK_ID\"" + + if [ -n "$CONTINUE_FROM" ]; then + echo "Creating subtask: $NAME (continuing from $CONTINUE_FROM)..." >&2 + JSON_BODY="$JSON_BODY,\"continueFromTaskId\":\"$CONTINUE_FROM\"" + else + echo "Creating subtask: $NAME..." >&2 + fi + + if [ -n "$COPY_FILES" ]; then + # Convert comma-separated file list to JSON array + FILES_JSON="[" + first=true + IFS=',' read -ra FILE_ARRAY <<< "$COPY_FILES" + for file in "${FILE_ARRAY[@]}"; do + file=$(echo "$file" | xargs) # trim whitespace + if [ "$first" = true ]; then + FILES_JSON="$FILES_JSON\"$file\"" + first=false + else + FILES_JSON="$FILES_JSON,\"$file\"" + fi + done + FILES_JSON="$FILES_JSON]" + JSON_BODY="$JSON_BODY,\"copyFiles\":$FILES_JSON" + echo " (copying files: $COPY_FILES)" >&2 + fi + + JSON_BODY="$JSON_BODY}" + api_call POST "$API_URL/api/v1/mesh/tasks" "$JSON_BODY" + ;; + start) + if [ -z "$2" ]; then + echo "Usage: $0 start " >&2 + exit 1 + fi + echo "Starting subtask $2..." >&2 + api_call POST "$API_URL/api/v1/mesh/tasks/$2/start" + ;; + stop) + if [ -z "$2" ]; then + echo "Usage: $0 stop " >&2 + exit 1 + fi + api_call POST "$API_URL/api/v1/mesh/tasks/$2/stop" + ;; + status) + if [ -z "$2" ]; then + echo "Usage: $0 status " >&2 + exit 1 + fi + api_call GET "$API_URL/api/v1/mesh/tasks/$2" + ;; + output) + if [ -z "$2" ]; then + echo "Usage: $0 output " >&2 + exit 1 + fi + api_call GET "$API_URL/api/v1/mesh/tasks/$2/output" + ;; + worktree) + if [ -z "$2" ]; then + echo "Usage: $0 worktree " >&2 + exit 1 + fi + # Get the worktree path from the task's overlayPath field via API + TASK_JSON=$(api_call GET "$API_URL/api/v1/mesh/tasks/$2") + if [ $? -ne 0 ]; then + echo "Error: Failed to get task info" >&2 + exit 1 + fi + WORKTREE_PATH=$(echo "$TASK_JSON" | grep -o '"overlayPath":"[^"]*"' | cut -d'"' -f4) + if [ -z "$WORKTREE_PATH" ]; then + echo "Error: Task has no worktree path (may not have started yet)" >&2 + exit 1 + fi + if [ -d "$WORKTREE_PATH" ]; then + echo "$WORKTREE_PATH" + else + echo "Error: Worktree not found at $WORKTREE_PATH" >&2 + echo "The worktree may have been cleaned up." >&2 + exit 1 + fi + ;; + done) + SUMMARY="${2:-Task completed}" + api_call PUT "$API_URL/api/v1/mesh/tasks/$TASK_ID" "{\"status\":\"done\",\"progressSummary\":\"$SUMMARY\"}" + ;; + *) + echo "Makima Orchestrator Helper" + echo "" + echo "Usage: $0 [args...]" + echo "" + echo "Subtask Commands:" + echo " list List all subtasks and their status" + echo " create \"\" \"\" Create a new subtask" + echo " create \"...\" --continue-from ID Create subtask continuing from another task's worktree" + echo " create \"...\" --files \"file1,file2\" Copy specific files from parent (orchestrator) worktree" + echo " start Start a subtask" + echo " stop Stop a running subtask" + echo " status Get detailed subtask status" + echo " output Get subtask output history" + echo " worktree Get path to subtask's worktree" + echo "" + echo "Completion:" + echo " done [summary] Mark orchestrator as complete" + echo "" + echo "Examples:" + echo " create \"Fix bug\" \"Fix the null check bug\" --files \"PLAN.md\"" + echo " create \"Step 2\" \"Continue work\" --continue-from abc123 --files \"shared.rs,types.rs\"" + ;; +esac +"#; + +/// Tracks merge state for an orchestrator task. +#[derive(Default)] +struct MergeTracker { + /// Subtask branches that have been successfully merged. + merged_subtasks: HashSet, + /// Subtask branches that were explicitly skipped (with reason). + skipped_subtasks: HashMap, +} + +/// Managed task information. +pub struct ManagedTask { + /// Task ID. + pub id: Uuid, + /// Current state. + pub state: TaskState, + /// Worktree info if created. + pub worktree: Option, + /// Task plan. + pub plan: String, + /// Repository URL or path. + pub repo_source: Option, + /// Base branch. + pub base_branch: Option, + /// Target branch to merge into. + pub target_branch: Option, + /// Parent task ID if this is a subtask. + pub parent_task_id: Option, + /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask). + pub depth: i32, + /// Whether this task runs as an orchestrator (coordinates subtasks). + pub is_orchestrator: bool, + /// Path to target repository for completion actions. + pub target_repo_path: Option, + /// Completion action: "none", "branch", "merge", "pr". + pub completion_action: Option, + /// Task ID to continue from (copy worktree from this task). + pub continue_from_task_id: Option, + /// Files to copy from parent task's worktree. + pub copy_files: Option>, + /// Time task was created. + pub created_at: Instant, + /// Time task started running. + pub started_at: Option, + /// Time task completed. + pub completed_at: Option, + /// Error message if failed. + pub error: Option, +} + +/// Configuration for task execution. +#[derive(Clone)] +pub struct TaskConfig { + /// Maximum concurrent tasks. + pub max_concurrent_tasks: u32, + /// Base directory for worktrees. + pub worktree_base_dir: PathBuf, + /// Environment variables to pass to Claude. + pub env_vars: HashMap, + /// Claude command path. + pub claude_command: String, + /// Additional arguments to pass to Claude Code. + pub claude_args: Vec, + /// Arguments to pass before defaults. + pub claude_pre_args: Vec, + /// Enable Claude's permission system. + pub enable_permissions: bool, + /// Disable verbose output. + pub disable_verbose: bool, +} + +impl Default for TaskConfig { + fn default() -> Self { + Self { + max_concurrent_tasks: 4, + worktree_base_dir: WorktreeManager::default_base_dir(), + env_vars: HashMap::new(), + claude_command: "claude".to_string(), + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + } + } +} + +/// Task manager for handling task lifecycle. +pub struct TaskManager { + /// Worktree manager. + worktree_manager: Arc, + /// Process manager. + process_manager: Arc, + /// Temp directory manager. + temp_manager: Arc, + /// Task configuration. + #[allow(dead_code)] + config: TaskConfig, + /// Active tasks. + tasks: Arc>>, + /// Channel to send messages to server. + ws_tx: mpsc::Sender, + /// Semaphore for limiting concurrent tasks. + semaphore: Arc, + /// Channels for sending input to running tasks. + /// Each sender allows sending messages to the stdin of a running Claude process. + task_inputs: Arc>>>, + /// Tracks merge state per orchestrator task (for completion gate). + merge_trackers: Arc>>, +} + +impl TaskManager { + /// Create a new task manager. + pub fn new(config: TaskConfig, ws_tx: mpsc::Sender) -> Self { + let max_concurrent = config.max_concurrent_tasks as usize; + let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone())); + let process_manager = Arc::new( + ProcessManager::with_command(config.claude_command.clone()) + .with_args(config.claude_args.clone()) + .with_pre_args(config.claude_pre_args.clone()) + .with_permissions_enabled(config.enable_permissions) + .with_verbose_disabled(config.disable_verbose) + .with_env(config.env_vars.clone()), + ); + let temp_manager = Arc::new(TempManager::new()); + + Self { + worktree_manager, + process_manager, + temp_manager, + config, + tasks: Arc::new(RwLock::new(HashMap::new())), + ws_tx, + semaphore: Arc::new(Semaphore::new(max_concurrent)), + task_inputs: Arc::new(RwLock::new(HashMap::new())), + merge_trackers: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Handle a command from the server. + pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> { + tracing::info!("Received command from server: {:?}", command); + + match command { + DaemonCommand::SpawnTask { + task_id, + task_name, + plan, + repo_url, + base_branch, + target_branch, + parent_task_id, + depth, + is_orchestrator, + target_repo_path, + completion_action, + continue_from_task_id, + copy_files, + } => { + tracing::info!( + task_id = %task_id, + task_name = %task_name, + repo_url = ?repo_url, + base_branch = ?base_branch, + target_branch = ?target_branch, + parent_task_id = ?parent_task_id, + depth = depth, + is_orchestrator = is_orchestrator, + target_repo_path = ?target_repo_path, + completion_action = ?completion_action, + continue_from_task_id = ?continue_from_task_id, + copy_files = ?copy_files, + plan_len = plan.len(), + "Spawning new task" + ); + self.spawn_task( + task_id, task_name, plan, repo_url, base_branch, target_branch, + parent_task_id, depth, is_orchestrator, + target_repo_path, completion_action, continue_from_task_id, + copy_files + ).await?; + } + DaemonCommand::PauseTask { task_id } => { + tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks"); + // Subprocesses don't support pause, just log and ignore + } + DaemonCommand::ResumeTask { task_id } => { + tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks"); + // Subprocesses don't support resume, just log and ignore + } + DaemonCommand::InterruptTask { task_id, graceful: _ } => { + tracing::info!(task_id = %task_id, "Interrupting task"); + self.interrupt_task(task_id).await?; + } + DaemonCommand::SendMessage { task_id, message } => { + tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task"); + // Send message to the task's stdin via the input channel + let inputs = self.task_inputs.read().await; + if let Some(sender) = inputs.get(&task_id) { + if let Err(e) = sender.send(message).await { + tracing::warn!(task_id = %task_id, error = %e, "Failed to send message to task input channel"); + } else { + tracing::info!(task_id = %task_id, "Message sent to task successfully"); + } + } else { + tracing::warn!(task_id = %task_id, "No input channel for task (task may not be running)"); + } + } + DaemonCommand::InjectSiblingContext { task_id, .. } => { + tracing::debug!(task_id = %task_id, "Sibling context injection not supported for subprocess tasks"); + } + DaemonCommand::Authenticated { daemon_id } => { + tracing::debug!(daemon_id = %daemon_id, "Authenticated command (handled by WS client)"); + } + DaemonCommand::Error { code, message } => { + tracing::warn!(code = %code, message = %message, "Error command from server"); + } + + // ========================================================================= + // Merge Commands + // ========================================================================= + + DaemonCommand::ListBranches { task_id } => { + tracing::info!(task_id = %task_id, "Listing task branches"); + self.handle_list_branches(task_id).await?; + } + DaemonCommand::MergeStart { task_id, source_branch } => { + tracing::info!(task_id = %task_id, source_branch = %source_branch, "Starting merge"); + self.handle_merge_start(task_id, source_branch).await?; + } + DaemonCommand::MergeStatus { task_id } => { + tracing::info!(task_id = %task_id, "Getting merge status"); + self.handle_merge_status(task_id).await?; + } + DaemonCommand::MergeResolve { task_id, file, strategy } => { + tracing::info!(task_id = %task_id, file = %file, strategy = %strategy, "Resolving conflict"); + self.handle_merge_resolve(task_id, file, strategy).await?; + } + DaemonCommand::MergeCommit { task_id, message } => { + tracing::info!(task_id = %task_id, "Committing merge"); + self.handle_merge_commit(task_id, message).await?; + } + DaemonCommand::MergeAbort { task_id } => { + tracing::info!(task_id = %task_id, "Aborting merge"); + self.handle_merge_abort(task_id).await?; + } + DaemonCommand::MergeSkip { task_id, subtask_id, reason } => { + tracing::info!(task_id = %task_id, subtask_id = %subtask_id, reason = %reason, "Skipping subtask merge"); + self.handle_merge_skip(task_id, subtask_id, reason).await?; + } + DaemonCommand::CheckMergeComplete { task_id } => { + tracing::info!(task_id = %task_id, "Checking merge completion"); + self.handle_check_merge_complete(task_id).await?; + } + + // ========================================================================= + // Completion Action Commands + // ========================================================================= + + DaemonCommand::RetryCompletionAction { + task_id, + task_name, + action, + target_repo_path, + target_branch, + } => { + tracing::info!( + task_id = %task_id, + task_name = %task_name, + action = %action, + target_repo_path = %target_repo_path, + target_branch = ?target_branch, + "Retrying completion action" + ); + self.handle_retry_completion_action(task_id, task_name, action, target_repo_path, target_branch).await?; + } + + DaemonCommand::CloneWorktree { task_id, target_dir } => { + tracing::info!( + task_id = %task_id, + target_dir = %target_dir, + "Cloning worktree to target directory" + ); + self.handle_clone_worktree(task_id, target_dir).await?; + } + + DaemonCommand::CheckTargetExists { task_id, target_dir } => { + tracing::debug!( + task_id = %task_id, + target_dir = %target_dir, + "Checking if target directory exists" + ); + self.handle_check_target_exists(task_id, target_dir).await?; + } + } + Ok(()) + } + + /// Spawn a new task. + #[allow(clippy::too_many_arguments)] + pub async fn spawn_task( + &self, + task_id: Uuid, + task_name: String, + plan: String, + repo_url: Option, + base_branch: Option, + target_branch: Option, + parent_task_id: Option, + depth: i32, + is_orchestrator: bool, + target_repo_path: Option, + completion_action: Option, + continue_from_task_id: Option, + copy_files: Option>, + ) -> TaskResult<()> { + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, depth = depth, "=== SPAWN_TASK START ==="); + + // Check if task already exists - allow re-spawning if in terminal state + { + let mut tasks = self.tasks.write().await; + if let Some(existing) = tasks.get(&task_id) { + if existing.state.is_terminal() { + // Task exists but is in terminal state (completed, failed, interrupted) + // Remove it so we can re-spawn + tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn"); + tasks.remove(&task_id); + } else { + // Task is still active, reject + tracing::warn!(task_id = %task_id, state = ?existing.state, "Task already exists and is active, rejecting spawn"); + return Err(TaskError::AlreadyExists(task_id)); + } + } + } + + // Acquire semaphore permit + tracing::info!(task_id = %task_id, "Acquiring concurrency permit..."); + let permit = self + .semaphore + .clone() + .try_acquire_owned() + .map_err(|_| { + tracing::warn!(task_id = %task_id, "Concurrency limit reached, cannot spawn task"); + TaskError::ConcurrencyLimit + })?; + tracing::info!(task_id = %task_id, "Concurrency permit acquired"); + + // Create task entry + tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing"); + let task = ManagedTask { + id: task_id, + state: TaskState::Initializing, + worktree: None, + plan: plan.clone(), + repo_source: repo_url.clone(), + base_branch: base_branch.clone(), + target_branch: target_branch.clone(), + parent_task_id, + depth, + is_orchestrator, + target_repo_path: target_repo_path.clone(), + completion_action: completion_action.clone(), + continue_from_task_id, + copy_files: copy_files.clone(), + created_at: Instant::now(), + started_at: None, + completed_at: None, + error: None, + }; + + self.tasks.write().await.insert(task_id, task); + tracing::info!(task_id = %task_id, "Task entry created and stored"); + + // Notify server of status change + tracing::info!(task_id = %task_id, "Notifying server: pending -> initializing"); + self.send_status_change(task_id, "pending", "initializing").await; + + // Spawn task in background + tracing::info!(task_id = %task_id, "Spawning background task runner"); + let inner = self.clone_inner(); + tokio::spawn(async move { + let _permit = permit; // Hold permit until done + tracing::info!(task_id = %task_id, "Background task runner started"); + + if let Err(e) = inner.run_task( + task_id, task_name, plan, repo_url, base_branch, target_branch, + is_orchestrator, target_repo_path, completion_action, + continue_from_task_id, copy_files + ).await { + tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); + inner.mark_failed(task_id, &e.to_string()).await; + } + tracing::info!(task_id = %task_id, "Background task runner completed"); + }); + + tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ==="); + Ok(()) + } + + /// Clone inner state for spawned tasks. + fn clone_inner(&self) -> TaskManagerInner { + TaskManagerInner { + worktree_manager: self.worktree_manager.clone(), + process_manager: self.process_manager.clone(), + temp_manager: self.temp_manager.clone(), + tasks: self.tasks.clone(), + ws_tx: self.ws_tx.clone(), + task_inputs: self.task_inputs.clone(), + } + } + + /// Interrupt a task. + pub async fn interrupt_task(&self, task_id: Uuid) -> TaskResult<()> { + let mut tasks = self.tasks.write().await; + let task = tasks.get_mut(&task_id).ok_or(TaskError::NotFound(task_id))?; + + if task.state.is_terminal() { + return Ok(()); // Already done + } + + let old_state = task.state; + task.state = TaskState::Interrupted; + task.completed_at = Some(Instant::now()); + + // Notify server + drop(tasks); + self.send_status_change(task_id, old_state.as_str(), "interrupted").await; + + // Note: The process will be killed when the ClaudeProcess is dropped + // Worktrees are kept until explicitly deleted + + Ok(()) + } + + /// Get list of active task IDs. + pub async fn active_task_ids(&self) -> Vec { + self.tasks + .read() + .await + .iter() + .filter(|(_, t)| t.state.is_active()) + .map(|(id, _)| *id) + .collect() + } + + /// Get task state. + pub async fn get_task_state(&self, task_id: Uuid) -> Option { + self.tasks.read().await.get(&task_id).map(|t| t.state) + } + + /// Send status change notification to server. + async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { + let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); + let _ = self.ws_tx.send(msg).await; + } + + // ========================================================================= + // Merge Handler Methods + // ========================================================================= + + /// Get worktree path for a task, or return error if not found. + /// First checks in-memory tasks, then scans the worktrees directory. + async fn get_task_worktree_path(&self, task_id: Uuid) -> Result { + // First try to get from in-memory tasks + { + let tasks = self.tasks.read().await; + if let Some(task) = tasks.get(&task_id) { + if let Some(ref worktree) = task.worktree { + return Ok(worktree.path.clone()); + } + } + } + + // Task not in memory - scan worktrees directory for matching task ID + let short_id = &task_id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + // Verify it's a valid git directory + if path.join(".git").exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %path.display(), + "Found worktree by scanning directory" + ); + return Ok(path); + } + } + } + } + + Err(DaemonError::Task(TaskError::SetupFailed( + format!("No worktree found for task {}. The worktree may have been cleaned up.", task_id) + ))) + } + + /// Handle ListBranches command. + async fn handle_list_branches(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.list_task_branches(&worktree_path).await { + Ok(branches) => { + let branch_infos: Vec = branches + .into_iter() + .map(|b| BranchInfo { + name: b.name, + task_id: b.task_id, + is_merged: b.is_merged, + last_commit: b.last_commit, + last_commit_message: b.last_commit_message, + }) + .collect(); + + let msg = DaemonMessage::BranchList { + task_id, + branches: branch_infos, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to list branches"); + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeStart command. + async fn handle_merge_start(&self, task_id: Uuid, source_branch: String) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.merge_branch(&worktree_path, &source_branch).await { + Ok(None) => { + // Merge succeeded without conflicts + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: "Merge completed without conflicts".to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Ok(Some(conflicts)) => { + // Merge has conflicts + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: format!("Merge has {} conflicts", conflicts.len()), + commit_sha: None, + conflicts: Some(conflicts), + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeStatus command. + async fn handle_merge_status(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.get_merge_state(&worktree_path).await { + Ok(state) => { + let msg = DaemonMessage::MergeStatusResponse { + task_id, + in_progress: state.in_progress, + source_branch: if state.in_progress { Some(state.source_branch) } else { None }, + conflicted_files: state.conflicted_files, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to get merge status"); + let msg = DaemonMessage::MergeStatusResponse { + task_id, + in_progress: false, + source_branch: None, + conflicted_files: vec![], + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeResolve command. + async fn handle_merge_resolve(&self, task_id: Uuid, file: String, strategy: String) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + let resolution = match strategy.to_lowercase().as_str() { + "ours" => ConflictResolution::Ours, + "theirs" => ConflictResolution::Theirs, + _ => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: format!("Invalid strategy '{}', must be 'ours' or 'theirs'", strategy), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + match self.worktree_manager.resolve_conflict(&worktree_path, &file, resolution).await { + Ok(()) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: format!("Resolved conflict in {}", file), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeCommit command. + async fn handle_merge_commit(&self, task_id: Uuid, message: String) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.commit_merge(&worktree_path, &message).await { + Ok(commit_sha) => { + // Track this merge as completed (extract subtask ID from branch if possible) + // For now, we'll track it when MergeSkip is called or based on branch names + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: "Merge committed successfully".to_string(), + commit_sha: Some(commit_sha), + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeAbort command. + async fn handle_merge_abort(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.abort_merge(&worktree_path).await { + Ok(()) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: "Merge aborted".to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeSkip command. + async fn handle_merge_skip(&self, task_id: Uuid, subtask_id: Uuid, reason: String) -> Result<(), DaemonError> { + // Record that this subtask was skipped + { + let mut trackers = self.merge_trackers.write().await; + let tracker = trackers.entry(task_id).or_insert_with(MergeTracker::default); + tracker.skipped_subtasks.insert(subtask_id, reason.clone()); + } + + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: format!("Subtask {} skipped: {}", subtask_id, reason), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CheckMergeComplete command. + async fn handle_check_merge_complete(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + // Get all task branches + let branches = match self.worktree_manager.list_task_branches(&worktree_path).await { + Ok(b) => b, + Err(e) => { + let msg = DaemonMessage::MergeCompleteCheck { + task_id, + can_complete: false, + unmerged_branches: vec![format!("Error listing branches: {}", e)], + merged_count: 0, + skipped_count: 0, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + // Get tracker state + let trackers = self.merge_trackers.read().await; + let empty_merged: HashSet = HashSet::new(); + let empty_skipped: HashMap = HashMap::new(); + let tracker = trackers.get(&task_id); + let merged_set = tracker.map(|t| &t.merged_subtasks).unwrap_or(&empty_merged); + let skipped_set = tracker.map(|t| &t.skipped_subtasks).unwrap_or(&empty_skipped); + + let mut merged_count = 0u32; + let mut skipped_count = 0u32; + let mut unmerged_branches = Vec::new(); + + for branch in &branches { + if branch.is_merged { + merged_count += 1; + } else if let Some(subtask_id) = branch.task_id { + if merged_set.contains(&subtask_id) { + merged_count += 1; + } else if skipped_set.contains_key(&subtask_id) { + skipped_count += 1; + } else { + unmerged_branches.push(branch.name.clone()); + } + } else { + // Branch without task ID - check if it's merged + unmerged_branches.push(branch.name.clone()); + } + } + + let can_complete = unmerged_branches.is_empty(); + + let msg = DaemonMessage::MergeCompleteCheck { + task_id, + can_complete, + unmerged_branches, + merged_count, + skipped_count, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Mark a subtask as merged in the tracker. + #[allow(dead_code)] + pub async fn mark_subtask_merged(&self, orchestrator_task_id: Uuid, subtask_id: Uuid) { + let mut trackers = self.merge_trackers.write().await; + let tracker = trackers.entry(orchestrator_task_id).or_insert_with(MergeTracker::default); + tracker.merged_subtasks.insert(subtask_id); + } + + // ========================================================================= + // Completion Action Handler Methods + // ========================================================================= + + /// Handle RetryCompletionAction command. + async fn handle_retry_completion_action( + &self, + task_id: Uuid, + task_name: String, + action: String, + target_repo_path: String, + target_branch: Option, + ) -> Result<(), DaemonError> { + // Get the task's worktree path + let worktree_path = self.get_task_worktree_path(task_id).await?; + + // Execute the completion action + let inner = self.clone_inner(); + let result = inner.execute_completion_action( + task_id, + &task_name, + &worktree_path, + &action, + Some(target_repo_path.as_str()), + target_branch.as_deref(), + ).await; + + // Send result back to server + let msg = match result { + Ok(pr_url) => DaemonMessage::CompletionActionResult { + task_id, + success: true, + message: match action.as_str() { + "branch" => format!("Branch pushed to {}", target_repo_path), + "merge" => format!("Merged into {}", target_branch.as_deref().unwrap_or("main")), + "pr" => format!("Pull request created"), + _ => format!("Completion action '{}' executed", action), + }, + pr_url, + }, + Err(e) => DaemonMessage::CompletionActionResult { + task_id, + success: false, + message: e, + pr_url: None, + }, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CloneWorktree command. + async fn handle_clone_worktree( + &self, + task_id: Uuid, + target_dir: String, + ) -> Result<(), DaemonError> { + // Get the task's worktree path + let worktree_path = self.get_task_worktree_path(task_id).await?; + + // Expand tilde in target path + let target_path = crate::worktree::expand_tilde(&target_dir); + + // Clone the worktree to target directory + let result = self.worktree_manager.clone_worktree_to_directory( + &worktree_path, + &target_path, + ).await; + + // Send result back to server + let msg = match result { + Ok(message) => DaemonMessage::CloneWorktreeResult { + task_id, + success: true, + message, + target_dir: Some(target_path.to_string_lossy().to_string()), + }, + Err(e) => DaemonMessage::CloneWorktreeResult { + task_id, + success: false, + message: e.to_string(), + target_dir: None, + }, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CheckTargetExists command. + async fn handle_check_target_exists( + &self, + task_id: Uuid, + target_dir: String, + ) -> Result<(), DaemonError> { + // Expand tilde in target path + let target_path = crate::worktree::expand_tilde(&target_dir); + + // Check if target exists + let exists = self.worktree_manager.target_directory_exists(&target_path).await; + + // Send result back to server + let msg = DaemonMessage::CheckTargetExistsResult { + task_id, + exists, + target_dir: target_path.to_string_lossy().to_string(), + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } +} + +/// Inner state for spawned tasks (cloneable). +struct TaskManagerInner { + worktree_manager: Arc, + process_manager: Arc, + temp_manager: Arc, + tasks: Arc>>, + ws_tx: mpsc::Sender, + task_inputs: Arc>>>, +} + +impl TaskManagerInner { + /// Run a task to completion. + #[allow(clippy::too_many_arguments)] + async fn run_task( + &self, + task_id: Uuid, + task_name: String, + plan: String, + repo_source: Option, + base_branch: Option, + target_branch: Option, + is_orchestrator: bool, + target_repo_path: Option, + completion_action: Option, + continue_from_task_id: Option, + copy_files: Option>, + ) -> Result<(), DaemonError> { + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, "=== RUN_TASK START ==="); + + // Determine working directory + let working_dir = if let Some(ref source) = repo_source { + if is_new_repo_request(source) { + // Explicit new repo request: new:// or new://project-name + tracing::info!( + task_id = %task_id, + source = %source, + "Creating new git repository" + ); + + let msg = DaemonMessage::task_output( + task_id, + format!("Initializing new git repository...\n"), + false, + ); + let _ = self.ws_tx.send(msg).await; + + let worktree_info = self.worktree_manager + .init_new_repo(task_id, source) + .await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; + + tracing::info!( + task_id = %task_id, + path = %worktree_info.path.display(), + "New repository created" + ); + + // Store worktree info + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(worktree_info.clone()); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Repository ready at {}\n", worktree_info.path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + worktree_info.path + } else { + // Send progress message + let msg = DaemonMessage::task_output( + task_id, + format!("Setting up worktree from {}...\n", source), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Ensure source repo exists (clone if URL, verify if path) + let source_repo = self.worktree_manager.ensure_repo(source).await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; + + // Detect or use provided base branch + let branch = if let Some(ref b) = base_branch { + b.clone() + } else { + self.worktree_manager.detect_default_branch(&source_repo).await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? + }; + + tracing::info!( + task_id = %task_id, + source = %source, + branch = %branch, + continue_from_task_id = ?continue_from_task_id, + "Setting up worktree" + ); + + // Create worktree - either from scratch or copying from another task + let task_name = format!("task-{}", &task_id.to_string()[..8]); + let worktree_info = if let Some(from_task_id) = continue_from_task_id { + // Find the source task's worktree path + let source_worktree = self.find_worktree_for_task(from_task_id).await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed( + format!("Cannot continue from task {}: {}", from_task_id, e) + )))?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Continuing from task {} worktree...\n", &from_task_id.to_string()[..8]), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Create worktree by copying from source task + self.worktree_manager + .create_worktree_from_task(&source_worktree, task_id, &task_name) + .await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? + } else { + // Create fresh worktree from repo + self.worktree_manager + .create_worktree(&source_repo, task_id, &task_name, &branch) + .await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? + }; + + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_info.path.display(), + branch = %worktree_info.branch, + continued_from = ?continue_from_task_id, + "Worktree created" + ); + + // Store worktree info + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(worktree_info.clone()); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Worktree ready at {}\n", worktree_info.path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + worktree_info.path + } + } else { + // No repo specified - use managed temp directory in ~/.makima/temp/ + tracing::info!(task_id = %task_id, "Creating managed temp directory (no repo)"); + + let msg = DaemonMessage::task_output( + task_id, + "Creating temporary working directory...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + let temp_dir = self.temp_manager.create_task_dir(task_id).await?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Working directory ready at {}\n", temp_dir.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + temp_dir + }; + + // Copy files from parent task's worktree if specified + if let Some(ref files) = copy_files { + if !files.is_empty() { + // Get the parent task ID to find its worktree + let parent_task_id = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).and_then(|t| t.parent_task_id) + }; + + if let Some(parent_id) = parent_task_id { + match self.find_worktree_for_task(parent_id).await { + Ok(parent_worktree) => { + let msg = DaemonMessage::task_output( + task_id, + format!("Copying {} files from orchestrator...\n", files.len()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + for file_path in files { + let source = parent_worktree.join(file_path); + let dest = working_dir.join(file_path); + + // Create parent directories if needed + if let Some(parent) = dest.parent() { + if let Err(e) = tokio::fs::create_dir_all(parent).await { + tracing::warn!( + task_id = %task_id, + file = %file_path, + error = %e, + "Failed to create parent directory for file" + ); + continue; + } + } + + // Copy the file + match tokio::fs::copy(&source, &dest).await { + Ok(_) => { + tracing::info!( + task_id = %task_id, + source = %source.display(), + dest = %dest.display(), + "Copied file from orchestrator" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + source = %source.display(), + dest = %dest.display(), + error = %e, + "Failed to copy file from orchestrator" + ); + // Notify but don't fail - the file might be optional + let msg = DaemonMessage::task_output( + task_id, + format!("Warning: Could not copy {}: {}\n", file_path, e), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + } + } + + let msg = DaemonMessage::task_output( + task_id, + "Files copied from orchestrator.\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + parent_id = %parent_id, + error = %e, + "Could not find parent task worktree for file copying" + ); + } + } + } else { + tracing::warn!( + task_id = %task_id, + "copy_files specified but no parent_task_id" + ); + } + } + } + + // Update state to Starting + tracing::info!(task_id = %task_id, "Updating state: Initializing -> Starting"); + self.update_state(task_id, TaskState::Starting).await; + self.send_status_change(task_id, "initializing", "starting").await; + + // Check Claude is available + match self.process_manager.check_claude_available().await { + Ok(version) => { + tracing::info!(task_id = %task_id, version = %version, "Claude Code available"); + let msg = DaemonMessage::task_output( + task_id, + format!("Claude Code {} ready\n", version), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let err_msg = format!("Claude Code not available: {}", e); + tracing::error!(task_id = %task_id, error = %err_msg); + return Err(DaemonError::Task(TaskError::SetupFailed(err_msg))); + } + } + + // Set up orchestrator mode if needed + let (extra_env, full_plan) = if is_orchestrator { + tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up orchestrator mode"); + + let msg = DaemonMessage::task_output( + task_id, + "Setting up orchestrator environment...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Generate tool key for API access + let tool_key = generate_tool_key(); + tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for orchestrator"); + + // Register tool key with server + let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); + if self.ws_tx.send(register_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to register tool key"); + } else { + tracing::info!(task_id = %task_id, "Tool key registration message sent to server"); + } + + // Create .makima directory and helper script + let makima_dir = working_dir.join(".makima"); + if let Err(e) = tokio::fs::create_dir_all(&makima_dir).await { + tracing::warn!(task_id = %task_id, makima_dir = %makima_dir.display(), "Failed to create .makima directory: {}", e); + } else { + tracing::info!(task_id = %task_id, makima_dir = %makima_dir.display(), "Created .makima directory"); + } + + let script_path = makima_dir.join("orchestrate.sh"); + if let Err(e) = tokio::fs::write(&script_path, ORCHESTRATE_SCRIPT).await { + tracing::warn!(task_id = %task_id, script_path = %script_path.display(), "Failed to write orchestrate.sh: {}", e); + } else { + tracing::info!(task_id = %task_id, script_path = %script_path.display(), script_size = ORCHESTRATE_SCRIPT.len(), "Wrote orchestrate.sh"); + // Make script executable + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + if let Err(e) = std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)) { + tracing::warn!(task_id = %task_id, "Failed to set script permissions: {}", e); + } else { + tracing::info!(task_id = %task_id, "Set orchestrate.sh executable (0o755)"); + } + } + } + + // Set up environment variables + let mut env = HashMap::new(); + // TODO: Make API URL configurable + env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string()); + env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone()); + env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); + + tracing::info!( + task_id = %task_id, + api_url = "http://localhost:8080", + tool_key_preview = &tool_key[..8.min(tool_key.len())], + "Set orchestrator environment variables" + ); + + // Prepend orchestrator instructions to the plan + let orchestrator_plan = format!( + "{}\n\n---\n\nYour task:\n{}", + ORCHESTRATOR_SYSTEM_PROMPT, + plan + ); + + let msg = DaemonMessage::task_output( + task_id, + format!("Orchestrator environment ready (script at {})\n", script_path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + (Some(env), orchestrator_plan) + } else { + tracing::info!(task_id = %task_id, "Running as regular subtask (not orchestrator)"); + // Prepend subtask instructions to ensure worktree isolation + let subtask_plan = format!( + "{}\nYour task:\n{}", + SUBTASK_SYSTEM_PROMPT, + plan + ); + (None, subtask_plan) + }; + + // Spawn Claude process + let plan_bytes = full_plan.len(); + let plan_chars = full_plan.chars().count(); + // Rough token estimate: ~4 chars per token for English + let estimated_tokens = plan_chars / 4; + + tracing::info!( + task_id = %task_id, + working_dir = %working_dir.display(), + is_orchestrator = is_orchestrator, + plan_bytes = plan_bytes, + plan_chars = plan_chars, + estimated_tokens = estimated_tokens, + "Spawning Claude process" + ); + + // Warn if plan is very large (Claude's context is typically 100k-200k tokens) + if estimated_tokens > 50_000 { + tracing::warn!(task_id = %task_id, estimated_tokens = estimated_tokens, "Plan is very large - may hit context limits!"); + let msg = DaemonMessage::task_output( + task_id, + format!("Warning: Plan is very large (~{} tokens). This may cause issues.\n", estimated_tokens), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + + let msg = DaemonMessage::task_output( + task_id, + if is_orchestrator { + format!("Starting Claude Code (orchestrator mode, ~{} tokens)...\n", estimated_tokens) + } else { + format!("Starting Claude Code (~{} tokens)...\n", estimated_tokens) + }, + false, + ); + let _ = self.ws_tx.send(msg).await; + + tracing::debug!(task_id = %task_id, "Calling process_manager.spawn()..."); + let mut process = self.process_manager + .spawn(&working_dir, &full_plan, extra_env) + .await + .map_err(|e| { + tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process"); + DaemonError::Task(TaskError::SetupFailed(e.to_string())) + })?; + tracing::info!(task_id = %task_id, "Claude process spawned successfully"); + + // Set up input channel for this task so we can send messages to its stdin + tracing::debug!(task_id = %task_id, "Setting up input channel..."); + let (input_tx, mut input_rx) = mpsc::channel::(100); + tracing::debug!(task_id = %task_id, "Acquiring task_inputs write lock..."); + self.task_inputs.write().await.insert(task_id, input_tx); + tracing::debug!(task_id = %task_id, "Input channel registered"); + + // Get stdin handle for input forwarding and completion signaling + let stdin_handle = process.stdin_handle(); + let stdin_handle_for_completion = stdin_handle.clone(); + + tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)"); + tokio::spawn(async move { + tracing::info!(task_id = %task_id, "Stdin forwarder task started, waiting for messages..."); + while let Some(msg) = input_rx.recv().await { + tracing::info!(task_id = %task_id, msg_len = msg.len(), msg_preview = %if msg.len() > 50 { &msg[..50] } else { &msg }, "Received message from input channel"); + + // Format as JSON user message for stream-json input protocol + let json_msg = ClaudeInputMessage::user(&msg); + let json_line = match json_msg.to_json_line() { + Ok(line) => line, + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to serialize input message"); + continue; + } + }; + + tracing::debug!(task_id = %task_id, json_line = %json_line.trim(), "Formatted JSON line for stdin"); + + let mut stdin_guard = stdin_handle.lock().await; + if let Some(ref mut stdin) = *stdin_guard { + tracing::debug!(task_id = %task_id, "Acquired stdin lock, writing..."); + if stdin.write_all(json_line.as_bytes()).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to write to stdin, breaking"); + break; + } + if stdin.flush().await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to flush stdin, breaking"); + break; + } + tracing::info!(task_id = %task_id, json_len = json_line.len(), "Successfully wrote user message to Claude stdin"); + } else { + tracing::warn!(task_id = %task_id, "Stdin is None (already closed), cannot send message"); + break; + } + } + tracing::info!(task_id = %task_id, "Stdin forwarder task ended (channel closed or stdin unavailable)"); + }); + + // Update state to Running + { + tracing::debug!(task_id = %task_id, "Acquiring tasks write lock for Running state update"); + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Running; + task.started_at = Some(Instant::now()); + } + tracing::debug!(task_id = %task_id, "Released tasks write lock"); + } + tracing::info!(task_id = %task_id, "Updating state: Starting -> Running"); + self.send_status_change(task_id, "starting", "running").await; + tracing::debug!(task_id = %task_id, "Sent status change notification"); + + // Stream output with startup timeout check + tracing::info!(task_id = %task_id, "Starting output stream - waiting for Claude output..."); + tracing::debug!(task_id = %task_id, "Output will be forwarded via WebSocket to server"); + let ws_tx = self.ws_tx.clone(); + + let mut output_count = 0u64; + let mut output_bytes = 0usize; + let startup_timeout = tokio::time::Duration::from_secs(30); + let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); + startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let startup_deadline = tokio::time::Instant::now() + startup_timeout; + + loop { + tokio::select! { + maybe_line = process.next_output() => { + match maybe_line { + Some(line) => { + output_count += 1; + output_bytes += line.content.len(); + + if output_count == 1 { + tracing::info!(task_id = %task_id, "Received first output line from Claude"); + } + if output_count % 100 == 0 { + tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); + } + + // Log output details for debugging + tracing::trace!( + task_id = %task_id, + line_num = output_count, + content_len = line.content.len(), + is_stdout = line.is_stdout, + json_type = ?line.json_type, + "Forwarding output to WebSocket" + ); + + // Check if this is a "result" message indicating task completion + // With --input-format=stream-json, Claude waits for more input after completion + // We close stdin to signal EOF and let the process exit + if line.json_type.as_deref() == Some("result") { + tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion"); + let mut stdin_guard = stdin_handle_for_completion.lock().await; + if let Some(mut stdin) = stdin_guard.take() { + let _ = stdin.shutdown().await; + } + } + + let msg = DaemonMessage::task_output(task_id, line.content, false); + if ws_tx.send(msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); + break; + } + } + None => { + tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); + break; + } + } + } + _ = startup_check.tick(), if output_count == 0 => { + // Check if process is still alive + match process.try_wait() { + Ok(Some(exit_code)) => { + tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); + let msg = DaemonMessage::task_output( + task_id, + format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), + false, + ); + let _ = ws_tx.send(msg).await; + break; + } + Ok(None) => { + // Still running but no output + if tokio::time::Instant::now() > startup_deadline { + tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); + let msg = DaemonMessage::task_output( + task_id, + "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), + false, + ); + let _ = ws_tx.send(msg).await; + } else { + tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); + } + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); + } + } + } + } + } + + // Wait for process to exit + let exit_code = process.wait().await.unwrap_or(-1); + + // Clean up input channel for this task + self.task_inputs.write().await.remove(&task_id); + tracing::debug!(task_id = %task_id, "Removed task input channel"); + + // Update state based on exit code + let success = exit_code == 0; + let new_state = if success { + TaskState::Completed + } else { + TaskState::Failed + }; + + tracing::info!( + task_id = %task_id, + exit_code = exit_code, + success = success, + new_state = ?new_state, + "Claude process exited, updating task state" + ); + + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = new_state; + task.completed_at = Some(Instant::now()); + if !success { + task.error = Some(format!("Process exited with code {}", exit_code)); + } + } + } + + // Execute completion action if task succeeded + let completion_result = if success { + if let Some(ref action) = completion_action { + if action != "none" { + self.execute_completion_action( + task_id, + &task_name, + &working_dir, + action, + target_repo_path.as_deref(), + target_branch.as_deref(), + ).await + } else { + Ok(None) + } + } else { + Ok(None) + } + } else { + Ok(None) + }; + + // Log completion action result + match &completion_result { + Ok(Some(pr_url)) => { + tracing::info!(task_id = %task_id, pr_url = %pr_url, "Completion action created PR"); + } + Ok(None) => {} + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Completion action failed (task still marked as done)"); + } + } + + // Notify server + let error = if success { + None + } else { + Some(format!("Exit code: {}", exit_code)) + }; + tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion"); + let msg = DaemonMessage::task_complete(task_id, success, error); + let _ = self.ws_tx.send(msg).await; + + // Note: Worktrees are kept until explicitly deleted (per user preference) + // This allows inspection, PR creation, etc. + + tracing::info!(task_id = %task_id, "=== RUN_TASK END ==="); + Ok(()) + } + + /// Execute the completion action for a task. + async fn execute_completion_action( + &self, + task_id: Uuid, + task_name: &str, + worktree_path: &std::path::Path, + action: &str, + target_repo_path: Option<&str>, + target_branch: Option<&str>, + ) -> Result, String> { + let target_repo = match target_repo_path { + Some(path) => crate::worktree::expand_tilde(path), + None => { + tracing::warn!(task_id = %task_id, "No target_repo_path configured, skipping completion action"); + return Ok(None); + } + }; + + if !target_repo.exists() { + return Err(format!("Target repo not found: {} (expanded from {:?})", target_repo.display(), target_repo_path)); + } + + // Get the branch name: makima/{task-name-with-dashes}-{short-id} + let branch_name = format!( + "makima/{}-{}", + crate::worktree::sanitize_name(task_name), + crate::worktree::short_uuid(task_id) + ); + + // Determine target branch - use provided value or detect default branch of target repo + let target_branch = match target_branch { + Some(branch) => branch.to_string(), + None => { + // Detect default branch (main, master, develop, etc.) + self.worktree_manager + .detect_default_branch(&target_repo) + .await + .unwrap_or_else(|_| "master".to_string()) + } + }; + + let msg = DaemonMessage::task_output( + task_id, + format!("Executing completion action: {}...\n", action), + false, + ); + let _ = self.ws_tx.send(msg).await; + + match action { + "branch" => { + // Just push the branch to target repo + self.worktree_manager + .push_to_target_repo(worktree_path, &target_repo, &branch_name, task_name) + .await + .map_err(|e| e.to_string())?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Branch '{}' pushed to {}\n", branch_name, target_repo.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + Ok(None) + } + "merge" => { + // Push and merge into target branch + let commit_sha = self.worktree_manager + .merge_to_target(worktree_path, &target_repo, &branch_name, &target_branch, task_name) + .await + .map_err(|e| e.to_string())?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Branch merged into {} (commit: {})\n", target_branch, commit_sha), + false, + ); + let _ = self.ws_tx.send(msg).await; + Ok(None) + } + "pr" => { + // Push and create PR + let title = task_name.to_string(); + let body = format!( + "Automated PR from makima task.\n\nTask ID: `{}`", + task_id + ); + let pr_url = self.worktree_manager + .create_pull_request( + worktree_path, + &target_repo, + &branch_name, + &target_branch, + &title, + &body, + ) + .await + .map_err(|e| e.to_string())?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Pull request created: {}\n", pr_url), + false, + ); + let _ = self.ws_tx.send(msg).await; + Ok(Some(pr_url)) + } + _ => { + tracing::warn!(task_id = %task_id, action = %action, "Unknown completion action"); + Ok(None) + } + } + } + + /// Find worktree path for a task ID. + /// First checks in-memory tasks, then scans the worktrees directory. + async fn find_worktree_for_task(&self, task_id: Uuid) -> Result { + // First try to get from in-memory tasks + { + let tasks = self.tasks.read().await; + if let Some(task) = tasks.get(&task_id) { + if let Some(ref worktree) = task.worktree { + return Ok(worktree.path.clone()); + } + } + } + + // Task not in memory - scan worktrees directory for matching task ID + let short_id = &task_id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + // Verify it's a valid git directory + if path.join(".git").exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %path.display(), + "Found worktree by scanning directory" + ); + return Ok(path); + } + } + } + } + + Err(format!( + "No worktree found for task {}. The worktree may have been cleaned up.", + task_id + )) + } + + async fn update_state(&self, task_id: Uuid, state: TaskState) { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = state; + } + } + + async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { + let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); + let _ = self.ws_tx.send(msg).await; + } + + /// Mark task as failed. + async fn mark_failed(&self, task_id: Uuid, error: &str) { + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Failed; + task.error = Some(error.to_string()); + task.completed_at = Some(Instant::now()); + } + } + + // Notify server + let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string())); + let _ = self.ws_tx.send(msg).await; + } +} + +impl Clone for TaskManagerInner { + fn clone(&self) -> Self { + Self { + worktree_manager: self.worktree_manager.clone(), + process_manager: self.process_manager.clone(), + temp_manager: self.temp_manager.clone(), + tasks: self.tasks.clone(), + ws_tx: self.ws_tx.clone(), + task_inputs: self.task_inputs.clone(), + } + } +} diff --git a/makima/daemon/src/task/mod.rs b/makima/daemon/src/task/mod.rs new file mode 100644 index 0000000..29c261e --- /dev/null +++ b/makima/daemon/src/task/mod.rs @@ -0,0 +1,7 @@ +//! Task management and execution. + +pub mod manager; +pub mod state; + +pub use manager::{ManagedTask, TaskConfig, TaskManager}; +pub use state::TaskState; diff --git a/makima/daemon/src/task/state.rs b/makima/daemon/src/task/state.rs new file mode 100644 index 0000000..fe73de1 --- /dev/null +++ b/makima/daemon/src/task/state.rs @@ -0,0 +1,161 @@ +//! Task state machine. + +use std::fmt; + +/// Task execution state. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum TaskState { + /// Task received, preparing overlay. + Initializing, + /// Overlay ready, starting container. + Starting, + /// Container running. + Running, + /// Container paused. + Paused, + /// Waiting for sibling or resource. + Blocked, + /// Task completed successfully. + Completed, + /// Task failed with error. + Failed, + /// Task interrupted by user. + Interrupted, +} + +impl TaskState { + /// Check if a state transition is valid. + pub fn can_transition_to(&self, target: TaskState) -> bool { + use TaskState::*; + + matches!( + (self, target), + // From Initializing + (Initializing, Starting) + | (Initializing, Failed) + | (Initializing, Interrupted) + // From Starting + | (Starting, Running) + | (Starting, Failed) + | (Starting, Interrupted) + // From Running + | (Running, Paused) + | (Running, Blocked) + | (Running, Completed) + | (Running, Failed) + | (Running, Interrupted) + // From Paused + | (Paused, Running) + | (Paused, Interrupted) + | (Paused, Failed) + // From Blocked + | (Blocked, Running) + | (Blocked, Failed) + | (Blocked, Interrupted) + ) + } + + /// Check if this state is terminal (no more transitions possible). + pub fn is_terminal(&self) -> bool { + matches!( + self, + TaskState::Completed | TaskState::Failed | TaskState::Interrupted + ) + } + + /// Check if the task is currently active (running or paused). + pub fn is_active(&self) -> bool { + matches!( + self, + TaskState::Initializing + | TaskState::Starting + | TaskState::Running + | TaskState::Paused + | TaskState::Blocked + ) + } + + /// Check if the task is running. + pub fn is_running(&self) -> bool { + matches!(self, TaskState::Running) + } + + /// Convert to string for protocol messages. + pub fn as_str(&self) -> &'static str { + match self { + TaskState::Initializing => "initializing", + TaskState::Starting => "starting", + TaskState::Running => "running", + TaskState::Paused => "paused", + TaskState::Blocked => "blocked", + TaskState::Completed => "done", + TaskState::Failed => "failed", + TaskState::Interrupted => "interrupted", + } + } + + /// Parse from string. + pub fn from_str(s: &str) -> Option { + match s.to_lowercase().as_str() { + "initializing" => Some(TaskState::Initializing), + "starting" => Some(TaskState::Starting), + "running" => Some(TaskState::Running), + "paused" => Some(TaskState::Paused), + "blocked" => Some(TaskState::Blocked), + "done" | "completed" => Some(TaskState::Completed), + "failed" => Some(TaskState::Failed), + "interrupted" => Some(TaskState::Interrupted), + _ => None, + } + } +} + +impl fmt::Display for TaskState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +impl Default for TaskState { + fn default() -> Self { + TaskState::Initializing + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_valid_transitions() { + use TaskState::*; + + // Valid transitions + assert!(Initializing.can_transition_to(Starting)); + assert!(Starting.can_transition_to(Running)); + assert!(Running.can_transition_to(Completed)); + assert!(Running.can_transition_to(Paused)); + assert!(Paused.can_transition_to(Running)); + + // Invalid transitions + assert!(!Completed.can_transition_to(Running)); + assert!(!Failed.can_transition_to(Running)); + assert!(!Running.can_transition_to(Initializing)); + } + + #[test] + fn test_terminal_states() { + assert!(TaskState::Completed.is_terminal()); + assert!(TaskState::Failed.is_terminal()); + assert!(TaskState::Interrupted.is_terminal()); + assert!(!TaskState::Running.is_terminal()); + assert!(!TaskState::Paused.is_terminal()); + } + + #[test] + fn test_parse() { + assert_eq!(TaskState::from_str("running"), Some(TaskState::Running)); + assert_eq!(TaskState::from_str("done"), Some(TaskState::Completed)); + assert_eq!(TaskState::from_str("invalid"), None); + } +} diff --git a/makima/daemon/src/temp.rs b/makima/daemon/src/temp.rs new file mode 100644 index 0000000..015b21b --- /dev/null +++ b/makima/daemon/src/temp.rs @@ -0,0 +1,224 @@ +//! Managed temporary directory for tasks without repositories. +//! +//! Tasks that don't have a repository URL and aren't subtasks (which inherit +//! from parent) use a managed temp directory in ~/.makima/temp/. The directory +//! is automatically cleaned up when it exceeds a size limit. + +use std::path::PathBuf; + +use tokio::fs; +use uuid::Uuid; + +/// Maximum size of the temp directory before cleanup (5GB). +const MAX_TEMP_SIZE_BYTES: u64 = 5 * 1024 * 1024 * 1024; + +/// Manages temporary directories for tasks without repositories. +pub struct TempManager { + /// Base directory for temp task directories (~/.makima/temp/). + temp_dir: PathBuf, +} + +impl TempManager { + /// Create a new TempManager. + pub fn new() -> Self { + let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from(".")); + Self { + temp_dir: home.join(".makima").join("temp"), + } + } + + /// Create a new TempManager with a custom base directory. + #[allow(dead_code)] + pub fn with_base_dir(base_dir: PathBuf) -> Self { + Self { temp_dir: base_dir } + } + + /// Get the base temp directory path. + pub fn temp_dir(&self) -> &PathBuf { + &self.temp_dir + } + + /// Create a temp directory for a task. + /// + /// This creates a directory at ~/.makima/temp/task-{id}/ and triggers + /// cleanup if the total size exceeds the limit. + pub async fn create_task_dir(&self, task_id: Uuid) -> Result { + // Ensure base directory exists + fs::create_dir_all(&self.temp_dir).await?; + + // Check size and cleanup if needed + if let Err(e) = self.cleanup_if_needed().await { + tracing::warn!("Temp directory cleanup failed: {}", e); + // Continue anyway, cleanup is best-effort + } + + // Create task-specific directory + let task_dir = self.temp_dir.join(format!("task-{}", task_id)); + fs::create_dir_all(&task_dir).await?; + + tracing::info!( + task_id = %task_id, + path = %task_dir.display(), + "Created temp directory for task" + ); + + Ok(task_dir) + } + + /// Calculate total size of temp directory recursively. + async fn get_total_size(&self) -> Result { + if !self.temp_dir.exists() { + return Ok(0); + } + + let mut total = 0u64; + let mut stack = vec![self.temp_dir.clone()]; + + while let Some(dir) = stack.pop() { + let mut entries = match fs::read_dir(&dir).await { + Ok(e) => e, + Err(_) => continue, + }; + + while let Ok(Some(entry)) = entries.next_entry().await { + let metadata = match entry.metadata().await { + Ok(m) => m, + Err(_) => continue, + }; + + if metadata.is_dir() { + stack.push(entry.path()); + } else { + total += metadata.len(); + } + } + } + + Ok(total) + } + + /// Remove oldest directories if total size exceeds limit. + async fn cleanup_if_needed(&self) -> Result<(), std::io::Error> { + let size = self.get_total_size().await?; + if size <= MAX_TEMP_SIZE_BYTES { + return Ok(()); + } + + tracing::info!( + current_size_mb = size / 1024 / 1024, + limit_mb = MAX_TEMP_SIZE_BYTES / 1024 / 1024, + "Temp directory exceeds size limit, starting cleanup" + ); + + // Get all task dirs with modification times + let mut dirs: Vec<(PathBuf, std::time::SystemTime, u64)> = vec![]; + let mut entries = fs::read_dir(&self.temp_dir).await?; + + while let Ok(Some(entry)) = entries.next_entry().await { + let path = entry.path(); + if !path.is_dir() { + continue; + } + + let metadata = match entry.metadata().await { + Ok(m) => m, + Err(_) => continue, + }; + + let modified = metadata.modified().unwrap_or(std::time::UNIX_EPOCH); + let dir_size = self.get_dir_size(&path).await.unwrap_or(0); + dirs.push((path, modified, dir_size)); + } + + // Sort by oldest first + dirs.sort_by(|a, b| a.1.cmp(&b.1)); + + // Remove oldest until under limit + let mut current_size = size; + for (path, _, dir_size) in dirs { + if current_size <= MAX_TEMP_SIZE_BYTES { + break; + } + + tracing::info!( + path = %path.display(), + size_mb = dir_size / 1024 / 1024, + "Removing old temp directory" + ); + + if let Err(e) = fs::remove_dir_all(&path).await { + tracing::warn!(path = %path.display(), error = %e, "Failed to remove temp directory"); + continue; + } + + current_size = current_size.saturating_sub(dir_size); + } + + tracing::info!( + new_size_mb = current_size / 1024 / 1024, + "Temp directory cleanup complete" + ); + + Ok(()) + } + + /// Calculate size of a directory recursively. + async fn get_dir_size(&self, path: &PathBuf) -> Result { + let mut total = 0u64; + let mut stack = vec![path.clone()]; + + while let Some(dir) = stack.pop() { + let mut entries = match fs::read_dir(&dir).await { + Ok(e) => e, + Err(_) => continue, + }; + + while let Ok(Some(entry)) = entries.next_entry().await { + let metadata = match entry.metadata().await { + Ok(m) => m, + Err(_) => continue, + }; + + if metadata.is_dir() { + stack.push(entry.path()); + } else { + total += metadata.len(); + } + } + } + + Ok(total) + } + + /// Remove a specific task's temp directory. + #[allow(dead_code)] + pub async fn remove_task_dir(&self, task_id: Uuid) -> Result<(), std::io::Error> { + let task_dir = self.temp_dir.join(format!("task-{}", task_id)); + if task_dir.exists() { + fs::remove_dir_all(&task_dir).await?; + tracing::info!( + task_id = %task_id, + path = %task_dir.display(), + "Removed temp directory for task" + ); + } + Ok(()) + } +} + +impl Default for TempManager { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_temp_manager_default_dir() { + let manager = TempManager::new(); + assert!(manager.temp_dir().ends_with(".makima/temp")); + } +} diff --git a/makima/daemon/src/worktree/manager.rs b/makima/daemon/src/worktree/manager.rs new file mode 100644 index 0000000..266b970 --- /dev/null +++ b/makima/daemon/src/worktree/manager.rs @@ -0,0 +1,1623 @@ +//! Worktree manager implementation. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::LazyLock; + +use tokio::process::Command; +use tokio::sync::Mutex; +use uuid::Uuid; + +/// Errors that can occur during worktree operations. +#[derive(Debug, thiserror::Error)] +pub enum WorktreeError { + #[error("Git command failed: {0}")] + GitCommand(String), + + #[error("Repository not found: {0}")] + RepoNotFound(String), + + #[error("Failed to create directory: {0}")] + CreateDir(#[from] std::io::Error), + + #[error("Invalid repository path: {0}")] + InvalidPath(String), + + #[error("Worktree already exists: {0}")] + AlreadyExists(String), + + #[error("Clone failed: {0}")] + CloneFailed(String), + + #[error("Merge in progress")] + MergeInProgress, + + #[error("No merge in progress")] + NoMergeInProgress, + + #[error("Merge has conflicts: {0}")] + MergeConflicts(String), +} + +/// Strategy for resolving a merge conflict. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConflictResolution { + /// Use our version (the branch being merged into). + Ours, + /// Use their version (the branch being merged). + Theirs, +} + +/// State of an in-progress merge. +#[derive(Debug, Clone)] +pub struct MergeState { + /// The branch being merged. + pub source_branch: String, + /// Files with unresolved conflicts. + pub conflicted_files: Vec, + /// Whether a merge is currently in progress. + pub in_progress: bool, +} + +/// Information about a task branch. +#[derive(Debug, Clone)] +pub struct TaskBranchInfo { + /// Full branch name. + pub name: String, + /// Task ID extracted from branch name (if parseable). + pub task_id: Option, + /// Whether this branch has been merged into the current branch. + pub is_merged: bool, + /// Short SHA of the last commit. + pub last_commit: String, + /// Subject line of the last commit. + pub last_commit_message: String, +} + +/// Information about a created worktree. +#[derive(Debug, Clone)] +pub struct WorktreeInfo { + /// Path to the worktree directory. + pub path: PathBuf, + /// Git branch name for this worktree. + pub branch: String, + /// Source repository path. + pub source_repo: PathBuf, +} + +/// Manages git worktrees for task isolation. +pub struct WorktreeManager { + /// Base directory for all worktrees (~/.makima/worktrees). + base_dir: PathBuf, + /// Base directory for cloned repos (~/.makima/repos). + repos_dir: PathBuf, + /// Branch prefix for task branches. + branch_prefix: String, +} + +/// Per-worktree locks to prevent concurrent creation issues. +static WORKTREE_LOCKS: LazyLock>>>> = + LazyLock::new(|| Mutex::new(HashMap::new())); + +impl WorktreeManager { + /// Create a new WorktreeManager with the given base directory. + pub fn new(base_dir: PathBuf) -> Self { + let repos_dir = base_dir.parent() + .map(|p| p.join("repos")) + .unwrap_or_else(|| base_dir.join("repos")); + + Self { + base_dir, + repos_dir, + branch_prefix: "makima/task-".to_string(), + } + } + + /// Get the default worktree base directory (~/.makima/worktrees). + pub fn default_base_dir() -> PathBuf { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".makima") + .join("worktrees") + } + + /// Get the base directory for worktrees. + pub fn base_dir(&self) -> &Path { + &self.base_dir + } + + /// Detect the default branch of a repository. + /// Tries to find HEAD's target, falling back to common branch names. + pub async fn detect_default_branch(&self, repo_path: &Path) -> Result { + // Try to get the branch that HEAD points to + let output = Command::new("git") + .args(["symbolic-ref", "refs/remotes/origin/HEAD", "--short"]) + .current_dir(repo_path) + .output() + .await?; + + if output.status.success() { + let branch = String::from_utf8_lossy(&output.stdout).trim().to_string(); + // Remove "origin/" prefix if present + let branch = branch.strip_prefix("origin/").unwrap_or(&branch).to_string(); + if !branch.is_empty() { + return Ok(branch); + } + } + + // Try common branch names + for branch in ["main", "master", "develop", "trunk"] { + let output = Command::new("git") + .args(["rev-parse", "--verify", &format!("refs/heads/{}", branch)]) + .current_dir(repo_path) + .output() + .await?; + + if output.status.success() { + return Ok(branch.to_string()); + } + } + + // Fall back to getting the current branch + let output = Command::new("git") + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .current_dir(repo_path) + .output() + .await?; + + if output.status.success() { + let branch = String::from_utf8_lossy(&output.stdout).trim().to_string(); + if !branch.is_empty() && branch != "HEAD" { + return Ok(branch); + } + } + + Err(WorktreeError::GitCommand( + "Could not detect default branch".to_string(), + )) + } + + /// Ensure the source repository exists locally and is up-to-date. + /// If repo_source is a URL, clone it. If it's a path, verify it exists. + /// For both cases, fetch latest changes from remote if available. + pub async fn ensure_repo(&self, repo_source: &str) -> Result { + // Check if it's a URL (simple heuristic) + if repo_source.starts_with("http://") + || repo_source.starts_with("https://") + || repo_source.starts_with("git@") + || repo_source.starts_with("ssh://") + { + self.clone_or_fetch_repo(repo_source).await + } else { + // Treat as local path - expand tilde if present + let path = expand_tilde(repo_source); + if !path.exists() { + return Err(WorktreeError::RepoNotFound(repo_source.to_string())); + } + // Verify it's a git repo + let git_dir = path.join(".git"); + if !git_dir.exists() { + return Err(WorktreeError::InvalidPath(format!( + "{} is not a git repository", + repo_source + ))); + } + + // Fetch latest changes from remote if configured + tracing::info!("Fetching latest changes for local repo: {}", repo_source); + let output = Command::new("git") + .args(["fetch", "--all", "--prune"]) + .current_dir(&path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + // Don't fail - repo might not have a remote configured + tracing::debug!("Git fetch for local repo (may not have remote): {}", stderr); + } else { + tracing::info!("Fetched latest changes for {}", repo_source); + } + + Ok(path) + } + } + + /// Clone a repository or fetch if already cloned. + async fn clone_or_fetch_repo(&self, url: &str) -> Result { + // Extract repo name from URL + let repo_name = extract_repo_name(url); + let repo_path = self.repos_dir.join(&repo_name); + + // Create repos directory if needed + tokio::fs::create_dir_all(&self.repos_dir).await?; + + if repo_path.exists() { + // Fetch latest changes + tracing::info!("Fetching updates for existing repo: {}", repo_name); + let output = Command::new("git") + .args(["fetch", "--all", "--prune"]) + .current_dir(&repo_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!("Git fetch warning: {}", stderr); + // Don't fail on fetch errors, repo might still be usable + } + } else { + // Clone the repository + tracing::info!("Cloning repository: {} -> {}", url, repo_path.display()); + let output = Command::new("git") + .args(["clone", "--bare", url]) + .arg(&repo_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::CloneFailed(stderr.to_string())); + } + } + + Ok(repo_path) + } + + /// Create a worktree for a task. + /// + /// This creates a unique directory with a git worktree checked out to a new branch. + pub async fn create_worktree( + &self, + source_repo: &Path, + task_id: Uuid, + task_name: &str, + base_branch: &str, + ) -> Result { + // Generate unique directory name and branch + let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name)); + let worktree_path = self.base_dir.join(&dir_name); + // Branch name: makima/{task-name-with-dashes}-{short-id} + let branch_name = format!("{}{}-{}", self.branch_prefix, sanitize_name(task_name), short_uuid(task_id)); + + // Acquire lock for this worktree path + let lock = { + let mut locks = WORKTREE_LOCKS.lock().await; + locks + .entry(worktree_path.to_string_lossy().to_string()) + .or_insert_with(|| std::sync::Arc::new(tokio::sync::Mutex::new(()))) + .clone() + }; + let _guard = lock.lock().await; + + // Check if worktree already exists - reuse it if so + if worktree_path.exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Worktree already exists, reusing" + ); + + // Verify it's a valid git directory + let git_dir = worktree_path.join(".git"); + if git_dir.exists() { + // Get the current branch name + let output = Command::new("git") + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .current_dir(&worktree_path) + .output() + .await?; + + let current_branch = if output.status.success() { + String::from_utf8_lossy(&output.stdout).trim().to_string() + } else { + branch_name.clone() + }; + + return Ok(WorktreeInfo { + path: worktree_path, + branch: current_branch, + source_repo: source_repo.to_path_buf(), + }); + } else { + // Directory exists but isn't a git worktree - remove and recreate + tracing::warn!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Directory exists but is not a git worktree, removing" + ); + tokio::fs::remove_dir_all(&worktree_path).await?; + } + } + + // Create base directory + tokio::fs::create_dir_all(&self.base_dir).await?; + + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + branch = %branch_name, + base_branch = %base_branch, + "Creating worktree from local branch" + ); + + // Create the worktree with a new branch based on the local base_branch + let output = Command::new("git") + .args([ + "worktree", + "add", + "-b", + &branch_name, + ]) + .arg(&worktree_path) + .arg(base_branch) + .current_dir(source_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to create worktree: {}", + stderr + ))); + } + + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Worktree created successfully" + ); + + Ok(WorktreeInfo { + path: worktree_path, + branch: branch_name, + source_repo: source_repo.to_path_buf(), + }) + } + + /// Create a worktree for a task by copying from another task's worktree. + /// + /// This allows sequential subtasks where one continues from another's work, + /// including uncommitted changes. + pub async fn create_worktree_from_task( + &self, + source_worktree: &Path, + task_id: Uuid, + task_name: &str, + ) -> Result { + // Verify source worktree exists + if !source_worktree.exists() { + return Err(WorktreeError::RepoNotFound(format!( + "Source worktree not found: {}", + source_worktree.display() + ))); + } + + // Get the source repo from the source worktree + let source_repo = self.get_worktree_source(source_worktree).await?; + + // Get the base branch from source worktree's current HEAD + let output = Command::new("git") + .args(["rev-parse", "HEAD"]) + .current_dir(source_worktree) + .output() + .await?; + + if !output.status.success() { + return Err(WorktreeError::GitCommand( + "Failed to get source worktree HEAD".to_string(), + )); + } + let source_commit = String::from_utf8_lossy(&output.stdout).trim().to_string(); + + // Generate unique directory name and branch for new worktree + let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name)); + let worktree_path = self.base_dir.join(&dir_name); + let branch_name = format!("{}{}", self.branch_prefix, task_id); + + // Acquire lock for this worktree path + let lock = { + let mut locks = WORKTREE_LOCKS.lock().await; + locks + .entry(worktree_path.to_string_lossy().to_string()) + .or_insert_with(|| std::sync::Arc::new(tokio::sync::Mutex::new(()))) + .clone() + }; + let _guard = lock.lock().await; + + // Remove existing worktree if present + if worktree_path.exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Removing existing worktree before creating from source" + ); + tokio::fs::remove_dir_all(&worktree_path).await?; + } + + // Create base directory + tokio::fs::create_dir_all(&self.base_dir).await?; + + tracing::info!( + task_id = %task_id, + source_worktree = %source_worktree.display(), + worktree_path = %worktree_path.display(), + branch = %branch_name, + source_commit = %source_commit, + "Creating worktree from source task" + ); + + // Create a new worktree based on the source commit + let output = Command::new("git") + .args([ + "worktree", + "add", + "-b", + &branch_name, + ]) + .arg(&worktree_path) + .arg(&source_commit) + .current_dir(&source_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to create worktree: {}", + stderr + ))); + } + + // Now copy uncommitted changes from source worktree + // Use rsync to copy all files except .git + let output = Command::new("rsync") + .args([ + "-a", + "--exclude", ".git", + "--exclude", ".makima", + &format!("{}/", source_worktree.display()), + &format!("{}/", worktree_path.display()), + ]) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!( + task_id = %task_id, + "rsync warning (continuing anyway): {}", + stderr + ); + } + + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Worktree created from source task successfully" + ); + + Ok(WorktreeInfo { + path: worktree_path, + branch: branch_name, + source_repo: source_repo.to_path_buf(), + }) + } + + /// Remove a worktree and optionally its branch. + pub async fn remove_worktree( + &self, + worktree_path: &Path, + delete_branch: bool, + ) -> Result<(), WorktreeError> { + if !worktree_path.exists() { + return Ok(()); // Already gone + } + + // Get the branch name before removing + let branch_name = if delete_branch { + self.get_worktree_branch(worktree_path).await.ok() + } else { + None + }; + + // Find the source repo from worktree + let source_repo = self.get_worktree_source(worktree_path).await?; + + tracing::info!( + worktree_path = %worktree_path.display(), + delete_branch = delete_branch, + "Removing worktree" + ); + + // Remove the worktree + let output = Command::new("git") + .args(["worktree", "remove", "--force"]) + .arg(worktree_path) + .current_dir(&source_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + // Try force removal of directory if git worktree remove fails + if worktree_path.exists() { + tokio::fs::remove_dir_all(worktree_path).await?; + } + tracing::warn!("Git worktree remove warning: {}", stderr); + } + + // Prune worktree references + let _ = Command::new("git") + .args(["worktree", "prune"]) + .current_dir(&source_repo) + .output() + .await; + + // Delete the branch if requested + if let Some(branch) = branch_name { + let output = Command::new("git") + .args(["branch", "-D", &branch]) + .current_dir(&source_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!("Failed to delete branch {}: {}", branch, stderr); + } + } + + Ok(()) + } + + /// Get the branch name of a worktree. + async fn get_worktree_branch(&self, worktree_path: &Path) -> Result { + let output = Command::new("git") + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to get branch: {}", + stderr + ))); + } + + Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) + } + + /// Get the source repository path for a worktree. + async fn get_worktree_source(&self, worktree_path: &Path) -> Result { + // Read the .git file in the worktree which contains the path to the main repo + let git_file = worktree_path.join(".git"); + + if git_file.is_file() { + let content = tokio::fs::read_to_string(&git_file).await?; + // Format: "gitdir: /path/to/repo/.git/worktrees/name" + if let Some(gitdir) = content.strip_prefix("gitdir: ") { + let gitdir = gitdir.trim(); + // Navigate from worktrees/name back to the main repo + let path = PathBuf::from(gitdir); + if let Some(worktrees_dir) = path.parent() { + if let Some(git_dir) = worktrees_dir.parent() { + if let Some(repo_dir) = git_dir.parent() { + return Ok(repo_dir.to_path_buf()); + } + } + } + } + } + + // Fallback: try to find it in our repos directory + Err(WorktreeError::InvalidPath(format!( + "Could not determine source repo for worktree: {}", + worktree_path.display() + ))) + } + + /// List all worktrees in the base directory. + pub async fn list_worktrees(&self) -> Result, WorktreeError> { + let mut worktrees = Vec::new(); + + if !self.base_dir.exists() { + return Ok(worktrees); + } + + let mut entries = tokio::fs::read_dir(&self.base_dir).await?; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if path.is_dir() && path.join(".git").exists() { + worktrees.push(path); + } + } + + Ok(worktrees) + } + + /// Initialize a new git repository for a task. + /// + /// This creates a fresh git repo (not a worktree) for tasks that don't need + /// an existing codebase. Use this when `repository_url` is `new://` or `new://project-name`. + pub async fn init_new_repo( + &self, + task_id: Uuid, + repo_source: &str, + ) -> Result { + let project_name = extract_new_repo_name(repo_source); + let dir_name = match project_name { + Some(name) => format!("{}-{}", short_uuid(task_id), sanitize_name(name)), + None => format!("{}-new", short_uuid(task_id)), + }; + let repo_path = self.repos_dir.join(&dir_name); + + tracing::info!( + task_id = %task_id, + path = %repo_path.display(), + project_name = ?project_name, + "Initializing new git repository" + ); + + // Create directory + tokio::fs::create_dir_all(&repo_path).await?; + + // git init + let output = Command::new("git") + .args(["init"]) + .current_dir(&repo_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to init repository: {}", + stderr + ))); + } + + // Configure git user (needed for commits) + let _ = Command::new("git") + .args(["config", "user.email", "makima@localhost"]) + .current_dir(&repo_path) + .output() + .await; + let _ = Command::new("git") + .args(["config", "user.name", "Makima"]) + .current_dir(&repo_path) + .output() + .await; + + // Initial commit (required for worktrees to work later if needed) + let output = Command::new("git") + .args(["commit", "--allow-empty", "-m", "Initial commit"]) + .current_dir(&repo_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to create initial commit: {}", + stderr + ))); + } + + tracing::info!( + task_id = %task_id, + path = %repo_path.display(), + "New git repository initialized" + ); + + Ok(WorktreeInfo { + path: repo_path.clone(), + branch: "main".to_string(), + source_repo: repo_path, + }) + } + + // ========== Merge Operations ========== + + /// List all task branches in a repository. + /// + /// Returns branches matching the pattern `makima/task-*`. + pub async fn list_task_branches( + &self, + repo_path: &Path, + ) -> Result, WorktreeError> { + // Get all branches matching our prefix + let output = Command::new("git") + .args([ + "branch", + "--list", + &format!("{}*", self.branch_prefix), + "--format=%(refname:short)|%(objectname:short)|%(subject)", + ]) + .current_dir(repo_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to list branches: {}", + stderr + ))); + } + + // Get list of merged branches + let merged_output = Command::new("git") + .args(["branch", "--merged", "HEAD", "--format=%(refname:short)"]) + .current_dir(repo_path) + .output() + .await?; + + let merged_branches: std::collections::HashSet = if merged_output.status.success() { + String::from_utf8_lossy(&merged_output.stdout) + .lines() + .map(|s| s.trim().to_string()) + .collect() + } else { + std::collections::HashSet::new() + }; + + let stdout = String::from_utf8_lossy(&output.stdout); + let mut branches = Vec::new(); + + for line in stdout.lines() { + let parts: Vec<&str> = line.split('|').collect(); + if parts.len() >= 3 { + let name = parts[0].trim().to_string(); + let last_commit = parts[1].trim().to_string(); + let last_commit_message = parts[2].trim().to_string(); + + // Try to extract task ID from branch name + let task_id = name + .strip_prefix(&self.branch_prefix) + .and_then(|s| Uuid::parse_str(s).ok()); + + let is_merged = merged_branches.contains(&name); + + branches.push(TaskBranchInfo { + name, + task_id, + is_merged, + last_commit, + last_commit_message, + }); + } + } + + Ok(branches) + } + + /// Start a merge of a branch into the current worktree. + /// + /// Uses `--no-commit` to allow conflict resolution before committing. + /// Returns Ok(None) if merge succeeds without conflicts, or Ok(Some(files)) + /// with the list of conflicted files. + pub async fn merge_branch( + &self, + worktree_path: &Path, + source_branch: &str, + ) -> Result>, WorktreeError> { + // Check if there's already a merge in progress + if self.is_merge_in_progress(worktree_path).await? { + return Err(WorktreeError::MergeInProgress); + } + + tracing::info!( + worktree = %worktree_path.display(), + source_branch = %source_branch, + "Starting merge" + ); + + // Attempt the merge with --no-commit --no-ff + let output = Command::new("git") + .args(["merge", "--no-commit", "--no-ff", source_branch]) + .current_dir(worktree_path) + .output() + .await?; + + if output.status.success() { + tracing::info!("Merge completed without conflicts"); + return Ok(None); + } + + // Check if there are conflicts + let conflicts = self.get_conflicted_files(worktree_path).await?; + if !conflicts.is_empty() { + tracing::info!( + conflicts = ?conflicts, + "Merge has conflicts" + ); + return Ok(Some(conflicts)); + } + + // Other error + let stderr = String::from_utf8_lossy(&output.stderr); + Err(WorktreeError::GitCommand(format!( + "Merge failed: {}", + stderr + ))) + } + + /// Check if a merge is currently in progress. + pub async fn is_merge_in_progress(&self, worktree_path: &Path) -> Result { + // Check for MERGE_HEAD file + let merge_head = worktree_path.join(".git").join("MERGE_HEAD"); + if merge_head.exists() { + return Ok(true); + } + + // Also check in .git file (for worktrees) + let git_file = worktree_path.join(".git"); + if git_file.is_file() { + if let Ok(content) = tokio::fs::read_to_string(&git_file).await { + if let Some(gitdir) = content.strip_prefix("gitdir: ") { + let gitdir = PathBuf::from(gitdir.trim()); + let merge_head = gitdir.join("MERGE_HEAD"); + if merge_head.exists() { + return Ok(true); + } + } + } + } + + Ok(false) + } + + /// Get the list of files with unresolved conflicts. + pub async fn get_conflicted_files( + &self, + worktree_path: &Path, + ) -> Result, WorktreeError> { + let output = Command::new("git") + .args(["diff", "--name-only", "--diff-filter=U"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + // No conflicts or not in merge state + return Ok(Vec::new()); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + let files: Vec = stdout + .lines() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + Ok(files) + } + + /// Get the current merge state. + pub async fn get_merge_state( + &self, + worktree_path: &Path, + ) -> Result { + let in_progress = self.is_merge_in_progress(worktree_path).await?; + + if !in_progress { + return Ok(MergeState { + source_branch: String::new(), + conflicted_files: Vec::new(), + in_progress: false, + }); + } + + // Get the branch being merged from MERGE_HEAD + let source_branch = self.get_merge_source_branch(worktree_path).await?; + let conflicted_files = self.get_conflicted_files(worktree_path).await?; + + Ok(MergeState { + source_branch, + conflicted_files, + in_progress: true, + }) + } + + /// Get the branch name being merged (from MERGE_HEAD). + async fn get_merge_source_branch(&self, worktree_path: &Path) -> Result { + // Get MERGE_HEAD commit + let output = Command::new("git") + .args(["rev-parse", "MERGE_HEAD"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + return Ok("unknown".to_string()); + } + + let commit = String::from_utf8_lossy(&output.stdout).trim().to_string(); + + // Try to find branch name for this commit + let output = Command::new("git") + .args(["name-rev", "--name-only", &commit]) + .current_dir(worktree_path) + .output() + .await?; + + if output.status.success() { + let name = String::from_utf8_lossy(&output.stdout).trim().to_string(); + // Clean up the name (remove ~N suffixes, etc.) + let name = name.split('~').next().unwrap_or(&name); + let name = name.split('^').next().unwrap_or(name); + return Ok(name.to_string()); + } + + Ok(commit[..8.min(commit.len())].to_string()) + } + + /// Resolve a conflict in a specific file. + pub async fn resolve_conflict( + &self, + worktree_path: &Path, + file_path: &str, + resolution: ConflictResolution, + ) -> Result<(), WorktreeError> { + if !self.is_merge_in_progress(worktree_path).await? { + return Err(WorktreeError::NoMergeInProgress); + } + + let strategy = match resolution { + ConflictResolution::Ours => "--ours", + ConflictResolution::Theirs => "--theirs", + }; + + tracing::info!( + worktree = %worktree_path.display(), + file = %file_path, + strategy = %strategy, + "Resolving conflict" + ); + + // Checkout the chosen version + let output = Command::new("git") + .args(["checkout", strategy, "--", file_path]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to resolve conflict: {}", + stderr + ))); + } + + // Stage the resolved file + let output = Command::new("git") + .args(["add", file_path]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to stage resolved file: {}", + stderr + ))); + } + + Ok(()) + } + + /// Abort the current merge. + pub async fn abort_merge(&self, worktree_path: &Path) -> Result<(), WorktreeError> { + if !self.is_merge_in_progress(worktree_path).await? { + return Err(WorktreeError::NoMergeInProgress); + } + + tracing::info!( + worktree = %worktree_path.display(), + "Aborting merge" + ); + + let output = Command::new("git") + .args(["merge", "--abort"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to abort merge: {}", + stderr + ))); + } + + Ok(()) + } + + /// Commit the current merge. + pub async fn commit_merge( + &self, + worktree_path: &Path, + message: &str, + ) -> Result { + // Check for remaining conflicts + let conflicts = self.get_conflicted_files(worktree_path).await?; + if !conflicts.is_empty() { + return Err(WorktreeError::MergeConflicts(conflicts.join(", "))); + } + + tracing::info!( + worktree = %worktree_path.display(), + message = %message, + "Committing merge" + ); + + let output = Command::new("git") + .args(["commit", "-m", message]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to commit merge: {}", + stderr + ))); + } + + // Get the new commit SHA + let output = Command::new("git") + .args(["rev-parse", "HEAD"]) + .current_dir(worktree_path) + .output() + .await?; + + if output.status.success() { + let sha = String::from_utf8_lossy(&output.stdout).trim().to_string(); + return Ok(sha); + } + + Ok("unknown".to_string()) + } + + // ========== Completion Action Operations ========== + + /// Push task branch from worktree to an external target repository. + /// + /// This stages and commits any uncommitted changes, then pushes to the target repo. + pub async fn push_to_target_repo( + &self, + worktree_path: &Path, + target_repo: &Path, + branch_name: &str, + task_name: &str, + ) -> Result<(), WorktreeError> { + tracing::info!( + worktree = %worktree_path.display(), + target_repo = %target_repo.display(), + branch = %branch_name, + "Pushing branch to target repository" + ); + + // First, stage all changes (including new files) + let output = Command::new("git") + .args(["add", "-A"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to stage changes: {}", + stderr + ))); + } + + // Check if there are staged changes to commit + let output = Command::new("git") + .args(["diff", "--cached", "--quiet"]) + .current_dir(worktree_path) + .output() + .await?; + + // Exit code 1 means there are staged changes + if !output.status.success() { + tracing::info!("Committing staged changes before push"); + + let commit_message = format!("feat: {}", task_name); + let output = Command::new("git") + .args(["commit", "-m", &commit_message]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to commit changes: {}", + stderr + ))); + } + } + + // Ensure there are commits to push + let output = Command::new("git") + .args(["log", "--oneline", "-1"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + return Err(WorktreeError::GitCommand( + "No commits in worktree".to_string(), + )); + } + + // Add target repo as a remote in the worktree (if not already) + let remote_name = "target"; + let target_path_str = target_repo.to_string_lossy(); + + // Remove existing remote if any (ignore errors) + let _ = Command::new("git") + .args(["remote", "remove", remote_name]) + .current_dir(worktree_path) + .output() + .await; + + // Add the target as a remote + let output = Command::new("git") + .args(["remote", "add", remote_name, &target_path_str]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to add remote: {}", + stderr + ))); + } + + // Push the branch to the target + let output = Command::new("git") + .args(["push", "-u", remote_name, &format!("HEAD:{}", branch_name)]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to push to target: {}", + stderr + ))); + } + + tracing::info!( + branch = %branch_name, + target_repo = %target_repo.display(), + "Branch pushed successfully" + ); + + // Detach HEAD in the worktree to release the branch + // This allows the branch to be checked out in the target repo + let output = Command::new("git") + .args(["checkout", "--detach", "HEAD"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + // Non-fatal: log but don't fail the push + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!( + "Failed to detach HEAD in worktree (branch may not be checkable in target): {}", + stderr + ); + } else { + tracing::info!("Detached HEAD in worktree to release branch"); + } + + Ok(()) + } + + /// Merge a branch into the target branch in the target repository. + /// + /// This pushes the branch first (if needed), then performs a merge in the target repo. + pub async fn merge_to_target( + &self, + worktree_path: &Path, + target_repo: &Path, + source_branch: &str, + target_branch: &str, + task_name: &str, + ) -> Result { + tracing::info!( + worktree = %worktree_path.display(), + target_repo = %target_repo.display(), + source_branch = %source_branch, + target_branch = %target_branch, + "Merging branch to target" + ); + + // First, push the branch to target repo + self.push_to_target_repo(worktree_path, target_repo, source_branch, task_name) + .await?; + + // In target repo, checkout the target branch + let output = Command::new("git") + .args(["checkout", target_branch]) + .current_dir(target_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to checkout target branch: {}", + stderr + ))); + } + + // Pull latest changes first + let _ = Command::new("git") + .args(["pull", "--ff-only"]) + .current_dir(target_repo) + .output() + .await; + + // Merge the source branch + let merge_message = format!("feat: {}", task_name); + let output = Command::new("git") + .args(["merge", "--no-ff", source_branch, "-m", &merge_message]) + .current_dir(target_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + + // Check if it's a conflict + let conflicts = self.get_conflicted_files(target_repo).await?; + if !conflicts.is_empty() { + // Abort the merge + let _ = Command::new("git") + .args(["merge", "--abort"]) + .current_dir(target_repo) + .output() + .await; + + return Err(WorktreeError::MergeConflicts(format!( + "Merge conflicts in: {}. Consider creating a PR instead.", + conflicts.join(", ") + ))); + } + + return Err(WorktreeError::GitCommand(format!( + "Failed to merge: {}", + stderr + ))); + } + + // Get the merge commit SHA + let output = Command::new("git") + .args(["rev-parse", "HEAD"]) + .current_dir(target_repo) + .output() + .await?; + + let commit_sha = if output.status.success() { + String::from_utf8_lossy(&output.stdout).trim().to_string() + } else { + "unknown".to_string() + }; + + tracing::info!( + commit_sha = %commit_sha, + "Merge completed successfully" + ); + + Ok(commit_sha) + } + + /// Create a GitHub pull request using the gh CLI. + /// + /// This pushes the branch first, then creates a PR. + pub async fn create_pull_request( + &self, + worktree_path: &Path, + target_repo: &Path, + source_branch: &str, + target_branch: &str, + title: &str, + body: &str, + ) -> Result { + tracing::info!( + worktree = %worktree_path.display(), + target_repo = %target_repo.display(), + source_branch = %source_branch, + target_branch = %target_branch, + title = %title, + "Creating pull request" + ); + + // First, push the branch to the target repo's remote + // For PRs, we need to push to origin (the GitHub remote) + + // Get the worktree's current branch + let output = Command::new("git") + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .current_dir(worktree_path) + .output() + .await?; + + let current_branch = if output.status.success() { + String::from_utf8_lossy(&output.stdout).trim().to_string() + } else { + source_branch.to_string() + }; + + // Push to the target repo's origin + // First, check if target_repo has an origin remote + let output = Command::new("git") + .args(["remote", "get-url", "origin"]) + .current_dir(target_repo) + .output() + .await?; + + if !output.status.success() { + return Err(WorktreeError::GitCommand( + "Target repository has no origin remote configured".to_string(), + )); + } + + let origin_url = String::from_utf8_lossy(&output.stdout).trim().to_string(); + + // Push the branch from worktree to the remote + // First add the remote to worktree + let _ = Command::new("git") + .args(["remote", "remove", "pr-origin"]) + .current_dir(worktree_path) + .output() + .await; + + let output = Command::new("git") + .args(["remote", "add", "pr-origin", &origin_url]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to add remote: {}", + stderr + ))); + } + + // Push to the remote + let output = Command::new("git") + .args(["push", "-u", "pr-origin", &format!("{}:{}", current_branch, source_branch)]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to push branch: {}", + stderr + ))); + } + + // Create PR using gh CLI in the target repo + let output = Command::new("gh") + .args([ + "pr", + "create", + "--title", title, + "--body", body, + "--head", source_branch, + "--base", target_branch, + ]) + .current_dir(target_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to create PR: {}", + stderr + ))); + } + + // The gh CLI outputs the PR URL + let pr_url = String::from_utf8_lossy(&output.stdout).trim().to_string(); + + tracing::info!( + pr_url = %pr_url, + "Pull request created successfully" + ); + + Ok(pr_url) + } + + /// Clone/copy the worktree contents to a target directory. + /// + /// This creates a new git repository at the target path with the same contents + /// as the worktree. Returns (success, message). + pub async fn clone_worktree_to_directory( + &self, + worktree_path: &Path, + target_dir: &Path, + ) -> Result { + tracing::info!( + worktree = %worktree_path.display(), + target = %target_dir.display(), + "Cloning worktree to target directory" + ); + + // Check if target directory already exists + if target_dir.exists() { + return Err(WorktreeError::AlreadyExists(format!( + "Target directory already exists: {}", + target_dir.display() + ))); + } + + // Get parent directory to ensure it exists + if let Some(parent) = target_dir.parent() { + if !parent.exists() { + tokio::fs::create_dir_all(parent).await?; + } + } + + // Use git clone --local to efficiently copy the repository + // This is more efficient than cp -r for git repos + let output = Command::new("git") + .args([ + "clone", + "--local", + "--no-hardlinks", + &worktree_path.to_string_lossy(), + &target_dir.to_string_lossy(), + ]) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::CloneFailed(format!( + "Failed to clone worktree: {}", + stderr + ))); + } + + // Remove the 'origin' remote that points back to the worktree + let _ = Command::new("git") + .args(["remote", "remove", "origin"]) + .current_dir(target_dir) + .output() + .await; + + tracing::info!( + target = %target_dir.display(), + "Worktree cloned successfully" + ); + + Ok(format!("Cloned to {}", target_dir.display())) + } + + /// Check if a target directory exists. + pub async fn target_directory_exists(&self, target_dir: &Path) -> bool { + target_dir.exists() + } +} + +/// Check if repo_source is a "new repo" request. +/// +/// Accepts `new://` or `new://project-name` to create a fresh git repository. +pub fn is_new_repo_request(source: &str) -> bool { + source == "new" || source == "new://" || source.starts_with("new://") +} + +/// Extract optional project name from new:// URL. +fn extract_new_repo_name(source: &str) -> Option<&str> { + source.strip_prefix("new://").filter(|s| !s.is_empty()) +} + +/// Extract repository name from URL. +fn extract_repo_name(url: &str) -> String { + // Handle various URL formats: + // https://github.com/user/repo.git -> repo + // git@github.com:user/repo.git -> repo + // https://github.com/user/repo -> repo + + let url = url.trim_end_matches('/'); + let url = url.trim_end_matches(".git"); + + url.rsplit('/') + .next() + .or_else(|| url.rsplit(':').next()) + .unwrap_or("repo") + .to_string() +} + +/// Create a short UUID string for directory naming. +pub fn short_uuid(id: Uuid) -> String { + id.to_string()[..8].to_string() +} + +/// Expand tilde (~) in path to home directory. +pub fn expand_tilde(path: &str) -> PathBuf { + if let Some(rest) = path.strip_prefix("~/") { + if let Some(home) = dirs::home_dir() { + return home.join(rest); + } + } else if path == "~" { + if let Some(home) = dirs::home_dir() { + return home; + } + } + PathBuf::from(path) +} + +/// Sanitize a name for use in directory/branch names. +pub fn sanitize_name(name: &str) -> String { + name.chars() + .map(|c| { + if c.is_alphanumeric() || c == '-' || c == '_' { + c.to_ascii_lowercase() + } else { + '-' + } + }) + .collect::() + .chars() + .take(50) // Limit length + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_repo_name() { + assert_eq!( + extract_repo_name("https://github.com/user/repo.git"), + "repo" + ); + assert_eq!( + extract_repo_name("https://github.com/user/repo"), + "repo" + ); + assert_eq!( + extract_repo_name("git@github.com:user/repo.git"), + "repo" + ); + } + + #[test] + fn test_sanitize_name() { + assert_eq!(sanitize_name("Hello World!"), "hello-world-"); + assert_eq!(sanitize_name("test_name-123"), "test_name-123"); + assert_eq!(sanitize_name("A".repeat(100).as_str()).len(), 50); + } + + #[test] + fn test_short_uuid() { + let id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap(); + assert_eq!(short_uuid(id), "550e8400"); + } +} diff --git a/makima/daemon/src/worktree/mod.rs b/makima/daemon/src/worktree/mod.rs new file mode 100644 index 0000000..eb9f031 --- /dev/null +++ b/makima/daemon/src/worktree/mod.rs @@ -0,0 +1,11 @@ +//! Git worktree management for task isolation. +//! +//! Each task gets a unique git worktree with its own branch, +//! providing isolation without the overhead of Docker containers. + +mod manager; + +pub use manager::{ + expand_tilde, is_new_repo_request, sanitize_name, short_uuid, ConflictResolution, MergeState, + TaskBranchInfo, WorktreeError, WorktreeInfo, WorktreeManager, +}; diff --git a/makima/daemon/src/ws/client.rs b/makima/daemon/src/ws/client.rs new file mode 100644 index 0000000..ba1263f --- /dev/null +++ b/makima/daemon/src/ws/client.rs @@ -0,0 +1,290 @@ +//! WebSocket client for connecting to the makima server. + +use std::sync::Arc; +use std::time::Duration; + +use backoff::backoff::Backoff; +use backoff::ExponentialBackoff; +use futures::{SinkExt, StreamExt}; +use tokio::sync::{mpsc, RwLock}; +use tokio_tungstenite::{connect_async, tungstenite::{client::IntoClientRequest, Message}}; +use uuid::Uuid; + +use super::protocol::{DaemonCommand, DaemonMessage}; +use crate::config::ServerConfig; +use crate::error::{DaemonError, Result}; + +/// WebSocket client state. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectionState { + /// Not connected to server. + Disconnected, + /// Currently connecting. + Connecting, + /// Connected and authenticated. + Connected, + /// Connection failed, will retry. + Reconnecting, + /// Permanently failed (e.g., auth failure). + Failed, +} + +/// WebSocket client for daemon-server communication. +pub struct WsClient { + config: ServerConfig, + machine_id: String, + hostname: String, + max_concurrent_tasks: i32, + state: Arc>, + daemon_id: Arc>>, + /// Channel to receive messages to send to server. + outgoing_rx: mpsc::Receiver, + /// Sender for outgoing messages (clone this to send messages). + outgoing_tx: mpsc::Sender, + /// Channel to send received commands to the task manager. + incoming_tx: mpsc::Sender, +} + +impl WsClient { + /// Create a new WebSocket client. + pub fn new( + config: ServerConfig, + machine_id: String, + hostname: String, + max_concurrent_tasks: i32, + incoming_tx: mpsc::Sender, + ) -> Self { + let (outgoing_tx, outgoing_rx) = mpsc::channel(256); + + Self { + config, + machine_id, + hostname, + max_concurrent_tasks, + state: Arc::new(RwLock::new(ConnectionState::Disconnected)), + daemon_id: Arc::new(RwLock::new(None)), + outgoing_rx, + outgoing_tx, + incoming_tx, + } + } + + /// Get a sender for outgoing messages. + pub fn sender(&self) -> mpsc::Sender { + self.outgoing_tx.clone() + } + + /// Get current connection state. + pub async fn state(&self) -> ConnectionState { + *self.state.read().await + } + + /// Get daemon ID if authenticated. + pub async fn daemon_id(&self) -> Option { + *self.daemon_id.read().await + } + + /// Run the WebSocket client with automatic reconnection. + pub async fn run(&mut self) -> Result<()> { + let mut backoff = ExponentialBackoff { + initial_interval: Duration::from_secs(self.config.reconnect_interval_secs), + max_interval: Duration::from_secs(60), + max_elapsed_time: if self.config.max_reconnect_attempts > 0 { + Some(Duration::from_secs( + self.config.reconnect_interval_secs * self.config.max_reconnect_attempts as u64 * 10, + )) + } else { + None // Infinite retries + }, + ..Default::default() + }; + + loop { + *self.state.write().await = ConnectionState::Connecting; + tracing::info!("Connecting to server: {}", self.config.url); + + match self.connect_and_run().await { + Ok(()) => { + // Clean shutdown + tracing::info!("WebSocket connection closed cleanly"); + break; + } + Err(DaemonError::AuthFailed(msg)) => { + tracing::error!("Authentication failed: {}", msg); + *self.state.write().await = ConnectionState::Failed; + return Err(DaemonError::AuthFailed(msg)); + } + Err(e) => { + tracing::warn!("Connection error: {}", e); + *self.state.write().await = ConnectionState::Reconnecting; + + if let Some(delay) = backoff.next_backoff() { + tracing::info!("Reconnecting in {:?}...", delay); + tokio::time::sleep(delay).await; + } else { + tracing::error!("Max reconnection attempts reached"); + *self.state.write().await = ConnectionState::Failed; + return Err(DaemonError::ConnectionLost); + } + } + } + } + + Ok(()) + } + + /// Connect to server and run the message loop. + async fn connect_and_run(&mut self) -> Result<()> { + // Build WebSocket URL + let ws_url = format!("{}/api/v1/mesh/daemons/connect", self.config.url); + tracing::debug!("Connecting to WebSocket: {}", ws_url); + + // Build request with API key header + let mut request = ws_url.into_client_request()?; + request.headers_mut().insert( + "x-makima-api-key", + self.config.api_key.parse().map_err(|_| { + DaemonError::AuthFailed("Invalid API key format".into()) + })?, + ); + + // Connect with API key in headers + let (ws_stream, _response) = connect_async(request).await?; + let (mut write, mut read) = ws_stream.split(); + + // Send daemon info after connection (server authenticated us via header) + let info_msg = DaemonMessage::authenticate( + &self.config.api_key, + &self.machine_id, + &self.hostname, + self.max_concurrent_tasks, + ); + let info_json = serde_json::to_string(&info_msg)?; + write.send(Message::Text(info_json)).await?; + + // Wait for authentication response + let auth_response = read + .next() + .await + .ok_or(DaemonError::ConnectionLost)??; + + let auth_text = match auth_response { + Message::Text(text) => text, + Message::Close(_) => return Err(DaemonError::ConnectionLost), + _ => return Err(DaemonError::AuthFailed("Unexpected response type".into())), + }; + + let command: DaemonCommand = serde_json::from_str(&auth_text)?; + match command { + DaemonCommand::Authenticated { daemon_id } => { + tracing::info!("Authenticated with daemon ID: {}", daemon_id); + *self.daemon_id.write().await = Some(daemon_id); + *self.state.write().await = ConnectionState::Connected; + + // Send daemon directories info to server + let working_directory = std::env::current_dir() + .map(|p| p.to_string_lossy().to_string()) + .unwrap_or_else(|_| ".".to_string()); + let home_directory = dirs::home_dir() + .map(|h| h.join(".makima").join("home")) + .unwrap_or_else(|| std::path::PathBuf::from("~/.makima/home")); + // Create home directory if it doesn't exist + if let Err(e) = std::fs::create_dir_all(&home_directory) { + tracing::warn!("Failed to create home directory {:?}: {}", home_directory, e); + } + let home_directory_str = home_directory.to_string_lossy().to_string(); + let worktrees_directory = dirs::home_dir() + .map(|h| h.join(".makima").join("worktrees").to_string_lossy().to_string()) + .unwrap_or_else(|| "~/.makima/worktrees".to_string()); + + let dirs_msg = DaemonMessage::DaemonDirectories { + working_directory, + home_directory: home_directory_str, + worktrees_directory, + }; + let dirs_json = serde_json::to_string(&dirs_msg)?; + write.send(Message::Text(dirs_json)).await?; + tracing::info!("Sent daemon directories info to server"); + } + DaemonCommand::Error { code, message } => { + return Err(DaemonError::AuthFailed(format!("{}: {}", code, message))); + } + _ => { + return Err(DaemonError::AuthFailed( + "Unexpected response to authentication".into(), + )); + } + } + + // Start main message loop + let heartbeat_interval = Duration::from_secs(self.config.heartbeat_interval_secs); + let mut heartbeat_timer = tokio::time::interval(heartbeat_interval); + + loop { + tokio::select! { + // Handle incoming server commands + msg = read.next() => { + match msg { + Some(Ok(Message::Text(text))) => { + tracing::info!("Received WebSocket message: {} bytes", text.len()); + match serde_json::from_str::(&text) { + Ok(command) => { + tracing::info!("Parsed command: {:?}", command); + tracing::info!("Sending command to task manager channel..."); + if self.incoming_tx.send(command).await.is_err() { + tracing::warn!("Command receiver dropped, shutting down"); + break; + } + tracing::info!("Command sent to task manager successfully"); + } + Err(e) => { + tracing::warn!("Failed to parse server message: {}", e); + tracing::debug!("Raw message: {}", text); + } + } + } + Some(Ok(Message::Ping(data))) => { + write.send(Message::Pong(data)).await?; + } + Some(Ok(Message::Close(_))) | None => { + tracing::info!("Server closed connection"); + return Err(DaemonError::ConnectionLost); + } + Some(Err(e)) => { + tracing::warn!("WebSocket error: {}", e); + return Err(e.into()); + } + _ => {} + } + } + + // Handle outgoing messages + msg = self.outgoing_rx.recv() => { + match msg { + Some(message) => { + let json = serde_json::to_string(&message)?; + tracing::trace!("Sending message: {}", json); + write.send(Message::Text(json)).await?; + } + None => { + // Sender dropped, shutdown + tracing::info!("Outgoing channel closed, shutting down"); + break; + } + } + } + + // Send heartbeat + _ = heartbeat_timer.tick() => { + // Get active task IDs from task manager + // For now, send empty list - will be connected to task manager + let heartbeat = DaemonMessage::heartbeat(vec![]); + let json = serde_json::to_string(&heartbeat)?; + write.send(Message::Text(json)).await?; + } + } + } + + Ok(()) + } +} diff --git a/makima/daemon/src/ws/mod.rs b/makima/daemon/src/ws/mod.rs new file mode 100644 index 0000000..5a0e9d1 --- /dev/null +++ b/makima/daemon/src/ws/mod.rs @@ -0,0 +1,7 @@ +//! WebSocket client and protocol types for daemon-server communication. + +pub mod client; +pub mod protocol; + +pub use client::{ConnectionState, WsClient}; +pub use protocol::{BranchInfo, DaemonCommand, DaemonMessage}; diff --git a/makima/daemon/src/ws/protocol.rs b/makima/daemon/src/ws/protocol.rs new file mode 100644 index 0000000..7c2ad6d --- /dev/null +++ b/makima/daemon/src/ws/protocol.rs @@ -0,0 +1,511 @@ +//! Protocol types for daemon-server communication. +//! +//! These types mirror the server's protocol exactly for compatibility. + +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +/// Message from daemon to server. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum DaemonMessage { + /// Authentication request (first message required). + Authenticate { + #[serde(rename = "apiKey")] + api_key: String, + #[serde(rename = "machineId")] + machine_id: String, + hostname: String, + #[serde(rename = "maxConcurrentTasks")] + max_concurrent_tasks: i32, + }, + + /// Periodic heartbeat with current status. + Heartbeat { + #[serde(rename = "activeTasks")] + active_tasks: Vec, + }, + + /// Task output streaming (stdout/stderr from Claude Code). + TaskOutput { + #[serde(rename = "taskId")] + task_id: Uuid, + output: String, + #[serde(rename = "isPartial")] + is_partial: bool, + }, + + /// Task status change notification. + TaskStatusChange { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "oldStatus")] + old_status: String, + #[serde(rename = "newStatus")] + new_status: String, + }, + + /// Task progress update with summary. + TaskProgress { + #[serde(rename = "taskId")] + task_id: Uuid, + summary: String, + }, + + /// Task completion notification. + TaskComplete { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + error: Option, + }, + + /// Register a tool key for orchestrator API access. + RegisterToolKey { + #[serde(rename = "taskId")] + task_id: Uuid, + /// The API key for this orchestrator to use when calling mesh endpoints. + key: String, + }, + + /// Revoke a tool key when task completes. + RevokeToolKey { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + // ========================================================================= + // Merge Response Messages (sent by daemon after processing merge commands) + // ========================================================================= + + /// Response to ListBranches command. + BranchList { + #[serde(rename = "taskId")] + task_id: Uuid, + branches: Vec, + }, + + /// Response to MergeStatus command. + MergeStatusResponse { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "inProgress")] + in_progress: bool, + #[serde(rename = "sourceBranch")] + source_branch: Option, + #[serde(rename = "conflictedFiles")] + conflicted_files: Vec, + }, + + /// Response to merge operations (MergeStart, MergeResolve, MergeCommit, MergeAbort). + MergeResult { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + message: String, + #[serde(rename = "commitSha")] + commit_sha: Option, + /// Present only when conflicts occurred. + conflicts: Option>, + }, + + /// Response to CheckMergeComplete command. + MergeCompleteCheck { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "canComplete")] + can_complete: bool, + #[serde(rename = "unmergedBranches")] + unmerged_branches: Vec, + #[serde(rename = "mergedCount")] + merged_count: u32, + #[serde(rename = "skippedCount")] + skipped_count: u32, + }, + + // ========================================================================= + // Completion Action Response Messages + // ========================================================================= + + /// Response to RetryCompletionAction command. + CompletionActionResult { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + message: String, + /// PR URL if action was "pr" and successful. + #[serde(rename = "prUrl")] + pr_url: Option, + }, + + /// Report daemon's available directories for task output. + DaemonDirectories { + /// Current working directory of the daemon. + #[serde(rename = "workingDirectory")] + working_directory: String, + /// Path to ~/.makima/home directory (for cloning completed work). + #[serde(rename = "homeDirectory")] + home_directory: String, + /// Path to worktrees directory (~/.makima/worktrees). + #[serde(rename = "worktreesDirectory")] + worktrees_directory: String, + }, + + /// Response to CloneWorktree command. + CloneWorktreeResult { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + message: String, + /// The path where the worktree was cloned. + #[serde(rename = "targetDir")] + target_dir: Option, + }, + + /// Response to CheckTargetExists command. + CheckTargetExistsResult { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Whether the target directory exists. + exists: bool, + /// The path that was checked. + #[serde(rename = "targetDir")] + target_dir: String, + }, +} + +/// Information about a branch (used in BranchList message). +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BranchInfo { + /// Full branch name. + pub name: String, + /// Task ID extracted from branch name (if parseable). + #[serde(rename = "taskId")] + pub task_id: Option, + /// Whether this branch has been merged. + #[serde(rename = "isMerged")] + pub is_merged: bool, + /// Short SHA of the last commit. + #[serde(rename = "lastCommit")] + pub last_commit: String, + /// Subject line of the last commit. + #[serde(rename = "lastCommitMessage")] + pub last_commit_message: String, +} + +/// Command from server to daemon. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum DaemonCommand { + /// Confirm successful authentication. + Authenticated { + #[serde(rename = "daemonId")] + daemon_id: Uuid, + }, + + /// Spawn a new task in a container. + SpawnTask { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Human-readable task name (used for commit messages). + #[serde(rename = "taskName")] + task_name: String, + plan: String, + #[serde(rename = "repoUrl")] + repo_url: Option, + #[serde(rename = "baseBranch")] + base_branch: Option, + /// Target branch to merge into (used for completion actions). + #[serde(rename = "targetBranch")] + target_branch: Option, + /// Parent task ID if this is a subtask. + #[serde(rename = "parentTaskId")] + parent_task_id: Option, + /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask). + depth: i32, + /// Whether this task should run as an orchestrator (true if depth==0 and has subtasks). + #[serde(rename = "isOrchestrator")] + is_orchestrator: bool, + /// Path to user's local repository (outside ~/.makima) for completion actions. + #[serde(rename = "targetRepoPath")] + target_repo_path: Option, + /// Action on completion: "none", "branch", "merge", "pr". + #[serde(rename = "completionAction")] + completion_action: Option, + /// Task ID to continue from (copy worktree from this task). + #[serde(rename = "continueFromTaskId")] + continue_from_task_id: Option, + /// Files to copy from parent task's worktree. + #[serde(rename = "copyFiles")] + copy_files: Option>, + }, + + /// Pause a running task. + PauseTask { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Resume a paused task. + ResumeTask { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Interrupt a task (gracefully or forced). + InterruptTask { + #[serde(rename = "taskId")] + task_id: Uuid, + graceful: bool, + }, + + /// Send a message to a running task. + SendMessage { + #[serde(rename = "taskId")] + task_id: Uuid, + message: String, + }, + + /// Inject context about sibling task progress. + InjectSiblingContext { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "siblingTaskId")] + sibling_task_id: Uuid, + #[serde(rename = "siblingName")] + sibling_name: String, + #[serde(rename = "siblingStatus")] + sibling_status: String, + #[serde(rename = "progressSummary")] + progress_summary: Option, + #[serde(rename = "changedFiles")] + changed_files: Vec, + }, + + // ========================================================================= + // Merge Commands (for orchestrators to merge subtask branches) + // ========================================================================= + + /// List all subtask branches for a task. + ListBranches { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Start merging a subtask branch. + MergeStart { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "sourceBranch")] + source_branch: String, + }, + + /// Get current merge status. + MergeStatus { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Resolve a merge conflict. + MergeResolve { + #[serde(rename = "taskId")] + task_id: Uuid, + file: String, + /// "ours" or "theirs" + strategy: String, + }, + + /// Commit the current merge. + MergeCommit { + #[serde(rename = "taskId")] + task_id: Uuid, + message: String, + }, + + /// Abort the current merge. + MergeAbort { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Skip merging a subtask branch (mark as intentionally not merged). + MergeSkip { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "subtaskId")] + subtask_id: Uuid, + reason: String, + }, + + /// Check if all subtask branches have been merged or skipped (completion gate). + CheckMergeComplete { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + // ========================================================================= + // Completion Action Commands + // ========================================================================= + + /// Retry a completion action for a completed task. + RetryCompletionAction { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Human-readable task name (used for commit messages). + #[serde(rename = "taskName")] + task_name: String, + /// The action to execute: "branch", "merge", or "pr". + action: String, + /// Path to the target repository. + #[serde(rename = "targetRepoPath")] + target_repo_path: String, + /// Target branch to merge into (for merge/pr actions). + #[serde(rename = "targetBranch")] + target_branch: Option, + }, + + /// Clone worktree to a target directory. + CloneWorktree { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Path to the target directory. + #[serde(rename = "targetDir")] + target_dir: String, + }, + + /// Check if a target directory exists. + CheckTargetExists { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Path to check. + #[serde(rename = "targetDir")] + target_dir: String, + }, + + /// Error response. + Error { + code: String, + message: String, + }, +} + +impl DaemonMessage { + /// Create an authentication message. + pub fn authenticate( + api_key: &str, + machine_id: &str, + hostname: &str, + max_concurrent_tasks: i32, + ) -> Self { + Self::Authenticate { + api_key: api_key.to_string(), + machine_id: machine_id.to_string(), + hostname: hostname.to_string(), + max_concurrent_tasks, + } + } + + /// Create a heartbeat message. + pub fn heartbeat(active_tasks: Vec) -> Self { + Self::Heartbeat { active_tasks } + } + + /// Create a task output message. + pub fn task_output(task_id: Uuid, output: String, is_partial: bool) -> Self { + Self::TaskOutput { + task_id, + output, + is_partial, + } + } + + /// Create a task status change message. + pub fn task_status_change(task_id: Uuid, old_status: &str, new_status: &str) -> Self { + Self::TaskStatusChange { + task_id, + old_status: old_status.to_string(), + new_status: new_status.to_string(), + } + } + + /// Create a task progress message. + pub fn task_progress(task_id: Uuid, summary: String) -> Self { + Self::TaskProgress { task_id, summary } + } + + /// Create a task complete message. + pub fn task_complete(task_id: Uuid, success: bool, error: Option) -> Self { + Self::TaskComplete { + task_id, + success, + error, + } + } + + /// Create a register tool key message. + pub fn register_tool_key(task_id: Uuid, key: String) -> Self { + Self::RegisterToolKey { task_id, key } + } + + /// Create a revoke tool key message. + pub fn revoke_tool_key(task_id: Uuid) -> Self { + Self::RevokeToolKey { task_id } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_daemon_message_serialization() { + let msg = DaemonMessage::authenticate("key123", "machine-abc", "worker-1", 4); + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains("\"type\":\"authenticate\"")); + assert!(json.contains("\"apiKey\":\"key123\"")); + assert!(json.contains("\"machineId\":\"machine-abc\"")); + } + + #[test] + fn test_daemon_command_deserialization() { + let json = r#"{"type":"spawnTask","taskId":"550e8400-e29b-41d4-a716-446655440000","plan":"Build the feature","repoUrl":"https://github.com/test/repo","baseBranch":"main","parentTaskId":null,"depth":0,"isOrchestrator":false}"#; + let cmd: DaemonCommand = serde_json::from_str(json).unwrap(); + match cmd { + DaemonCommand::SpawnTask { + plan, + repo_url, + base_branch, + parent_task_id, + depth, + is_orchestrator, + .. + } => { + assert_eq!(plan, "Build the feature"); + assert_eq!(repo_url, Some("https://github.com/test/repo".to_string())); + assert_eq!(base_branch, Some("main".to_string())); + assert_eq!(parent_task_id, None); + assert_eq!(depth, 0); + assert!(!is_orchestrator); + } + _ => panic!("Expected SpawnTask"), + } + } + + #[test] + fn test_orchestrator_spawn_deserialization() { + let json = r#"{"type":"spawnTask","taskId":"550e8400-e29b-41d4-a716-446655440000","plan":"Coordinate subtasks","repoUrl":"https://github.com/test/repo","baseBranch":"main","parentTaskId":null,"depth":0,"isOrchestrator":true}"#; + let cmd: DaemonCommand = serde_json::from_str(json).unwrap(); + match cmd { + DaemonCommand::SpawnTask { + is_orchestrator, + depth, + .. + } => { + assert!(is_orchestrator); + assert_eq!(depth, 0); + } + _ => panic!("Expected SpawnTask"), + } + } +} -- cgit v1.2.3