diff options
| author | soryu <soryu@soryu.co> | 2026-01-11 05:52:14 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-15 00:21:16 +0000 |
| commit | 87044a747b47bd83249d61a45842c7f7b2eae56d (patch) | |
| tree | ef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/daemon/src | |
| parent | 077820c4167c168072d217a1b01df840463a12a8 (diff) | |
| download | soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip | |
Contract system
Diffstat (limited to 'makima/daemon/src')
| -rw-r--r-- | makima/daemon/src/cli.rs | 45 | ||||
| -rw-r--r-- | makima/daemon/src/config.rs | 550 | ||||
| -rw-r--r-- | makima/daemon/src/db/local.rs | 391 | ||||
| -rw-r--r-- | makima/daemon/src/db/mod.rs | 5 | ||||
| -rw-r--r-- | makima/daemon/src/error.rs | 75 | ||||
| -rw-r--r-- | makima/daemon/src/lib.rs | 21 | ||||
| -rw-r--r-- | makima/daemon/src/main.rs | 313 | ||||
| -rw-r--r-- | makima/daemon/src/process/claude.rs | 481 | ||||
| -rw-r--r-- | makima/daemon/src/process/claude_protocol.rs | 59 | ||||
| -rw-r--r-- | makima/daemon/src/process/mod.rs | 10 | ||||
| -rw-r--r-- | makima/daemon/src/task/manager.rs | 2248 | ||||
| -rw-r--r-- | makima/daemon/src/task/mod.rs | 7 | ||||
| -rw-r--r-- | makima/daemon/src/task/state.rs | 161 | ||||
| -rw-r--r-- | makima/daemon/src/temp.rs | 224 | ||||
| -rw-r--r-- | makima/daemon/src/worktree/manager.rs | 1623 | ||||
| -rw-r--r-- | makima/daemon/src/worktree/mod.rs | 11 | ||||
| -rw-r--r-- | makima/daemon/src/ws/client.rs | 290 | ||||
| -rw-r--r-- | makima/daemon/src/ws/mod.rs | 7 | ||||
| -rw-r--r-- | makima/daemon/src/ws/protocol.rs | 511 |
19 files changed, 0 insertions, 7032 deletions
diff --git a/makima/daemon/src/cli.rs b/makima/daemon/src/cli.rs deleted file mode 100644 index ca84017..0000000 --- a/makima/daemon/src/cli.rs +++ /dev/null @@ -1,45 +0,0 @@ -//! Command-line argument parsing for makima-daemon. - -use clap::Parser; -use std::path::PathBuf; - -/// Makima daemon for managing Claude Code instances in isolated worktrees. -#[derive(Parser, Debug)] -#[command(name = "makima-daemon")] -#[command(version, about, long_about = None)] -pub struct Cli { - /// Path to custom config file - #[arg(short, long)] - pub config: Option<PathBuf>, - - /// Directory where repositories are cloned - #[arg(long, env = "MAKIMA_DAEMON_REPOS_DIR")] - pub repos_dir: Option<PathBuf>, - - /// Directory where worktrees are created - #[arg(long, env = "MAKIMA_DAEMON_WORKTREES_DIR")] - pub worktrees_dir: Option<PathBuf>, - - /// WebSocket server URL to connect to - #[arg(long, env = "MAKIMA_DAEMON_SERVER_URL")] - pub server_url: Option<String>, - - /// API key for server authentication - #[arg(long, env = "MAKIMA_DAEMON_SERVER_APIKEY")] - pub api_key: Option<String>, - - /// Maximum number of concurrent tasks - #[arg(long)] - pub max_tasks: Option<u32>, - - /// Log level (trace, debug, info, warn, error) - #[arg(short, long, default_value = "info")] - pub log_level: String, -} - -impl Cli { - /// Parse command-line arguments - pub fn parse_args() -> Self { - Self::parse() - } -} diff --git a/makima/daemon/src/config.rs b/makima/daemon/src/config.rs deleted file mode 100644 index 94d1e8a..0000000 --- a/makima/daemon/src/config.rs +++ /dev/null @@ -1,550 +0,0 @@ -//! Configuration management for the makima daemon. - -use config::{Config, Environment, File}; -use serde::Deserialize; -use std::collections::HashMap; -use std::path::PathBuf; - -/// Root daemon configuration. -#[derive(Debug, Clone, Deserialize)] -pub struct DaemonConfig { - /// Server connection settings. - #[serde(default)] - pub server: ServerConfig, - - /// Worktree settings. - #[serde(default)] - pub worktree: WorktreeConfig, - - /// Process settings. - #[serde(default)] - pub process: ProcessConfig, - - /// Local database settings. - #[serde(default)] - pub local_db: LocalDbConfig, - - /// Logging settings. - #[serde(default)] - pub logging: LoggingConfig, - - /// Repositories to auto-clone on startup. - #[serde(default)] - pub repos: ReposConfig, -} - -/// Server connection configuration. -#[derive(Debug, Clone, Deserialize)] -#[serde(default)] -pub struct ServerConfig { - /// WebSocket URL of makima server (e.g., ws://localhost:8080 or wss://makima.example.com). - /// Defaults to wss://api.makima.jp. - #[serde(default = "default_server_url")] - pub url: String, - - /// API key for authentication. - #[serde(default, alias = "apikey")] - pub api_key: String, - - /// Heartbeat interval in seconds. - #[serde(default = "default_heartbeat_interval", alias = "heartbeatintervalsecs")] - pub heartbeat_interval_secs: u64, - - /// Reconnect interval in seconds after connection loss. - #[serde(default = "default_reconnect_interval", alias = "reconnectintervalsecs")] - pub reconnect_interval_secs: u64, - - /// Maximum reconnect attempts before giving up (0 = infinite). - #[serde(default, alias = "maxreconnectattempts")] - pub max_reconnect_attempts: u32, -} - -fn default_heartbeat_interval() -> u64 { - 30 -} - -fn default_reconnect_interval() -> u64 { - 5 -} - -fn default_server_url() -> String { - "wss://api.makima.jp".to_string() -} - -impl Default for ServerConfig { - fn default() -> Self { - Self { - url: default_server_url(), - api_key: String::new(), - heartbeat_interval_secs: default_heartbeat_interval(), - reconnect_interval_secs: default_reconnect_interval(), - max_reconnect_attempts: 0, - } - } -} - -/// Worktree configuration for task isolation. -#[derive(Debug, Clone, Deserialize)] -#[serde(default)] -pub struct WorktreeConfig { - /// Base directory for worktrees (~/.makima/worktrees). - #[serde(default = "default_worktree_base_dir", alias = "basedir")] - pub base_dir: PathBuf, - - /// Base directory for cloned repositories (~/.makima/repos). - #[serde(default = "default_repos_base_dir", alias = "reposdir")] - pub repos_dir: PathBuf, - - /// Branch prefix for task branches. - #[serde(default = "default_branch_prefix", alias = "branchprefix")] - pub branch_prefix: String, - - /// Clean up worktrees on daemon start. - #[serde(default, alias = "cleanuponstart")] - pub cleanup_on_start: bool, - - /// Default target repository path for pushing completed branches. - /// Used when task.target_repo_path is not set. - #[serde(default, alias = "defaulttargetrepo")] - pub default_target_repo: Option<PathBuf>, -} - -fn default_worktree_base_dir() -> PathBuf { - dirs::home_dir() - .unwrap_or_else(|| PathBuf::from(".")) - .join(".makima") - .join("worktrees") -} - -fn default_repos_base_dir() -> PathBuf { - dirs::home_dir() - .unwrap_or_else(|| PathBuf::from(".")) - .join(".makima") - .join("repos") -} - -fn default_branch_prefix() -> String { - "makima/task-".to_string() -} - -impl Default for WorktreeConfig { - fn default() -> Self { - Self { - base_dir: default_worktree_base_dir(), - repos_dir: default_repos_base_dir(), - branch_prefix: default_branch_prefix(), - cleanup_on_start: false, - default_target_repo: None, - } - } -} - -/// Process configuration for Claude Code subprocess execution. -#[derive(Debug, Clone, Deserialize)] -#[serde(default)] -pub struct ProcessConfig { - /// Path or command for Claude Code CLI. - #[serde(default = "default_claude_command", alias = "claudecommand")] - pub claude_command: String, - - /// Additional arguments to pass to Claude Code. - /// These are added after the default arguments. - #[serde(default, alias = "claudeargs")] - pub claude_args: Vec<String>, - - /// Arguments to pass before the default arguments. - /// Useful for overriding defaults. - #[serde(default, alias = "claudepreargs")] - pub claude_pre_args: Vec<String>, - - /// Skip the --dangerously-skip-permissions flag (default: false). - /// Set to true if you want to use Claude's permission system. - #[serde(default, alias = "enablepermissions")] - pub enable_permissions: bool, - - /// Skip the --verbose flag (default: false). - #[serde(default, alias = "disableverbose")] - pub disable_verbose: bool, - - /// Maximum concurrent tasks. - #[serde(default = "default_max_tasks", alias = "maxconcurrenttasks")] - pub max_concurrent_tasks: u32, - - /// Default timeout for tasks in seconds (0 = no timeout). - #[serde(default, alias = "defaulttimeoutsecs")] - pub default_timeout_secs: u64, - - /// Additional environment variables to pass to Claude Code. - #[serde(default, alias = "envvars")] - pub env_vars: HashMap<String, String>, -} - -fn default_claude_command() -> String { - "claude".to_string() -} - -fn default_max_tasks() -> u32 { - 4 -} - -impl Default for ProcessConfig { - fn default() -> Self { - Self { - claude_command: default_claude_command(), - claude_args: Vec::new(), - claude_pre_args: Vec::new(), - enable_permissions: false, - disable_verbose: false, - max_concurrent_tasks: default_max_tasks(), - default_timeout_secs: 0, - env_vars: HashMap::new(), - } - } -} - -/// Local database configuration. -#[derive(Debug, Clone, Deserialize)] -#[serde(default)] -pub struct LocalDbConfig { - /// Path to local SQLite database. - #[serde(default = "default_db_path")] - pub path: PathBuf, -} - -impl Default for LocalDbConfig { - fn default() -> Self { - Self { - path: default_db_path(), - } - } -} - -fn default_db_path() -> PathBuf { - dirs::home_dir() - .unwrap_or_else(|| PathBuf::from(".")) - .join(".makima") - .join("daemon.db") -} - -/// Logging configuration. -#[derive(Debug, Clone, Deserialize, Default)] -pub struct LoggingConfig { - /// Log level: "trace", "debug", "info", "warn", "error". - #[serde(default = "default_log_level")] - pub level: String, - - /// Log format: "pretty" or "json". - #[serde(default = "default_log_format")] - pub format: String, -} - -fn default_log_level() -> String { - "info".to_string() -} - -fn default_log_format() -> String { - "pretty".to_string() -} - -/// Repository auto-clone configuration. -#[derive(Debug, Clone, Deserialize, Default)] -pub struct ReposConfig { - /// Directory to clone repositories into (default: ~/.makima/home). - #[serde(default = "default_home_dir")] - pub home_dir: PathBuf, - - /// List of repositories to auto-clone on startup. - /// Each entry can be a URL (e.g., "https://github.com/user/repo.git") - /// or a shorthand (e.g., "github:user/repo"). - #[serde(default, alias = "autoclone")] - pub auto_clone: Vec<RepoEntry>, -} - -/// A repository entry for auto-cloning. -#[derive(Debug, Clone, Deserialize)] -#[serde(untagged)] -pub enum RepoEntry { - /// Simple URL string. - Url(String), - /// Detailed configuration. - Config { - /// Repository URL. - url: String, - /// Custom directory name (defaults to repo name from URL). - #[serde(default)] - name: Option<String>, - /// Branch to checkout after cloning (defaults to default branch). - #[serde(default)] - branch: Option<String>, - /// Whether to do a shallow clone (default: false). - #[serde(default)] - shallow: bool, - }, -} - -impl RepoEntry { - /// Get the URL for this repo entry. - pub fn url(&self) -> &str { - match self { - RepoEntry::Url(url) => url, - RepoEntry::Config { url, .. } => url, - } - } - - /// Get the custom name, if any. - pub fn name(&self) -> Option<&str> { - match self { - RepoEntry::Url(_) => None, - RepoEntry::Config { name, .. } => name.as_deref(), - } - } - - /// Get the branch to checkout, if any. - pub fn branch(&self) -> Option<&str> { - match self { - RepoEntry::Url(_) => None, - RepoEntry::Config { branch, .. } => branch.as_deref(), - } - } - - /// Whether to do a shallow clone. - pub fn shallow(&self) -> bool { - match self { - RepoEntry::Url(_) => false, - RepoEntry::Config { shallow, .. } => *shallow, - } - } - - /// Get the directory name to use (either custom name or derived from URL). - pub fn dir_name(&self) -> Option<String> { - if let Some(name) = self.name() { - return Some(name.to_string()); - } - - // Derive from URL - let url = self.url(); - - // Handle shorthand formats - let url = if url.starts_with("github:") { - url.strip_prefix("github:").unwrap_or(url) - } else if url.starts_with("gitlab:") { - url.strip_prefix("gitlab:").unwrap_or(url) - } else { - url - }; - - // Extract repo name from URL - url.trim_end_matches('/') - .trim_end_matches(".git") - .rsplit('/') - .next() - .map(|s| s.to_string()) - } - - /// Expand the URL (e.g., convert shorthand to full URL). - pub fn expanded_url(&self) -> String { - let url = self.url(); - - if url.starts_with("github:") { - format!("https://github.com/{}.git", url.strip_prefix("github:").unwrap_or("")) - } else if url.starts_with("gitlab:") { - format!("https://gitlab.com/{}.git", url.strip_prefix("gitlab:").unwrap_or("")) - } else { - url.to_string() - } - } -} - -fn default_home_dir() -> PathBuf { - dirs::home_dir() - .unwrap_or_else(|| PathBuf::from(".")) - .join(".makima") - .join("home") -} - -impl DaemonConfig { - /// Load configuration from files and environment variables. - /// - /// Configuration sources (in order of precedence): - /// 1. Environment variables (MAKIMA_API_KEY, MAKIMA_DAEMON_SERVER_URL, etc.) - /// 2. ./makima-daemon.toml (current directory) - /// 3. ~/.config/makima-daemon/config.toml - /// 4. /etc/makima-daemon/config.toml (Linux only) - /// - /// Environment variable examples: - /// - MAKIMA_API_KEY=your-api-key (preferred) - /// - MAKIMA_DAEMON_SERVER_URL=ws://localhost:8080 - /// - MAKIMA_DAEMON_PROCESS_MAXCONCURRENTTASKS=4 - pub fn load() -> Result<Self, config::ConfigError> { - Self::load_from_path(None) - } - - /// Load configuration from a specific path plus standard sources. - fn load_from_path(config_path: Option<&std::path::Path>) -> Result<Self, config::ConfigError> { - let mut builder = Config::builder(); - - // System-wide config (Linux only) - #[cfg(target_os = "linux")] - { - builder = builder.add_source( - File::with_name("/etc/makima-daemon/config").required(false), - ); - } - - // User config - if let Some(config_dir) = dirs::config_dir() { - let user_config = config_dir.join("makima-daemon").join("config"); - builder = builder.add_source( - File::with_name(user_config.to_str().unwrap_or("")).required(false), - ); - } - - // Local config - builder = builder.add_source(File::with_name("makima-daemon").required(false)); - - // Custom config file (if provided) - if let Some(path) = config_path { - builder = builder.add_source( - File::with_name(path.to_str().unwrap_or("")).required(true), - ); - } - - // Environment variables with underscore separator for nesting - // e.g., MAKIMA_DAEMON_SERVER_URL -> server.url - // MAKIMA_DAEMON_SERVER_APIKEY -> server.api_key - builder = builder.add_source( - Environment::with_prefix("MAKIMA_DAEMON") - .separator("_") - .try_parsing(true), - ); - - let config = builder.build()?; - let mut config: DaemonConfig = config.try_deserialize()?; - - // Check for MAKIMA_API_KEY environment variable (preferred over MAKIMA_DAEMON_SERVER_APIKEY) - if let Ok(api_key) = std::env::var("MAKIMA_API_KEY") { - config.server.api_key = api_key; - } - - // Validate required fields (don't validate here - let load_with_cli do final validation) - Ok(config) - } - - /// Validate that required configuration fields are set. - pub fn validate(&self) -> Result<(), config::ConfigError> { - if self.server.api_key.is_empty() { - return Err(config::ConfigError::Message( - "API key is required. Set via MAKIMA_API_KEY, config file, or --api-key".to_string() - )); - } - Ok(()) - } - - /// Load configuration with CLI argument overrides. - /// - /// Configuration sources (in order of precedence, highest first): - /// 1. CLI arguments - /// 2. Environment variables - /// 3. Custom config file (if --config specified) - /// 4. ./makima-daemon.toml (current directory) - /// 5. ~/.config/makima-daemon/config.toml - /// 6. /etc/makima-daemon/config.toml (Linux only) - /// 7. Default values - pub fn load_with_cli(cli: &crate::cli::Cli) -> Result<Self, config::ConfigError> { - // Load base config (with optional custom config file) - let mut config = Self::load_from_path(cli.config.as_deref())?; - - // Apply CLI overrides (highest priority) - if let Some(ref repos_dir) = cli.repos_dir { - config.worktree.repos_dir = repos_dir.clone(); - } - if let Some(ref worktrees_dir) = cli.worktrees_dir { - config.worktree.base_dir = worktrees_dir.clone(); - } - if let Some(ref server_url) = cli.server_url { - config.server.url = server_url.clone(); - } - if let Some(ref api_key) = cli.api_key { - config.server.api_key = api_key.clone(); - } - if let Some(max_tasks) = cli.max_tasks { - config.process.max_concurrent_tasks = max_tasks; - } - // Log level is always set (has default) - config.logging.level = cli.log_level.clone(); - - // Validate required fields after all sources are merged - config.validate()?; - - Ok(config) - } - - /// Create a minimal config for testing. - #[cfg(test)] - pub fn test_config() -> Self { - Self { - server: ServerConfig { - url: "ws://localhost:8080".to_string(), - api_key: "test-key".to_string(), - heartbeat_interval_secs: 30, - reconnect_interval_secs: 5, - max_reconnect_attempts: 0, - }, - worktree: WorktreeConfig { - base_dir: PathBuf::from("/tmp/makima-daemon-test/worktrees"), - repos_dir: PathBuf::from("/tmp/makima-daemon-test/repos"), - branch_prefix: "makima/task-".to_string(), - cleanup_on_start: true, - default_target_repo: None, - }, - process: ProcessConfig { - claude_command: "claude".to_string(), - claude_args: Vec::new(), - claude_pre_args: Vec::new(), - enable_permissions: false, - disable_verbose: false, - max_concurrent_tasks: 2, - default_timeout_secs: 0, - env_vars: HashMap::new(), - }, - local_db: LocalDbConfig { - path: PathBuf::from("/tmp/makima-daemon-test/state.db"), - }, - logging: LoggingConfig::default(), - repos: ReposConfig::default(), - } - } -} - -/// Helper module for dirs crate (minimal subset). -mod dirs { - use std::path::PathBuf; - - pub fn home_dir() -> Option<PathBuf> { - std::env::var("HOME").ok().map(PathBuf::from) - } - - pub fn config_dir() -> Option<PathBuf> { - #[cfg(target_os = "macos")] - { - std::env::var("HOME") - .ok() - .map(|h| PathBuf::from(h).join("Library").join("Application Support")) - } - #[cfg(target_os = "linux")] - { - std::env::var("XDG_CONFIG_HOME") - .ok() - .map(PathBuf::from) - .or_else(|| std::env::var("HOME").ok().map(|h| PathBuf::from(h).join(".config"))) - } - #[cfg(target_os = "windows")] - { - std::env::var("APPDATA").ok().map(PathBuf::from) - } - #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))] - { - None - } - } -} diff --git a/makima/daemon/src/db/local.rs b/makima/daemon/src/db/local.rs deleted file mode 100644 index 5adbf98..0000000 --- a/makima/daemon/src/db/local.rs +++ /dev/null @@ -1,391 +0,0 @@ -//! Local SQLite database for crash recovery and state persistence. - -use std::path::Path; - -use chrono::{DateTime, Utc}; -use rusqlite::{params, Connection, Result as SqliteResult}; -use uuid::Uuid; - -use crate::task::TaskState; - -/// Local task record for persistence. -#[derive(Debug, Clone)] -pub struct LocalTask { - pub id: Uuid, - pub server_task_id: Uuid, - pub state: TaskState, - pub container_id: Option<String>, - pub overlay_path: Option<String>, - pub repo_url: Option<String>, - pub base_branch: Option<String>, - pub plan: String, - pub created_at: DateTime<Utc>, - pub started_at: Option<DateTime<Utc>>, - pub completed_at: Option<DateTime<Utc>>, - pub error_message: Option<String>, -} - -/// Buffered output for reliable delivery. -#[derive(Debug, Clone)] -pub struct BufferedOutput { - pub id: i64, - pub task_id: Uuid, - pub output: String, - pub is_partial: bool, - pub timestamp: DateTime<Utc>, -} - -/// Local database for daemon state persistence. -pub struct LocalDb { - conn: Connection, -} - -impl LocalDb { - /// Open or create the local database. - pub fn open(path: &Path) -> SqliteResult<Self> { - // Create parent directory if needed - if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent).ok(); - } - - let conn = Connection::open(path)?; - - // Initialize schema - conn.execute_batch(Self::schema())?; - - Ok(Self { conn }) - } - - /// Open an in-memory database (for testing). - #[cfg(test)] - pub fn open_memory() -> SqliteResult<Self> { - let conn = Connection::open_in_memory()?; - conn.execute_batch(Self::schema())?; - Ok(Self { conn }) - } - - /// Database schema. - fn schema() -> &'static str { - r#" - -- Local task state for crash recovery - CREATE TABLE IF NOT EXISTS tasks ( - id TEXT PRIMARY KEY, - server_task_id TEXT NOT NULL, - state TEXT NOT NULL, - container_id TEXT, - overlay_path TEXT, - repo_url TEXT, - base_branch TEXT, - plan TEXT NOT NULL, - created_at TEXT NOT NULL, - started_at TEXT, - completed_at TEXT, - error_message TEXT - ); - - -- Buffered output for reliable delivery - CREATE TABLE IF NOT EXISTS output_buffer ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - task_id TEXT NOT NULL, - output TEXT NOT NULL, - is_partial INTEGER NOT NULL, - timestamp TEXT NOT NULL, - sent INTEGER NOT NULL DEFAULT 0 - ); - - -- Daemon state key-value store - CREATE TABLE IF NOT EXISTS daemon_state ( - key TEXT PRIMARY KEY, - value TEXT NOT NULL, - updated_at TEXT NOT NULL - ); - - -- Indexes - CREATE INDEX IF NOT EXISTS idx_tasks_state ON tasks(state); - CREATE INDEX IF NOT EXISTS idx_output_buffer_sent ON output_buffer(sent, id); - CREATE INDEX IF NOT EXISTS idx_output_buffer_task ON output_buffer(task_id); - "# - } - - /// Save a task. - pub fn save_task(&self, task: &LocalTask) -> SqliteResult<()> { - self.conn.execute( - r#" - INSERT OR REPLACE INTO tasks - (id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12) - "#, - params![ - task.id.to_string(), - task.server_task_id.to_string(), - task.state.as_str(), - task.container_id, - task.overlay_path, - task.repo_url, - task.base_branch, - task.plan, - task.created_at.to_rfc3339(), - task.started_at.map(|t| t.to_rfc3339()), - task.completed_at.map(|t| t.to_rfc3339()), - task.error_message, - ], - )?; - Ok(()) - } - - /// Get a task by ID. - pub fn get_task(&self, id: Uuid) -> SqliteResult<Option<LocalTask>> { - let mut stmt = self.conn.prepare( - "SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message FROM tasks WHERE id = ?1", - )?; - - let mut rows = stmt.query(params![id.to_string()])?; - - if let Some(row) = rows.next()? { - Ok(Some(Self::task_from_row(row)?)) - } else { - Ok(None) - } - } - - /// Get all running/active tasks (for recovery). - pub fn get_active_tasks(&self) -> SqliteResult<Vec<LocalTask>> { - let mut stmt = self.conn.prepare( - r#" - SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message - FROM tasks - WHERE state IN ('initializing', 'starting', 'running', 'paused', 'blocked') - "#, - )?; - - let rows = stmt.query_map([], |row| Self::task_from_row(row))?; - - rows.collect() - } - - /// Delete a task. - pub fn delete_task(&self, id: Uuid) -> SqliteResult<()> { - self.conn.execute( - "DELETE FROM tasks WHERE id = ?1", - params![id.to_string()], - )?; - Ok(()) - } - - /// Update task state. - pub fn update_task_state(&self, id: Uuid, state: TaskState) -> SqliteResult<()> { - self.conn.execute( - "UPDATE tasks SET state = ?2 WHERE id = ?1", - params![id.to_string(), state.as_str()], - )?; - Ok(()) - } - - /// Buffer output for reliable delivery. - pub fn buffer_output(&self, task_id: Uuid, output: &str, is_partial: bool) -> SqliteResult<i64> { - self.conn.execute( - r#" - INSERT INTO output_buffer (task_id, output, is_partial, timestamp, sent) - VALUES (?1, ?2, ?3, datetime('now'), 0) - "#, - params![task_id.to_string(), output, is_partial as i32], - )?; - Ok(self.conn.last_insert_rowid()) - } - - /// Get unsent outputs. - pub fn get_unsent_outputs(&self, limit: i64) -> SqliteResult<Vec<BufferedOutput>> { - let mut stmt = self.conn.prepare( - r#" - SELECT id, task_id, output, is_partial, timestamp - FROM output_buffer - WHERE sent = 0 - ORDER BY id - LIMIT ?1 - "#, - )?; - - let rows = stmt.query_map(params![limit], |row| { - let id: i64 = row.get(0)?; - let task_id_str: String = row.get(1)?; - let task_id = Uuid::parse_str(&task_id_str).unwrap_or_default(); - let output: String = row.get(2)?; - let is_partial: i32 = row.get(3)?; - let timestamp_str: String = row.get(4)?; - let timestamp = DateTime::parse_from_rfc3339(×tamp_str) - .map(|dt| dt.with_timezone(&Utc)) - .unwrap_or_else(|_| Utc::now()); - - Ok(BufferedOutput { - id, - task_id, - output, - is_partial: is_partial != 0, - timestamp, - }) - })?; - - rows.collect() - } - - /// Mark outputs as sent. - pub fn mark_outputs_sent(&self, ids: &[i64]) -> SqliteResult<()> { - if ids.is_empty() { - return Ok(()); - } - - let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect(); - let sql = format!( - "UPDATE output_buffer SET sent = 1 WHERE id IN ({})", - placeholders.join(",") - ); - - let params: Vec<rusqlite::types::Value> = ids - .iter() - .map(|id| rusqlite::types::Value::Integer(*id)) - .collect(); - - self.conn.execute(&sql, rusqlite::params_from_iter(params))?; - Ok(()) - } - - /// Clean up old sent outputs. - pub fn cleanup_sent_outputs(&self, older_than_hours: i64) -> SqliteResult<usize> { - let result = self.conn.execute( - r#" - DELETE FROM output_buffer - WHERE sent = 1 AND timestamp < datetime('now', ?1 || ' hours') - "#, - params![format!("-{}", older_than_hours)], - )?; - Ok(result) - } - - /// Get daemon state value. - pub fn get_state(&self, key: &str) -> SqliteResult<Option<String>> { - let mut stmt = self.conn.prepare( - "SELECT value FROM daemon_state WHERE key = ?1", - )?; - - let mut rows = stmt.query(params![key])?; - - if let Some(row) = rows.next()? { - let value: String = row.get(0)?; - Ok(Some(value)) - } else { - Ok(None) - } - } - - /// Set daemon state value. - pub fn set_state(&self, key: &str, value: &str) -> SqliteResult<()> { - self.conn.execute( - r#" - INSERT OR REPLACE INTO daemon_state (key, value, updated_at) - VALUES (?1, ?2, datetime('now')) - "#, - params![key, value], - )?; - Ok(()) - } - - /// Parse a task from a database row. - fn task_from_row(row: &rusqlite::Row) -> SqliteResult<LocalTask> { - let id_str: String = row.get(0)?; - let server_task_id_str: String = row.get(1)?; - let state_str: String = row.get(2)?; - let container_id: Option<String> = row.get(3)?; - let overlay_path: Option<String> = row.get(4)?; - let repo_url: Option<String> = row.get(5)?; - let base_branch: Option<String> = row.get(6)?; - let plan: String = row.get(7)?; - let created_at_str: String = row.get(8)?; - let started_at_str: Option<String> = row.get(9)?; - let completed_at_str: Option<String> = row.get(10)?; - let error_message: Option<String> = row.get(11)?; - - let id = Uuid::parse_str(&id_str).unwrap_or_default(); - let server_task_id = Uuid::parse_str(&server_task_id_str).unwrap_or_default(); - let state = TaskState::from_str(&state_str).unwrap_or_default(); - let created_at = DateTime::parse_from_rfc3339(&created_at_str) - .map(|dt| dt.with_timezone(&Utc)) - .unwrap_or_else(|_| Utc::now()); - let started_at = started_at_str - .and_then(|s| DateTime::parse_from_rfc3339(&s).ok()) - .map(|dt| dt.with_timezone(&Utc)); - let completed_at = completed_at_str - .and_then(|s| DateTime::parse_from_rfc3339(&s).ok()) - .map(|dt| dt.with_timezone(&Utc)); - - Ok(LocalTask { - id, - server_task_id, - state, - container_id, - overlay_path, - repo_url, - base_branch, - plan, - created_at, - started_at, - completed_at, - error_message, - }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_open_memory() { - let db = LocalDb::open_memory().unwrap(); - assert!(db.get_active_tasks().unwrap().is_empty()); - } - - #[test] - fn test_save_and_get_task() { - let db = LocalDb::open_memory().unwrap(); - - let task = LocalTask { - id: Uuid::new_v4(), - server_task_id: Uuid::new_v4(), - state: TaskState::Running, - container_id: Some("abc123".to_string()), - overlay_path: Some("/tmp/overlay".to_string()), - repo_url: Some("https://github.com/test/repo".to_string()), - base_branch: Some("main".to_string()), - plan: "Build the feature".to_string(), - created_at: Utc::now(), - started_at: Some(Utc::now()), - completed_at: None, - error_message: None, - }; - - db.save_task(&task).unwrap(); - - let loaded = db.get_task(task.id).unwrap().unwrap(); - assert_eq!(loaded.id, task.id); - assert_eq!(loaded.state, TaskState::Running); - assert_eq!(loaded.plan, "Build the feature"); - } - - #[test] - fn test_output_buffer() { - let db = LocalDb::open_memory().unwrap(); - let task_id = Uuid::new_v4(); - - db.buffer_output(task_id, "line 1", false).unwrap(); - db.buffer_output(task_id, "line 2", false).unwrap(); - - let unsent = db.get_unsent_outputs(10).unwrap(); - assert_eq!(unsent.len(), 2); - - let ids: Vec<i64> = unsent.iter().map(|o| o.id).collect(); - db.mark_outputs_sent(&ids).unwrap(); - - let unsent = db.get_unsent_outputs(10).unwrap(); - assert!(unsent.is_empty()); - } -} diff --git a/makima/daemon/src/db/mod.rs b/makima/daemon/src/db/mod.rs deleted file mode 100644 index 2c6e0f3..0000000 --- a/makima/daemon/src/db/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Local database for daemon state persistence. - -pub mod local; - -pub use local::{BufferedOutput, LocalDb, LocalTask}; diff --git a/makima/daemon/src/error.rs b/makima/daemon/src/error.rs deleted file mode 100644 index 00e5140..0000000 --- a/makima/daemon/src/error.rs +++ /dev/null @@ -1,75 +0,0 @@ -//! Error types for the makima daemon. - -use thiserror::Error; -use uuid::Uuid; - -/// Top-level daemon error type. -#[derive(Error, Debug)] -pub enum DaemonError { - #[error("WebSocket error: {0}")] - WebSocket(#[from] tokio_tungstenite::tungstenite::Error), - - #[error("Worktree error: {0}")] - Worktree(#[from] crate::worktree::WorktreeError), - - #[error("Process error: {0}")] - Process(#[from] crate::process::ClaudeProcessError), - - #[error("Task error: {0}")] - Task(#[from] TaskError), - - #[error("Configuration error: {0}")] - Config(#[from] config::ConfigError), - - #[error("Database error: {0}")] - Database(#[from] rusqlite::Error), - - #[error("IO error: {0}")] - Io(#[from] std::io::Error), - - #[error("JSON error: {0}")] - Json(#[from] serde_json::Error), - - #[error("Authentication failed: {0}")] - AuthFailed(String), - - #[error("Connection lost")] - ConnectionLost, - - #[error("Server error: {code} - {message}")] - ServerError { code: String, message: String }, -} - -/// Task management errors. -#[derive(Error, Debug)] -pub enum TaskError { - #[error("Task not found: {0}")] - NotFound(Uuid), - - #[error("Invalid state transition from {from} to {to}")] - InvalidStateTransition { from: String, to: String }, - - #[error("Concurrency limit reached")] - ConcurrencyLimit, - - #[error("Task already exists: {0}")] - AlreadyExists(Uuid), - - #[error("Task not running: {0}")] - NotRunning(Uuid), - - #[error("Failed to send message to task: {0}")] - MessageFailed(String), - - #[error("Task setup failed: {0}")] - SetupFailed(String), - - #[error("Task execution failed: {0}")] - ExecutionFailed(String), -} - -/// Result type alias for daemon operations. -pub type Result<T> = std::result::Result<T, DaemonError>; - -/// Result type alias for task operations. -pub type TaskResult<T> = std::result::Result<T, TaskError>; diff --git a/makima/daemon/src/lib.rs b/makima/daemon/src/lib.rs deleted file mode 100644 index 9555681..0000000 --- a/makima/daemon/src/lib.rs +++ /dev/null @@ -1,21 +0,0 @@ -//! Makima Daemon - Git worktree orchestration for Claude Code. -//! -//! This daemon runs on worker machines and: -//! - Connects to the makima server via WebSocket -//! - Creates git worktrees for task isolation -//! - Runs Claude Code CLI as subprocesses in worktrees -//! - Streams JSON output back to server - -pub mod cli; -pub mod config; -pub mod db; -pub mod error; -pub mod process; -pub mod task; -pub mod temp; -pub mod worktree; -pub mod ws; - -pub use cli::Cli; -pub use config::DaemonConfig; -pub use error::{DaemonError, Result}; diff --git a/makima/daemon/src/main.rs b/makima/daemon/src/main.rs deleted file mode 100644 index e4ca5d4..0000000 --- a/makima/daemon/src/main.rs +++ /dev/null @@ -1,313 +0,0 @@ -//! Makima Daemon - Git worktree orchestration for Claude Code. - -use std::sync::Arc; - -use std::path::Path; - -use clap::Parser; -use makima_daemon::cli::Cli; -use makima_daemon::config::{DaemonConfig, RepoEntry}; -use makima_daemon::db::LocalDb; -use makima_daemon::error::DaemonError; -use makima_daemon::task::{TaskConfig, TaskManager}; -use makima_daemon::ws::{DaemonCommand, WsClient}; -use tokio::process::Command; -use tokio::sync::mpsc; -use tracing_subscriber::{fmt, prelude::*, EnvFilter}; - -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { - eprintln!("=== Makima Daemon Starting ==="); - - // Parse command-line arguments - let cli = Cli::parse(); - - // Load configuration with CLI overrides - eprintln!("[1/5] Loading configuration..."); - let config = match DaemonConfig::load_with_cli(&cli) { - Ok(cfg) => { - eprintln!(" Config loaded: server={}", cfg.server.url); - cfg - } - Err(e) => { - eprintln!("Failed to load configuration: {}", e); - eprintln!(); - eprintln!("Use CLI flags:"); - eprintln!(" makima-daemon --server-url ws://localhost:8080 --api-key your-api-key"); - eprintln!(); - eprintln!("Or set environment variables:"); - eprintln!(" MAKIMA_DAEMON_SERVER_URL=ws://localhost:8080"); - eprintln!(" MAKIMA_API_KEY=your-api-key"); - eprintln!(); - eprintln!("Or create a config file: makima-daemon.toml"); - eprintln!(); - eprintln!("Run 'makima-daemon --help' for all options."); - std::process::exit(1); - } - }; - - // Initialize logging - init_logging(&config.logging.level, &config.logging.format); - eprintln!("[2/5] Logging initialized"); - - // Initialize local database - eprintln!("[3/5] Opening local database: {}", config.local_db.path.display()); - let _local_db = LocalDb::open(&config.local_db.path)?; - eprintln!(" Database opened"); - - // Initialize worktree directories - eprintln!("[4/5] Setting up directories..."); - tokio::fs::create_dir_all(&config.worktree.base_dir).await?; - tokio::fs::create_dir_all(&config.worktree.repos_dir).await?; - tokio::fs::create_dir_all(&config.repos.home_dir).await?; - eprintln!(" Worktree base: {}", config.worktree.base_dir.display()); - eprintln!(" Repos cache: {}", config.worktree.repos_dir.display()); - eprintln!(" Home dir: {}", config.repos.home_dir.display()); - - // Auto-clone repositories if configured - if !config.repos.auto_clone.is_empty() { - eprintln!(" Auto-cloning {} repositories...", config.repos.auto_clone.len()); - for repo_entry in &config.repos.auto_clone { - if let Err(e) = auto_clone_repo(repo_entry, &config.repos.home_dir).await { - eprintln!(" WARNING: Failed to clone {}: {}", repo_entry.url(), e); - } - } - } - - // Create channels for communication - let (command_tx, mut command_rx) = mpsc::channel::<DaemonCommand>(64); - - // Get machine info - let machine_id = get_machine_id(); - let hostname = get_hostname(); - eprintln!(" Machine ID: {}", machine_id); - eprintln!(" Hostname: {}", hostname); - - // Create WebSocket client - eprintln!("[5/5] Connecting to server: {}", config.server.url); - let mut ws_client = WsClient::new( - config.server.clone(), - machine_id, - hostname, - config.process.max_concurrent_tasks as i32, - command_tx, - ); - - // Get sender for task manager - let ws_tx = ws_client.sender(); - - // Create task configuration - let task_config = TaskConfig { - max_concurrent_tasks: config.process.max_concurrent_tasks, - worktree_base_dir: config.worktree.base_dir.clone(), - env_vars: config.process.env_vars.clone(), - claude_command: config.process.claude_command.clone(), - claude_args: config.process.claude_args.clone(), - claude_pre_args: config.process.claude_pre_args.clone(), - enable_permissions: config.process.enable_permissions, - disable_verbose: config.process.disable_verbose, - }; - - // Create task manager - let task_manager = Arc::new(TaskManager::new(task_config, ws_tx.clone())); - - // Spawn command handler - let task_manager_clone = task_manager.clone(); - tokio::spawn(async move { - tracing::info!("Command handler started, waiting for commands..."); - while let Some(command) = command_rx.recv().await { - tracing::info!("Received command from channel: {:?}", command); - if let Err(e) = task_manager_clone.handle_command(command).await { - tracing::error!("Failed to handle command: {}", e); - } - } - tracing::info!("Command handler stopped"); - }); - - // Handle shutdown signals - let shutdown_signal = async { - tokio::signal::ctrl_c() - .await - .expect("Failed to install Ctrl+C handler"); - eprintln!("\nReceived shutdown signal"); - }; - - eprintln!("=== Daemon running (Ctrl+C to stop) ==="); - - // Run WebSocket client with shutdown handling - tokio::select! { - result = ws_client.run() => { - match result { - Ok(()) => eprintln!("WebSocket client exited cleanly"), - Err(DaemonError::AuthFailed(msg)) => { - eprintln!("ERROR: Authentication failed: {}", msg); - std::process::exit(1); - } - Err(e) => { - eprintln!("ERROR: WebSocket client error: {}", e); - std::process::exit(1); - } - } - } - _ = shutdown_signal => { - eprintln!("Shutting down..."); - } - } - - // Cleanup - tracing::info!("Daemon stopped"); - - Ok(()) -} - -fn init_logging(level: &str, format: &str) { - let filter = EnvFilter::try_from_default_env() - .or_else(|_| EnvFilter::try_new(level)) - .unwrap_or_else(|_| EnvFilter::new("info")); - - let subscriber = tracing_subscriber::registry().with(filter); - - if format == "json" { - subscriber.with(fmt::layer().json()).init(); - } else { - subscriber.with(fmt::layer()).init(); - } -} - -fn get_machine_id() -> String { - // Try to read machine-id from standard locations - #[cfg(target_os = "linux")] - { - if let Ok(id) = std::fs::read_to_string("/etc/machine-id") { - return id.trim().to_string(); - } - if let Ok(id) = std::fs::read_to_string("/var/lib/dbus/machine-id") { - return id.trim().to_string(); - } - } - - #[cfg(target_os = "macos")] - { - // Use IOPlatformSerialNumber - if let Ok(output) = std::process::Command::new("ioreg") - .args(["-rd1", "-c", "IOPlatformExpertDevice"]) - .output() - { - let stdout = String::from_utf8_lossy(&output.stdout); - for line in stdout.lines() { - if line.contains("IOPlatformUUID") { - if let Some(uuid) = line.split('"').nth(3) { - return uuid.to_string(); - } - } - } - } - } - - // Fallback: generate a random ID and persist it - let state_dir = dirs_next::data_local_dir() - .unwrap_or_else(|| std::path::PathBuf::from(".")) - .join("makima-daemon"); - let machine_id_file = state_dir.join("machine-id"); - - if let Ok(id) = std::fs::read_to_string(&machine_id_file) { - return id.trim().to_string(); - } - - // Generate new ID - let new_id = uuid::Uuid::new_v4().to_string(); - std::fs::create_dir_all(&state_dir).ok(); - std::fs::write(&machine_id_file, &new_id).ok(); - new_id -} - -fn get_hostname() -> String { - hostname::get() - .map(|h| h.to_string_lossy().to_string()) - .unwrap_or_else(|_| "unknown".to_string()) -} - -/// Auto-clone a repository to the home directory if it doesn't exist. -async fn auto_clone_repo( - repo_entry: &RepoEntry, - home_dir: &Path, -) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { - let dir_name = repo_entry - .dir_name() - .ok_or("Could not determine directory name from URL")?; - let target_dir = home_dir.join(&dir_name); - - // Check if already cloned - if target_dir.exists() { - eprintln!(" [skip] {} (already exists)", dir_name); - return Ok(()); - } - - let url = repo_entry.expanded_url(); - eprintln!(" [clone] {} -> {}", url, target_dir.display()); - - // Build git clone command - let mut args = vec!["clone".to_string()]; - - // Add shallow clone if requested - if repo_entry.shallow() { - args.push("--depth".to_string()); - args.push("1".to_string()); - } - - // Add branch if specified - if let Some(branch) = repo_entry.branch() { - args.push("--branch".to_string()); - args.push(branch.to_string()); - } - - args.push(url.clone()); - args.push(target_dir.to_string_lossy().to_string()); - - // Run git clone - let output = Command::new("git") - .args(&args) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(format!("git clone failed: {}", stderr).into()); - } - - eprintln!(" [done] {}", dir_name); - Ok(()) -} - -/// dirs_next minimal replacement -mod dirs_next { - use std::path::PathBuf; - - pub fn data_local_dir() -> Option<PathBuf> { - #[cfg(target_os = "macos")] - { - std::env::var("HOME") - .ok() - .map(|h| PathBuf::from(h).join("Library").join("Application Support")) - } - #[cfg(target_os = "linux")] - { - std::env::var("XDG_DATA_HOME") - .ok() - .map(PathBuf::from) - .or_else(|| { - std::env::var("HOME") - .ok() - .map(|h| PathBuf::from(h).join(".local").join("share")) - }) - } - #[cfg(target_os = "windows")] - { - std::env::var("LOCALAPPDATA").ok().map(PathBuf::from) - } - #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))] - { - None - } - } -} diff --git a/makima/daemon/src/process/claude.rs b/makima/daemon/src/process/claude.rs deleted file mode 100644 index e06ee09..0000000 --- a/makima/daemon/src/process/claude.rs +++ /dev/null @@ -1,481 +0,0 @@ -//! Claude Code process management. - -use std::collections::HashMap; -use std::path::Path; -use std::process::Stdio; -use std::sync::Arc; - -use futures::Stream; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::process::{Child, ChildStdin, Command}; -use tokio::sync::{mpsc, Mutex}; - -use super::claude_protocol::ClaudeInputMessage; - -/// Errors that can occur during Claude process management. -#[derive(Debug, thiserror::Error)] -pub enum ClaudeProcessError { - #[error("Failed to spawn Claude process: {0}")] - SpawnFailed(#[from] std::io::Error), - - #[error("Claude command not found: {0}")] - CommandNotFound(String), - - #[error("Process already exited")] - AlreadyExited, - - #[error("Failed to read output: {0}")] - OutputRead(String), -} - -/// A line of output from Claude Code. -#[derive(Debug, Clone)] -pub struct OutputLine { - /// The raw content of the line. - pub content: String, - /// Whether this is from stdout (true) or stderr (false). - pub is_stdout: bool, - /// Parsed JSON type if available (e.g., "system", "assistant", "result"). - pub json_type: Option<String>, -} - -impl OutputLine { - /// Create a new stdout output line. - pub fn stdout(content: String) -> Self { - let json_type = extract_json_type(&content); - Self { - content, - is_stdout: true, - json_type, - } - } - - /// Create a new stderr output line. - pub fn stderr(content: String) -> Self { - Self { - content, - is_stdout: false, - json_type: None, - } - } -} - -/// Extract the "type" field from a JSON line if present. -fn extract_json_type(line: &str) -> Option<String> { - // Quick check for JSON - if !line.starts_with('{') { - return None; - } - - // Try to parse and extract type - if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) { - json.get("type") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()) - } else { - None - } -} - -/// Handle to a running Claude Code process. -pub struct ClaudeProcess { - /// The child process. - child: Child, - /// Receiver for output lines. - output_rx: mpsc::Receiver<OutputLine>, - /// Stdin handle for sending input to the process (thread-safe). - stdin: Arc<Mutex<Option<ChildStdin>>>, -} - -impl ClaudeProcess { - /// Wait for the process to exit and return the exit code. - pub async fn wait(&mut self) -> Result<i64, ClaudeProcessError> { - let status = self.child.wait().await?; - Ok(status.code().unwrap_or(-1) as i64) - } - - /// Check if the process has exited. - pub fn try_wait(&mut self) -> Result<Option<i64>, ClaudeProcessError> { - match self.child.try_wait()? { - Some(status) => Ok(Some(status.code().unwrap_or(-1) as i64)), - None => Ok(None), - } - } - - /// Kill the process. - pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> { - self.child.kill().await?; - Ok(()) - } - - /// Get the next output line, if available. - pub async fn next_output(&mut self) -> Option<OutputLine> { - self.output_rx.recv().await - } - - /// Send a raw message to the process via stdin. - /// - /// This can be used to provide input when Claude Code is waiting for user input. - pub async fn send_input(&self, message: &str) -> Result<(), ClaudeProcessError> { - let mut stdin_guard = self.stdin.lock().await; - if let Some(ref mut stdin) = *stdin_guard { - stdin - .write_all(message.as_bytes()) - .await - .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?; - stdin - .write_all(b"\n") - .await - .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write newline: {}", e)))?; - stdin - .flush() - .await - .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?; - Ok(()) - } else { - Err(ClaudeProcessError::OutputRead("Stdin not available".to_string())) - } - } - - /// Send a user message to the process via stdin using JSON protocol. - /// - /// This is the preferred method when using `--input-format=stream-json`. - /// The message is serialized as JSON and sent as a single line. - pub async fn send_user_message(&self, content: &str) -> Result<(), ClaudeProcessError> { - let message = ClaudeInputMessage::user(content); - let json_line = message.to_json_line().map_err(|e| { - ClaudeProcessError::OutputRead(format!("Failed to serialize message: {}", e)) - })?; - - tracing::debug!(content_len = content.len(), "Sending user message to Claude process"); - - let mut stdin_guard = self.stdin.lock().await; - if let Some(ref mut stdin) = *stdin_guard { - stdin - .write_all(json_line.as_bytes()) - .await - .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?; - stdin - .flush() - .await - .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?; - Ok(()) - } else { - Err(ClaudeProcessError::OutputRead("Stdin not available".to_string())) - } - } - - /// Get a clone of the stdin handle for external use. - pub fn stdin_handle(&self) -> Arc<Mutex<Option<ChildStdin>>> { - Arc::clone(&self.stdin) - } - - /// Close stdin, signaling EOF to the process. - pub async fn close_stdin(&self) -> Result<(), ClaudeProcessError> { - let mut stdin_guard = self.stdin.lock().await; - if let Some(mut stdin) = stdin_guard.take() { - let _ = stdin.shutdown().await; - } - Ok(()) - } - - /// Convert to a stream of output lines. - pub fn into_stream(self) -> impl Stream<Item = OutputLine> { - futures::stream::unfold(self.output_rx, |mut rx| async move { - rx.recv().await.map(|line| (line, rx)) - }) - } -} - -/// Manages Claude Code process spawning. -pub struct ProcessManager { - /// Path to the claude command. - claude_command: String, - /// Additional arguments to pass to Claude Code (after defaults). - claude_args: Vec<String>, - /// Arguments to pass before defaults. - claude_pre_args: Vec<String>, - /// Whether to enable Claude's permission system (skip --dangerously-skip-permissions). - enable_permissions: bool, - /// Whether to disable verbose output. - disable_verbose: bool, - /// Default environment variables to pass. - default_env: HashMap<String, String>, -} - -impl Default for ProcessManager { - fn default() -> Self { - Self::new() - } -} - -impl ProcessManager { - /// Create a new ProcessManager with default settings. - pub fn new() -> Self { - Self { - claude_command: "claude".to_string(), - claude_args: Vec::new(), - claude_pre_args: Vec::new(), - enable_permissions: false, - disable_verbose: false, - default_env: HashMap::new(), - } - } - - /// Create a ProcessManager with a custom claude command path. - pub fn with_command(command: String) -> Self { - Self { - claude_command: command, - claude_args: Vec::new(), - claude_pre_args: Vec::new(), - enable_permissions: false, - disable_verbose: false, - default_env: HashMap::new(), - } - } - - /// Set additional arguments to pass after default arguments. - pub fn with_args(mut self, args: Vec<String>) -> Self { - self.claude_args = args; - self - } - - /// Set arguments to pass before default arguments. - pub fn with_pre_args(mut self, args: Vec<String>) -> Self { - self.claude_pre_args = args; - self - } - - /// Enable Claude's permission system (don't pass --dangerously-skip-permissions). - pub fn with_permissions_enabled(mut self, enabled: bool) -> Self { - self.enable_permissions = enabled; - self - } - - /// Disable verbose output. - pub fn with_verbose_disabled(mut self, disabled: bool) -> Self { - self.disable_verbose = disabled; - self - } - - /// Add default environment variables. - pub fn with_env(mut self, env: HashMap<String, String>) -> Self { - self.default_env = env; - self - } - - /// Spawn a Claude Code process to execute a plan. - /// - /// The process runs in the specified working directory with stream-json output format. - pub async fn spawn( - &self, - working_dir: &Path, - plan: &str, - extra_env: Option<HashMap<String, String>>, - ) -> Result<ClaudeProcess, ClaudeProcessError> { - tracing::info!( - working_dir = %working_dir.display(), - plan_len = plan.len(), - plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan }, - "Spawning Claude Code process" - ); - - // Verify working directory exists - if !working_dir.exists() { - tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!"); - return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new( - std::io::ErrorKind::NotFound, - format!("Working directory does not exist: {}", working_dir.display()), - ))); - } - - // Build environment - let mut env = self.default_env.clone(); - if let Some(extra) = extra_env { - env.extend(extra); - } - - // Build arguments list - let mut args = Vec::new(); - - // Pre-args (before defaults) - args.extend(self.claude_pre_args.clone()); - - // Required arguments for stream-json protocol - args.push("--output-format=stream-json".to_string()); - args.push("--input-format=stream-json".to_string()); - - // Optional default arguments - if !self.disable_verbose { - args.push("--verbose".to_string()); - } - if !self.enable_permissions { - args.push("--dangerously-skip-permissions".to_string()); - } - - // Additional user-configured arguments - args.extend(self.claude_args.clone()); - - tracing::debug!(args = ?args, "Claude command arguments"); - - // Spawn the process - let mut child = Command::new(&self.claude_command) - .args(&args) - .current_dir(working_dir) - .envs(env) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .kill_on_drop(true) - .spawn() - .map_err(|e| { - if e.kind() == std::io::ErrorKind::NotFound { - ClaudeProcessError::CommandNotFound(self.claude_command.clone()) - } else { - ClaudeProcessError::SpawnFailed(e) - } - })?; - - // Create output channel - let (tx, rx) = mpsc::channel(1000); - - // Take stdout, stderr, and stdin - // With --input-format=stream-json, we keep stdin open for sending messages - let stdin = child.stdin.take(); - let stdin = Arc::new(Mutex::new(stdin)); - - let stdout = child.stdout.take().expect("stdout should be piped"); - let stderr = child.stderr.take().expect("stderr should be piped"); - - // Spawn task to read stdout - let tx_stdout = tx.clone(); - tokio::spawn(async move { - use tokio::io::AsyncReadExt; - let mut reader = BufReader::new(stdout); - let mut buffer = vec![0u8; 4096]; - let mut line_buffer = String::new(); - - loop { - // Try to read with a timeout to detect if we're stuck - match tokio::time::timeout( - tokio::time::Duration::from_secs(5), - reader.read(&mut buffer) - ).await { - Ok(Ok(0)) => { - // EOF - tracing::debug!("Claude stdout EOF"); - // Send any remaining content - if !line_buffer.is_empty() { - let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await; - } - break; - } - Ok(Ok(n)) => { - let chunk = String::from_utf8_lossy(&buffer[..n]); - tracing::debug!(bytes = n, chunk_preview = %if chunk.len() > 100 { &chunk[..100] } else { &chunk }, "Got stdout chunk from Claude"); - - // Accumulate into line buffer and emit complete lines - line_buffer.push_str(&chunk); - while let Some(newline_pos) = line_buffer.find('\n') { - let line = line_buffer[..newline_pos].to_string(); - line_buffer = line_buffer[newline_pos + 1..].to_string(); - if tx_stdout.send(OutputLine::stdout(line)).await.is_err() { - return; - } - } - } - Ok(Err(e)) => { - tracing::error!(error = %e, "Error reading Claude stdout"); - break; - } - Err(_) => { - // Timeout - no data for 5 seconds - tracing::warn!("No stdout data from Claude for 5 seconds"); - } - } - } - tracing::debug!("Claude stdout reader task ended"); - }); - - // Spawn task to read stderr - let tx_stderr = tx; - tokio::spawn(async move { - let reader = BufReader::new(stderr); - let mut lines = reader.lines(); - while let Ok(Some(line)) = lines.next_line().await { - tracing::debug!(line = %line, "Claude stderr"); - if tx_stderr.send(OutputLine::stderr(line)).await.is_err() { - break; - } - } - tracing::debug!("Claude stderr reader task ended"); - }); - - tracing::info!("Claude Code process spawned successfully"); - - let process = ClaudeProcess { - child, - output_rx: rx, - stdin, - }; - - // Send the initial plan as the first user message - tracing::info!(plan_len = plan.len(), "Sending initial plan to Claude via stdin"); - process.send_user_message(plan).await?; - - Ok(process) - } - - /// Check if the claude command is available. - pub async fn check_claude_available(&self) -> Result<String, ClaudeProcessError> { - let output = Command::new(&self.claude_command) - .arg("--version") - .output() - .await - .map_err(|e| { - if e.kind() == std::io::ErrorKind::NotFound { - ClaudeProcessError::CommandNotFound(self.claude_command.clone()) - } else { - ClaudeProcessError::SpawnFailed(e) - } - })?; - - if output.status.success() { - Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) - } else { - Err(ClaudeProcessError::CommandNotFound( - self.claude_command.clone(), - )) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_extract_json_type() { - assert_eq!( - extract_json_type(r#"{"type":"system","subtype":"init"}"#), - Some("system".to_string()) - ); - assert_eq!( - extract_json_type(r#"{"type":"assistant","message":{}}"#), - Some("assistant".to_string()) - ); - assert_eq!(extract_json_type("not json"), None); - assert_eq!(extract_json_type(r#"{"no_type": true}"#), None); - } - - #[test] - fn test_output_line_creation() { - let line = OutputLine::stdout(r#"{"type":"result"}"#.to_string()); - assert!(line.is_stdout); - assert_eq!(line.json_type, Some("result".to_string())); - - let line = OutputLine::stderr("error message".to_string()); - assert!(!line.is_stdout); - assert_eq!(line.json_type, None); - } -} diff --git a/makima/daemon/src/process/claude_protocol.rs b/makima/daemon/src/process/claude_protocol.rs deleted file mode 100644 index 930152b..0000000 --- a/makima/daemon/src/process/claude_protocol.rs +++ /dev/null @@ -1,59 +0,0 @@ -//! Claude Code JSON protocol types for stdin communication. -//! -//! When using `--input-format=stream-json`, Claude Code expects -//! newline-delimited JSON messages on stdin. - -use serde::Serialize; - -/// Message sent to Claude Code via stdin. -/// -/// Format based on Claude Code's stream-json input protocol. -#[derive(Debug, Clone, Serialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum ClaudeInputMessage { - /// A user message to send to Claude. - User { message: UserMessage }, -} - -/// The inner user message structure. -#[derive(Debug, Clone, Serialize)] -pub struct UserMessage { - /// Always "user" for user messages. - pub role: String, - /// The message content. - pub content: String, -} - -impl ClaudeInputMessage { - /// Create a new user message. - pub fn user(content: impl Into<String>) -> Self { - Self::User { - message: UserMessage { - role: "user".to_string(), - content: content.into(), - }, - } - } - - /// Serialize to a JSON string with trailing newline (NDJSON format). - pub fn to_json_line(&self) -> Result<String, serde_json::Error> { - let mut json = serde_json::to_string(self)?; - json.push('\n'); - Ok(json) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_user_message_serialization() { - let msg = ClaudeInputMessage::user("Hello, Claude!"); - let json = msg.to_json_line().unwrap(); - - // Should produce: {"type":"user","message":{"role":"user","content":"Hello, Claude!"}}\n - assert!(json.starts_with(r#"{"type":"user","message":{"role":"user","content":"Hello, Claude!"}}"#)); - assert!(json.ends_with('\n')); - } -} diff --git a/makima/daemon/src/process/mod.rs b/makima/daemon/src/process/mod.rs deleted file mode 100644 index 814a3c5..0000000 --- a/makima/daemon/src/process/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -//! Process management for Claude Code subprocess execution. -//! -//! Spawns and manages Claude Code processes in worktree directories, -//! streaming JSON output back to the daemon. - -mod claude; -mod claude_protocol; - -pub use claude::{ClaudeProcess, ClaudeProcessError, OutputLine, ProcessManager}; -pub use claude_protocol::ClaudeInputMessage; diff --git a/makima/daemon/src/task/manager.rs b/makima/daemon/src/task/manager.rs deleted file mode 100644 index 4979ce7..0000000 --- a/makima/daemon/src/task/manager.rs +++ /dev/null @@ -1,2248 +0,0 @@ -//! Task lifecycle manager using git worktrees and Claude Code subprocesses. - -use std::collections::HashMap; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Instant; - -use rand::Rng; -use tokio::io::AsyncWriteExt; -use tokio::sync::{mpsc, RwLock, Semaphore}; -use uuid::Uuid; - -use std::collections::HashSet; - -use super::state::TaskState; -use crate::error::{DaemonError, TaskError, TaskResult}; -use crate::process::{ClaudeInputMessage, ProcessManager}; -use crate::temp::TempManager; -use crate::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager}; -use crate::ws::{BranchInfo, DaemonCommand, DaemonMessage}; - -/// Generate a secure random API key for orchestrator tool access. -fn generate_tool_key() -> String { - let mut rng = rand::rng(); - let bytes: [u8; 32] = rng.random(); - hex::encode(bytes) -} - -/// System prompt for regular (non-orchestrator) subtasks. -/// This ensures subtasks work only within their isolated worktree directory. -const SUBTASK_SYSTEM_PROMPT: &str = r#"You are working in an isolated worktree directory that contains a snapshot of the codebase. - -## IMPORTANT: Directory Restrictions - -**You MUST only work within the current working directory (your worktree).** - -- DO NOT use `cd` to navigate to directories outside your worktree -- DO NOT use absolute paths that point outside your worktree (e.g., don't write to ~/some/path, /tmp, or the original repository) -- DO NOT modify files in parent directories or sibling directories -- All your file operations should be relative to the current directory - -Your working directory is your sandboxed workspace. When you complete your task, your changes will be reviewed and integrated by the orchestrator. - -**Why?** Your worktree is isolated so that: -1. Your changes don't affect other running tasks -2. Changes can be reviewed before integration -3. Multiple tasks can work on the codebase in parallel without conflicts - ---- - -"#; - -/// The orchestrator system prompt that tells Claude how to use the helper script. -const ORCHESTRATOR_SYSTEM_PROMPT: &str = r#"You are an orchestrator task. Your job is to coordinate subtasks and integrate their work, NOT to write code directly. - -## FIRST STEP - -Start by checking if you have existing subtasks: - -```bash -# List all subtasks to see what work needs to be done -./.makima/orchestrate.sh list -``` - -If subtasks exist, start them. If you need additional subtasks or no subtasks exist yet, you can create them. - ---- - -## Creating Subtasks - -You can create new subtasks to break down work: - -```bash -# Create a new subtask with a name and plan -./.makima/orchestrate.sh create "Subtask Name" "Detailed plan for what the subtask should do..." - -# The command returns the new subtask ID - use it to start the subtask -./.makima/orchestrate.sh start <new_subtask_id> -``` - -Create subtasks when you need to: -- Break down complex work into smaller pieces -- Run multiple tasks in parallel on different parts of the codebase -- Delegate specific implementation work - -## Task Continuation (Sequential Dependencies) - -When subtasks need to build on each other's work (e.g., Task B depends on Task A's changes), use `--continue-from`: - -```bash -# Create Task B that continues from Task A's worktree -./.makima/orchestrate.sh create "Task B" "Build on Task A's work..." --continue-from <task_a_id> -``` - -This copies all files from Task A's worktree into Task B's worktree, so Task B starts with Task A's changes. - -**When to use continuation:** -- Sequential work: Task B needs Task A's output files -- Staged implementation: Building features incrementally -- Fix-and-extend: One task fixes issues, another adds features on top - -**When NOT to use continuation:** -- Parallel tasks working on different files -- Independent subtasks that can be merged separately - -**Important for merging:** When tasks continue from each other, only merge the FINAL task in the chain. Earlier tasks' changes are already included in later tasks' worktrees. - -## Sharing Files with Subtasks - -Use `--files` to copy specific files from your orchestrator worktree to subtasks. This is useful for sharing plans, configs, or data files: - -```bash -# Create subtask with specific files copied from orchestrator -./.makima/orchestrate.sh create "Implement Feature" "Follow the plan in PLAN.md" --files "PLAN.md" - -# Copy multiple files (comma-separated) -./.makima/orchestrate.sh create "API Work" "Use the spec..." --files "PLAN.md,api-spec.yaml,types.ts" - -# Combine with --continue-from to share files AND continue from another task -./.makima/orchestrate.sh create "Step 2" "Continue..." --continue-from <task_a_id> --files "requirements.md" -``` - -**Use cases for --files:** -- Share a PLAN.md with detailed implementation steps -- Distribute configuration or spec files -- Pass generated data or intermediate results - -## How Subtasks Work - -Each subtask runs in its own **worktree** - a separate directory with a copy of the codebase. When subtasks complete: -- Their work remains in the worktree files (NOT committed to git) -- **Subtasks do NOT auto-merge** - YOU must integrate their work into your worktree -- You can view and copy files from subtask worktrees using their paths -- The worktree path is returned when you get subtask status - -**IMPORTANT:** Subtasks never create PRs or merge to the target repository. Only the orchestrator (you) can trigger completion actions like PR creation or merging after integrating all subtask work. - -## Subtask Commands -```bash -# List all subtasks and their current status -./.makima/orchestrate.sh list - -# Create a new subtask (returns the subtask ID) -./.makima/orchestrate.sh create "Name" "Plan/description" - -# Create a subtask that continues from another task's worktree -./.makima/orchestrate.sh create "Name" "Plan" --continue-from <other_task_id> - -# Create a subtask with specific files copied from orchestrator worktree -./.makima/orchestrate.sh create "Name" "Plan" --files "file1.md,file2.yaml" - -# Start a specific subtask (it will run in its own Claude instance) -./.makima/orchestrate.sh start <subtask_id> - -# Stop a running subtask -./.makima/orchestrate.sh stop <subtask_id> - -# Get detailed status of a subtask (includes worktree_path when available) -./.makima/orchestrate.sh status <subtask_id> - -# Get the output/logs of a subtask -./.makima/orchestrate.sh output <subtask_id> - -# Get the worktree path for a subtask -./.makima/orchestrate.sh worktree <subtask_id> -``` - -## Integrating Subtask Work - -When subtasks complete, their changes exist as files in their worktree directories: -- Files are NOT committed to git branches -- You must copy/integrate files from subtask worktrees into your worktree -- Use standard file operations (cp, cat, etc.) to review and integrate changes - -### Handling Continuation Chains - -**CRITICAL:** When subtasks use `--continue-from`, they form a chain where each task includes all changes from previous tasks. You must ONLY integrate the FINAL task in each chain. - -Example chain: Task A → Task B (continues from A) → Task C (continues from B) -- Task C's worktree contains ALL changes from A, B, and C -- You should ONLY integrate Task C's worktree -- DO NOT integrate Task A or Task B separately (their changes are already in C) - -**How to track continuation chains:** -1. When you create tasks with `--continue-from`, note which task continues from which -2. Build a mental model: Independent tasks (no continuation) + Continuation chains -3. For each chain, only integrate the LAST task in the chain - -**Example with mixed independent and chained tasks:** -``` -Independent tasks (integrate all): -- Task X: API endpoints -- Task Y: Database models - -Continuation chain (integrate ONLY the last one): -- Task A: Core feature → Task B: Tests (continues from A) → Task C: Docs (continues from B) - Only integrate Task C! -``` - -### Integration Examples - -For independent subtasks (no continuation): -```bash -# Get the worktree path for a completed subtask -SUBTASK_PATH=$(./.makima/orchestrate.sh worktree <subtask_id>) - -# View what files were changed -ls -la "$SUBTASK_PATH" -diff -r . "$SUBTASK_PATH" --exclude=.git --exclude=.makima - -# Copy specific files from subtask -cp "$SUBTASK_PATH/src/new_file.rs" ./src/ -cp "$SUBTASK_PATH/src/modified_file.rs" ./src/ - -# Or use diff/patch for more control -diff -u ./src/file.rs "$SUBTASK_PATH/src/file.rs" > changes.patch -patch -p0 < changes.patch -``` - -For continuation chains (only integrate the final task): -```bash -# If you have: Task A → Task B → Task C (each continues from previous) -# ONLY get and integrate Task C's worktree - it has everything! - -FINAL_TASK_PATH=$(./.makima/orchestrate.sh worktree <task_c_id>) - -# Copy all changes from the final task -rsync -av --exclude='.git' --exclude='.makima' "$FINAL_TASK_PATH/" ./ -``` - -## Completion -```bash -# Mark yourself as complete after integrating all subtask work -./.makima/orchestrate.sh done "Summary of what was accomplished" -``` - -## Workflow -1. **List existing subtasks**: Run `list` to see current subtasks -2. **Create subtasks if needed**: Use `create` to add new subtasks for the work - - For independent parallel work: create without `--continue-from` - - For sequential dependencies: use `--continue-from <previous_task_id>` - - Track which tasks continue from which (continuation chains) -3. **Start subtasks**: Run `start` for each subtask -4. **Monitor progress**: Check status and output as subtasks run -5. **Integrate work**: When subtasks complete: - - For independent tasks: integrate each one's worktree - - For continuation chains: ONLY integrate the FINAL task (it has all changes) - - Get worktree path with `worktree <subtask_id>` - - Copy or merge files into your worktree -6. **Complete**: Call `done` once all work is integrated - -## Important Notes -- Subtask files are in worktrees, NOT committed git branches -- **Subtasks do NOT auto-merge or create PRs** - you must integrate their work -- You can read files from subtask worktrees using their paths -- Use standard file tools (cp, diff, cat, rsync) to integrate changes -- You should NOT edit files directly - that's what subtasks are for -- DO NOT DO THE SUBTASKS' WORK! Your only job is to coordinate, not implement. -- When you call `done`, YOUR worktree may be used for the final PR/merge -"#; - -/// Content of the helper bash script that orchestrators use to call the API. -const ORCHESTRATE_SCRIPT: &str = r#"#!/bin/bash -# Makima Orchestrator Helper Script -# Usage: ./orchestrate.sh <command> [args...] - -API_URL="${MAKIMA_API_URL:-http://localhost:8080}" -API_KEY="${MAKIMA_API_KEY}" -TASK_ID="${MAKIMA_TASK_ID}" - -if [ -z "$API_KEY" ]; then - echo "Error: MAKIMA_API_KEY not set" >&2 - exit 1 -fi - -if [ -z "$TASK_ID" ]; then - echo "Error: MAKIMA_TASK_ID not set" >&2 - exit 1 -fi - -# Helper function to make API calls and check for errors -api_call() { - local method="$1" - local url="$2" - local data="$3" - local response - local http_code - - if [ -n "$data" ]; then - response=$(curl -s -w "\n%{http_code}" -X "$method" \ - -H "X-Makima-Tool-Key: $API_KEY" \ - -H "Content-Type: application/json" \ - -d "$data" \ - "$url") - else - response=$(curl -s -w "\n%{http_code}" -X "$method" \ - -H "X-Makima-Tool-Key: $API_KEY" \ - "$url") - fi - - # Extract HTTP code (last line) and body (everything else) - http_code=$(echo "$response" | tail -n1) - body=$(echo "$response" | sed '$d') - - # Check for curl errors or non-2xx status - if [ "$http_code" -lt 200 ] || [ "$http_code" -ge 300 ]; then - echo "Error: API request failed with HTTP $http_code" >&2 - echo "URL: $url" >&2 - echo "Response: $body" >&2 - echo "$body" - return 1 - fi - - echo "$body" - return 0 -} - -case "$1" in - list) - api_call GET "$API_URL/api/v1/mesh/tasks/$TASK_ID/subtasks" - ;; - create) - # Parse arguments: create "name" "plan" [--continue-from <task_id>] [--files "file1,file2"] - if [ -z "$2" ] || [ -z "$3" ]; then - echo "Usage: $0 create \"<name>\" \"<plan>\" [--continue-from <task_id>] [--files \"file1,file2\"]" >&2 - exit 1 - fi - NAME="$2" - PLAN="$3" - CONTINUE_FROM="" - COPY_FILES="" - - # Parse optional flags (can be in any order after name and plan) - shift 3 - while [ $# -gt 0 ]; do - case "$1" in - --continue-from) - CONTINUE_FROM="$2" - shift 2 - ;; - --files) - COPY_FILES="$2" - shift 2 - ;; - *) - echo "Unknown option: $1" >&2 - exit 1 - ;; - esac - done - - # Escape quotes in name and plan for JSON - NAME_ESCAPED=$(echo "$NAME" | sed 's/"/\\"/g' | sed 's/\\/\\\\/g') - PLAN_ESCAPED=$(echo "$PLAN" | sed 's/"/\\"/g' | sed 's/\\/\\\\/g') - - # Build JSON body - JSON_BODY="{\"name\":\"$NAME_ESCAPED\",\"plan\":\"$PLAN_ESCAPED\",\"parentTaskId\":\"$TASK_ID\"" - - if [ -n "$CONTINUE_FROM" ]; then - echo "Creating subtask: $NAME (continuing from $CONTINUE_FROM)..." >&2 - JSON_BODY="$JSON_BODY,\"continueFromTaskId\":\"$CONTINUE_FROM\"" - else - echo "Creating subtask: $NAME..." >&2 - fi - - if [ -n "$COPY_FILES" ]; then - # Convert comma-separated file list to JSON array - FILES_JSON="[" - first=true - IFS=',' read -ra FILE_ARRAY <<< "$COPY_FILES" - for file in "${FILE_ARRAY[@]}"; do - file=$(echo "$file" | xargs) # trim whitespace - if [ "$first" = true ]; then - FILES_JSON="$FILES_JSON\"$file\"" - first=false - else - FILES_JSON="$FILES_JSON,\"$file\"" - fi - done - FILES_JSON="$FILES_JSON]" - JSON_BODY="$JSON_BODY,\"copyFiles\":$FILES_JSON" - echo " (copying files: $COPY_FILES)" >&2 - fi - - JSON_BODY="$JSON_BODY}" - api_call POST "$API_URL/api/v1/mesh/tasks" "$JSON_BODY" - ;; - start) - if [ -z "$2" ]; then - echo "Usage: $0 start <subtask_id>" >&2 - exit 1 - fi - echo "Starting subtask $2..." >&2 - api_call POST "$API_URL/api/v1/mesh/tasks/$2/start" - ;; - stop) - if [ -z "$2" ]; then - echo "Usage: $0 stop <subtask_id>" >&2 - exit 1 - fi - api_call POST "$API_URL/api/v1/mesh/tasks/$2/stop" - ;; - status) - if [ -z "$2" ]; then - echo "Usage: $0 status <subtask_id>" >&2 - exit 1 - fi - api_call GET "$API_URL/api/v1/mesh/tasks/$2" - ;; - output) - if [ -z "$2" ]; then - echo "Usage: $0 output <subtask_id>" >&2 - exit 1 - fi - api_call GET "$API_URL/api/v1/mesh/tasks/$2/output" - ;; - worktree) - if [ -z "$2" ]; then - echo "Usage: $0 worktree <subtask_id>" >&2 - exit 1 - fi - # Get the worktree path from the task's overlayPath field via API - TASK_JSON=$(api_call GET "$API_URL/api/v1/mesh/tasks/$2") - if [ $? -ne 0 ]; then - echo "Error: Failed to get task info" >&2 - exit 1 - fi - WORKTREE_PATH=$(echo "$TASK_JSON" | grep -o '"overlayPath":"[^"]*"' | cut -d'"' -f4) - if [ -z "$WORKTREE_PATH" ]; then - echo "Error: Task has no worktree path (may not have started yet)" >&2 - exit 1 - fi - if [ -d "$WORKTREE_PATH" ]; then - echo "$WORKTREE_PATH" - else - echo "Error: Worktree not found at $WORKTREE_PATH" >&2 - echo "The worktree may have been cleaned up." >&2 - exit 1 - fi - ;; - done) - SUMMARY="${2:-Task completed}" - api_call PUT "$API_URL/api/v1/mesh/tasks/$TASK_ID" "{\"status\":\"done\",\"progressSummary\":\"$SUMMARY\"}" - ;; - *) - echo "Makima Orchestrator Helper" - echo "" - echo "Usage: $0 <command> [args...]" - echo "" - echo "Subtask Commands:" - echo " list List all subtasks and their status" - echo " create \"<name>\" \"<plan>\" Create a new subtask" - echo " create \"...\" --continue-from ID Create subtask continuing from another task's worktree" - echo " create \"...\" --files \"file1,file2\" Copy specific files from parent (orchestrator) worktree" - echo " start <subtask_id> Start a subtask" - echo " stop <subtask_id> Stop a running subtask" - echo " status <subtask_id> Get detailed subtask status" - echo " output <subtask_id> Get subtask output history" - echo " worktree <subtask_id> Get path to subtask's worktree" - echo "" - echo "Completion:" - echo " done [summary] Mark orchestrator as complete" - echo "" - echo "Examples:" - echo " create \"Fix bug\" \"Fix the null check bug\" --files \"PLAN.md\"" - echo " create \"Step 2\" \"Continue work\" --continue-from abc123 --files \"shared.rs,types.rs\"" - ;; -esac -"#; - -/// Tracks merge state for an orchestrator task. -#[derive(Default)] -struct MergeTracker { - /// Subtask branches that have been successfully merged. - merged_subtasks: HashSet<Uuid>, - /// Subtask branches that were explicitly skipped (with reason). - skipped_subtasks: HashMap<Uuid, String>, -} - -/// Managed task information. -pub struct ManagedTask { - /// Task ID. - pub id: Uuid, - /// Current state. - pub state: TaskState, - /// Worktree info if created. - pub worktree: Option<WorktreeInfo>, - /// Task plan. - pub plan: String, - /// Repository URL or path. - pub repo_source: Option<String>, - /// Base branch. - pub base_branch: Option<String>, - /// Target branch to merge into. - pub target_branch: Option<String>, - /// Parent task ID if this is a subtask. - pub parent_task_id: Option<Uuid>, - /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask). - pub depth: i32, - /// Whether this task runs as an orchestrator (coordinates subtasks). - pub is_orchestrator: bool, - /// Path to target repository for completion actions. - pub target_repo_path: Option<String>, - /// Completion action: "none", "branch", "merge", "pr". - pub completion_action: Option<String>, - /// Task ID to continue from (copy worktree from this task). - pub continue_from_task_id: Option<Uuid>, - /// Files to copy from parent task's worktree. - pub copy_files: Option<Vec<String>>, - /// Time task was created. - pub created_at: Instant, - /// Time task started running. - pub started_at: Option<Instant>, - /// Time task completed. - pub completed_at: Option<Instant>, - /// Error message if failed. - pub error: Option<String>, -} - -/// Configuration for task execution. -#[derive(Clone)] -pub struct TaskConfig { - /// Maximum concurrent tasks. - pub max_concurrent_tasks: u32, - /// Base directory for worktrees. - pub worktree_base_dir: PathBuf, - /// Environment variables to pass to Claude. - pub env_vars: HashMap<String, String>, - /// Claude command path. - pub claude_command: String, - /// Additional arguments to pass to Claude Code. - pub claude_args: Vec<String>, - /// Arguments to pass before defaults. - pub claude_pre_args: Vec<String>, - /// Enable Claude's permission system. - pub enable_permissions: bool, - /// Disable verbose output. - pub disable_verbose: bool, -} - -impl Default for TaskConfig { - fn default() -> Self { - Self { - max_concurrent_tasks: 4, - worktree_base_dir: WorktreeManager::default_base_dir(), - env_vars: HashMap::new(), - claude_command: "claude".to_string(), - claude_args: Vec::new(), - claude_pre_args: Vec::new(), - enable_permissions: false, - disable_verbose: false, - } - } -} - -/// Task manager for handling task lifecycle. -pub struct TaskManager { - /// Worktree manager. - worktree_manager: Arc<WorktreeManager>, - /// Process manager. - process_manager: Arc<ProcessManager>, - /// Temp directory manager. - temp_manager: Arc<TempManager>, - /// Task configuration. - #[allow(dead_code)] - config: TaskConfig, - /// Active tasks. - tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>, - /// Channel to send messages to server. - ws_tx: mpsc::Sender<DaemonMessage>, - /// Semaphore for limiting concurrent tasks. - semaphore: Arc<Semaphore>, - /// Channels for sending input to running tasks. - /// Each sender allows sending messages to the stdin of a running Claude process. - task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>, - /// Tracks merge state per orchestrator task (for completion gate). - merge_trackers: Arc<RwLock<HashMap<Uuid, MergeTracker>>>, -} - -impl TaskManager { - /// Create a new task manager. - pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self { - let max_concurrent = config.max_concurrent_tasks as usize; - let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone())); - let process_manager = Arc::new( - ProcessManager::with_command(config.claude_command.clone()) - .with_args(config.claude_args.clone()) - .with_pre_args(config.claude_pre_args.clone()) - .with_permissions_enabled(config.enable_permissions) - .with_verbose_disabled(config.disable_verbose) - .with_env(config.env_vars.clone()), - ); - let temp_manager = Arc::new(TempManager::new()); - - Self { - worktree_manager, - process_manager, - temp_manager, - config, - tasks: Arc::new(RwLock::new(HashMap::new())), - ws_tx, - semaphore: Arc::new(Semaphore::new(max_concurrent)), - task_inputs: Arc::new(RwLock::new(HashMap::new())), - merge_trackers: Arc::new(RwLock::new(HashMap::new())), - } - } - - /// Handle a command from the server. - pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> { - tracing::info!("Received command from server: {:?}", command); - - match command { - DaemonCommand::SpawnTask { - task_id, - task_name, - plan, - repo_url, - base_branch, - target_branch, - parent_task_id, - depth, - is_orchestrator, - target_repo_path, - completion_action, - continue_from_task_id, - copy_files, - } => { - tracing::info!( - task_id = %task_id, - task_name = %task_name, - repo_url = ?repo_url, - base_branch = ?base_branch, - target_branch = ?target_branch, - parent_task_id = ?parent_task_id, - depth = depth, - is_orchestrator = is_orchestrator, - target_repo_path = ?target_repo_path, - completion_action = ?completion_action, - continue_from_task_id = ?continue_from_task_id, - copy_files = ?copy_files, - plan_len = plan.len(), - "Spawning new task" - ); - self.spawn_task( - task_id, task_name, plan, repo_url, base_branch, target_branch, - parent_task_id, depth, is_orchestrator, - target_repo_path, completion_action, continue_from_task_id, - copy_files - ).await?; - } - DaemonCommand::PauseTask { task_id } => { - tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks"); - // Subprocesses don't support pause, just log and ignore - } - DaemonCommand::ResumeTask { task_id } => { - tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks"); - // Subprocesses don't support resume, just log and ignore - } - DaemonCommand::InterruptTask { task_id, graceful: _ } => { - tracing::info!(task_id = %task_id, "Interrupting task"); - self.interrupt_task(task_id).await?; - } - DaemonCommand::SendMessage { task_id, message } => { - tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task"); - // Send message to the task's stdin via the input channel - let inputs = self.task_inputs.read().await; - if let Some(sender) = inputs.get(&task_id) { - if let Err(e) = sender.send(message).await { - tracing::warn!(task_id = %task_id, error = %e, "Failed to send message to task input channel"); - } else { - tracing::info!(task_id = %task_id, "Message sent to task successfully"); - } - } else { - tracing::warn!(task_id = %task_id, "No input channel for task (task may not be running)"); - } - } - DaemonCommand::InjectSiblingContext { task_id, .. } => { - tracing::debug!(task_id = %task_id, "Sibling context injection not supported for subprocess tasks"); - } - DaemonCommand::Authenticated { daemon_id } => { - tracing::debug!(daemon_id = %daemon_id, "Authenticated command (handled by WS client)"); - } - DaemonCommand::Error { code, message } => { - tracing::warn!(code = %code, message = %message, "Error command from server"); - } - - // ========================================================================= - // Merge Commands - // ========================================================================= - - DaemonCommand::ListBranches { task_id } => { - tracing::info!(task_id = %task_id, "Listing task branches"); - self.handle_list_branches(task_id).await?; - } - DaemonCommand::MergeStart { task_id, source_branch } => { - tracing::info!(task_id = %task_id, source_branch = %source_branch, "Starting merge"); - self.handle_merge_start(task_id, source_branch).await?; - } - DaemonCommand::MergeStatus { task_id } => { - tracing::info!(task_id = %task_id, "Getting merge status"); - self.handle_merge_status(task_id).await?; - } - DaemonCommand::MergeResolve { task_id, file, strategy } => { - tracing::info!(task_id = %task_id, file = %file, strategy = %strategy, "Resolving conflict"); - self.handle_merge_resolve(task_id, file, strategy).await?; - } - DaemonCommand::MergeCommit { task_id, message } => { - tracing::info!(task_id = %task_id, "Committing merge"); - self.handle_merge_commit(task_id, message).await?; - } - DaemonCommand::MergeAbort { task_id } => { - tracing::info!(task_id = %task_id, "Aborting merge"); - self.handle_merge_abort(task_id).await?; - } - DaemonCommand::MergeSkip { task_id, subtask_id, reason } => { - tracing::info!(task_id = %task_id, subtask_id = %subtask_id, reason = %reason, "Skipping subtask merge"); - self.handle_merge_skip(task_id, subtask_id, reason).await?; - } - DaemonCommand::CheckMergeComplete { task_id } => { - tracing::info!(task_id = %task_id, "Checking merge completion"); - self.handle_check_merge_complete(task_id).await?; - } - - // ========================================================================= - // Completion Action Commands - // ========================================================================= - - DaemonCommand::RetryCompletionAction { - task_id, - task_name, - action, - target_repo_path, - target_branch, - } => { - tracing::info!( - task_id = %task_id, - task_name = %task_name, - action = %action, - target_repo_path = %target_repo_path, - target_branch = ?target_branch, - "Retrying completion action" - ); - self.handle_retry_completion_action(task_id, task_name, action, target_repo_path, target_branch).await?; - } - - DaemonCommand::CloneWorktree { task_id, target_dir } => { - tracing::info!( - task_id = %task_id, - target_dir = %target_dir, - "Cloning worktree to target directory" - ); - self.handle_clone_worktree(task_id, target_dir).await?; - } - - DaemonCommand::CheckTargetExists { task_id, target_dir } => { - tracing::debug!( - task_id = %task_id, - target_dir = %target_dir, - "Checking if target directory exists" - ); - self.handle_check_target_exists(task_id, target_dir).await?; - } - } - Ok(()) - } - - /// Spawn a new task. - #[allow(clippy::too_many_arguments)] - pub async fn spawn_task( - &self, - task_id: Uuid, - task_name: String, - plan: String, - repo_url: Option<String>, - base_branch: Option<String>, - target_branch: Option<String>, - parent_task_id: Option<Uuid>, - depth: i32, - is_orchestrator: bool, - target_repo_path: Option<String>, - completion_action: Option<String>, - continue_from_task_id: Option<Uuid>, - copy_files: Option<Vec<String>>, - ) -> TaskResult<()> { - tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, depth = depth, "=== SPAWN_TASK START ==="); - - // Check if task already exists - allow re-spawning if in terminal state - { - let mut tasks = self.tasks.write().await; - if let Some(existing) = tasks.get(&task_id) { - if existing.state.is_terminal() { - // Task exists but is in terminal state (completed, failed, interrupted) - // Remove it so we can re-spawn - tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn"); - tasks.remove(&task_id); - } else { - // Task is still active, reject - tracing::warn!(task_id = %task_id, state = ?existing.state, "Task already exists and is active, rejecting spawn"); - return Err(TaskError::AlreadyExists(task_id)); - } - } - } - - // Acquire semaphore permit - tracing::info!(task_id = %task_id, "Acquiring concurrency permit..."); - let permit = self - .semaphore - .clone() - .try_acquire_owned() - .map_err(|_| { - tracing::warn!(task_id = %task_id, "Concurrency limit reached, cannot spawn task"); - TaskError::ConcurrencyLimit - })?; - tracing::info!(task_id = %task_id, "Concurrency permit acquired"); - - // Create task entry - tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing"); - let task = ManagedTask { - id: task_id, - state: TaskState::Initializing, - worktree: None, - plan: plan.clone(), - repo_source: repo_url.clone(), - base_branch: base_branch.clone(), - target_branch: target_branch.clone(), - parent_task_id, - depth, - is_orchestrator, - target_repo_path: target_repo_path.clone(), - completion_action: completion_action.clone(), - continue_from_task_id, - copy_files: copy_files.clone(), - created_at: Instant::now(), - started_at: None, - completed_at: None, - error: None, - }; - - self.tasks.write().await.insert(task_id, task); - tracing::info!(task_id = %task_id, "Task entry created and stored"); - - // Notify server of status change - tracing::info!(task_id = %task_id, "Notifying server: pending -> initializing"); - self.send_status_change(task_id, "pending", "initializing").await; - - // Spawn task in background - tracing::info!(task_id = %task_id, "Spawning background task runner"); - let inner = self.clone_inner(); - tokio::spawn(async move { - let _permit = permit; // Hold permit until done - tracing::info!(task_id = %task_id, "Background task runner started"); - - if let Err(e) = inner.run_task( - task_id, task_name, plan, repo_url, base_branch, target_branch, - is_orchestrator, target_repo_path, completion_action, - continue_from_task_id, copy_files - ).await { - tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); - inner.mark_failed(task_id, &e.to_string()).await; - } - tracing::info!(task_id = %task_id, "Background task runner completed"); - }); - - tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ==="); - Ok(()) - } - - /// Clone inner state for spawned tasks. - fn clone_inner(&self) -> TaskManagerInner { - TaskManagerInner { - worktree_manager: self.worktree_manager.clone(), - process_manager: self.process_manager.clone(), - temp_manager: self.temp_manager.clone(), - tasks: self.tasks.clone(), - ws_tx: self.ws_tx.clone(), - task_inputs: self.task_inputs.clone(), - } - } - - /// Interrupt a task. - pub async fn interrupt_task(&self, task_id: Uuid) -> TaskResult<()> { - let mut tasks = self.tasks.write().await; - let task = tasks.get_mut(&task_id).ok_or(TaskError::NotFound(task_id))?; - - if task.state.is_terminal() { - return Ok(()); // Already done - } - - let old_state = task.state; - task.state = TaskState::Interrupted; - task.completed_at = Some(Instant::now()); - - // Notify server - drop(tasks); - self.send_status_change(task_id, old_state.as_str(), "interrupted").await; - - // Note: The process will be killed when the ClaudeProcess is dropped - // Worktrees are kept until explicitly deleted - - Ok(()) - } - - /// Get list of active task IDs. - pub async fn active_task_ids(&self) -> Vec<Uuid> { - self.tasks - .read() - .await - .iter() - .filter(|(_, t)| t.state.is_active()) - .map(|(id, _)| *id) - .collect() - } - - /// Get task state. - pub async fn get_task_state(&self, task_id: Uuid) -> Option<TaskState> { - self.tasks.read().await.get(&task_id).map(|t| t.state) - } - - /// Send status change notification to server. - async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { - let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); - let _ = self.ws_tx.send(msg).await; - } - - // ========================================================================= - // Merge Handler Methods - // ========================================================================= - - /// Get worktree path for a task, or return error if not found. - /// First checks in-memory tasks, then scans the worktrees directory. - async fn get_task_worktree_path(&self, task_id: Uuid) -> Result<std::path::PathBuf, DaemonError> { - // First try to get from in-memory tasks - { - let tasks = self.tasks.read().await; - if let Some(task) = tasks.get(&task_id) { - if let Some(ref worktree) = task.worktree { - return Ok(worktree.path.clone()); - } - } - } - - // Task not in memory - scan worktrees directory for matching task ID - let short_id = &task_id.to_string()[..8]; - let worktrees_dir = self.worktree_manager.base_dir(); - - if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { - while let Ok(Some(entry)) = entries.next_entry().await { - let name = entry.file_name(); - let name_str = name.to_string_lossy(); - if name_str.starts_with(short_id) { - let path = entry.path(); - // Verify it's a valid git directory - if path.join(".git").exists() { - tracing::info!( - task_id = %task_id, - worktree_path = %path.display(), - "Found worktree by scanning directory" - ); - return Ok(path); - } - } - } - } - - Err(DaemonError::Task(TaskError::SetupFailed( - format!("No worktree found for task {}. The worktree may have been cleaned up.", task_id) - ))) - } - - /// Handle ListBranches command. - async fn handle_list_branches(&self, task_id: Uuid) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - match self.worktree_manager.list_task_branches(&worktree_path).await { - Ok(branches) => { - let branch_infos: Vec<BranchInfo> = branches - .into_iter() - .map(|b| BranchInfo { - name: b.name, - task_id: b.task_id, - is_merged: b.is_merged, - last_commit: b.last_commit, - last_commit_message: b.last_commit_message, - }) - .collect(); - - let msg = DaemonMessage::BranchList { - task_id, - branches: branch_infos, - }; - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - tracing::error!(task_id = %task_id, error = %e, "Failed to list branches"); - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: e.to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - } - Ok(()) - } - - /// Handle MergeStart command. - async fn handle_merge_start(&self, task_id: Uuid, source_branch: String) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - match self.worktree_manager.merge_branch(&worktree_path, &source_branch).await { - Ok(None) => { - // Merge succeeded without conflicts - let msg = DaemonMessage::MergeResult { - task_id, - success: true, - message: "Merge completed without conflicts".to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - Ok(Some(conflicts)) => { - // Merge has conflicts - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: format!("Merge has {} conflicts", conflicts.len()), - commit_sha: None, - conflicts: Some(conflicts), - }; - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: e.to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - } - Ok(()) - } - - /// Handle MergeStatus command. - async fn handle_merge_status(&self, task_id: Uuid) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - match self.worktree_manager.get_merge_state(&worktree_path).await { - Ok(state) => { - let msg = DaemonMessage::MergeStatusResponse { - task_id, - in_progress: state.in_progress, - source_branch: if state.in_progress { Some(state.source_branch) } else { None }, - conflicted_files: state.conflicted_files, - }; - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - tracing::error!(task_id = %task_id, error = %e, "Failed to get merge status"); - let msg = DaemonMessage::MergeStatusResponse { - task_id, - in_progress: false, - source_branch: None, - conflicted_files: vec![], - }; - let _ = self.ws_tx.send(msg).await; - } - } - Ok(()) - } - - /// Handle MergeResolve command. - async fn handle_merge_resolve(&self, task_id: Uuid, file: String, strategy: String) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - let resolution = match strategy.to_lowercase().as_str() { - "ours" => ConflictResolution::Ours, - "theirs" => ConflictResolution::Theirs, - _ => { - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: format!("Invalid strategy '{}', must be 'ours' or 'theirs'", strategy), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - return Ok(()); - } - }; - - match self.worktree_manager.resolve_conflict(&worktree_path, &file, resolution).await { - Ok(()) => { - let msg = DaemonMessage::MergeResult { - task_id, - success: true, - message: format!("Resolved conflict in {}", file), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: e.to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - } - Ok(()) - } - - /// Handle MergeCommit command. - async fn handle_merge_commit(&self, task_id: Uuid, message: String) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - match self.worktree_manager.commit_merge(&worktree_path, &message).await { - Ok(commit_sha) => { - // Track this merge as completed (extract subtask ID from branch if possible) - // For now, we'll track it when MergeSkip is called or based on branch names - let msg = DaemonMessage::MergeResult { - task_id, - success: true, - message: "Merge committed successfully".to_string(), - commit_sha: Some(commit_sha), - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: e.to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - } - Ok(()) - } - - /// Handle MergeAbort command. - async fn handle_merge_abort(&self, task_id: Uuid) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - match self.worktree_manager.abort_merge(&worktree_path).await { - Ok(()) => { - let msg = DaemonMessage::MergeResult { - task_id, - success: true, - message: "Merge aborted".to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - let msg = DaemonMessage::MergeResult { - task_id, - success: false, - message: e.to_string(), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - } - } - Ok(()) - } - - /// Handle MergeSkip command. - async fn handle_merge_skip(&self, task_id: Uuid, subtask_id: Uuid, reason: String) -> Result<(), DaemonError> { - // Record that this subtask was skipped - { - let mut trackers = self.merge_trackers.write().await; - let tracker = trackers.entry(task_id).or_insert_with(MergeTracker::default); - tracker.skipped_subtasks.insert(subtask_id, reason.clone()); - } - - let msg = DaemonMessage::MergeResult { - task_id, - success: true, - message: format!("Subtask {} skipped: {}", subtask_id, reason), - commit_sha: None, - conflicts: None, - }; - let _ = self.ws_tx.send(msg).await; - Ok(()) - } - - /// Handle CheckMergeComplete command. - async fn handle_check_merge_complete(&self, task_id: Uuid) -> Result<(), DaemonError> { - let worktree_path = self.get_task_worktree_path(task_id).await?; - - // Get all task branches - let branches = match self.worktree_manager.list_task_branches(&worktree_path).await { - Ok(b) => b, - Err(e) => { - let msg = DaemonMessage::MergeCompleteCheck { - task_id, - can_complete: false, - unmerged_branches: vec![format!("Error listing branches: {}", e)], - merged_count: 0, - skipped_count: 0, - }; - let _ = self.ws_tx.send(msg).await; - return Ok(()); - } - }; - - // Get tracker state - let trackers = self.merge_trackers.read().await; - let empty_merged: HashSet<Uuid> = HashSet::new(); - let empty_skipped: HashMap<Uuid, String> = HashMap::new(); - let tracker = trackers.get(&task_id); - let merged_set = tracker.map(|t| &t.merged_subtasks).unwrap_or(&empty_merged); - let skipped_set = tracker.map(|t| &t.skipped_subtasks).unwrap_or(&empty_skipped); - - let mut merged_count = 0u32; - let mut skipped_count = 0u32; - let mut unmerged_branches = Vec::new(); - - for branch in &branches { - if branch.is_merged { - merged_count += 1; - } else if let Some(subtask_id) = branch.task_id { - if merged_set.contains(&subtask_id) { - merged_count += 1; - } else if skipped_set.contains_key(&subtask_id) { - skipped_count += 1; - } else { - unmerged_branches.push(branch.name.clone()); - } - } else { - // Branch without task ID - check if it's merged - unmerged_branches.push(branch.name.clone()); - } - } - - let can_complete = unmerged_branches.is_empty(); - - let msg = DaemonMessage::MergeCompleteCheck { - task_id, - can_complete, - unmerged_branches, - merged_count, - skipped_count, - }; - let _ = self.ws_tx.send(msg).await; - Ok(()) - } - - /// Mark a subtask as merged in the tracker. - #[allow(dead_code)] - pub async fn mark_subtask_merged(&self, orchestrator_task_id: Uuid, subtask_id: Uuid) { - let mut trackers = self.merge_trackers.write().await; - let tracker = trackers.entry(orchestrator_task_id).or_insert_with(MergeTracker::default); - tracker.merged_subtasks.insert(subtask_id); - } - - // ========================================================================= - // Completion Action Handler Methods - // ========================================================================= - - /// Handle RetryCompletionAction command. - async fn handle_retry_completion_action( - &self, - task_id: Uuid, - task_name: String, - action: String, - target_repo_path: String, - target_branch: Option<String>, - ) -> Result<(), DaemonError> { - // Get the task's worktree path - let worktree_path = self.get_task_worktree_path(task_id).await?; - - // Execute the completion action - let inner = self.clone_inner(); - let result = inner.execute_completion_action( - task_id, - &task_name, - &worktree_path, - &action, - Some(target_repo_path.as_str()), - target_branch.as_deref(), - ).await; - - // Send result back to server - let msg = match result { - Ok(pr_url) => DaemonMessage::CompletionActionResult { - task_id, - success: true, - message: match action.as_str() { - "branch" => format!("Branch pushed to {}", target_repo_path), - "merge" => format!("Merged into {}", target_branch.as_deref().unwrap_or("main")), - "pr" => format!("Pull request created"), - _ => format!("Completion action '{}' executed", action), - }, - pr_url, - }, - Err(e) => DaemonMessage::CompletionActionResult { - task_id, - success: false, - message: e, - pr_url: None, - }, - }; - let _ = self.ws_tx.send(msg).await; - Ok(()) - } - - /// Handle CloneWorktree command. - async fn handle_clone_worktree( - &self, - task_id: Uuid, - target_dir: String, - ) -> Result<(), DaemonError> { - // Get the task's worktree path - let worktree_path = self.get_task_worktree_path(task_id).await?; - - // Expand tilde in target path - let target_path = crate::worktree::expand_tilde(&target_dir); - - // Clone the worktree to target directory - let result = self.worktree_manager.clone_worktree_to_directory( - &worktree_path, - &target_path, - ).await; - - // Send result back to server - let msg = match result { - Ok(message) => DaemonMessage::CloneWorktreeResult { - task_id, - success: true, - message, - target_dir: Some(target_path.to_string_lossy().to_string()), - }, - Err(e) => DaemonMessage::CloneWorktreeResult { - task_id, - success: false, - message: e.to_string(), - target_dir: None, - }, - }; - let _ = self.ws_tx.send(msg).await; - Ok(()) - } - - /// Handle CheckTargetExists command. - async fn handle_check_target_exists( - &self, - task_id: Uuid, - target_dir: String, - ) -> Result<(), DaemonError> { - // Expand tilde in target path - let target_path = crate::worktree::expand_tilde(&target_dir); - - // Check if target exists - let exists = self.worktree_manager.target_directory_exists(&target_path).await; - - // Send result back to server - let msg = DaemonMessage::CheckTargetExistsResult { - task_id, - exists, - target_dir: target_path.to_string_lossy().to_string(), - }; - let _ = self.ws_tx.send(msg).await; - Ok(()) - } -} - -/// Inner state for spawned tasks (cloneable). -struct TaskManagerInner { - worktree_manager: Arc<WorktreeManager>, - process_manager: Arc<ProcessManager>, - temp_manager: Arc<TempManager>, - tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>, - ws_tx: mpsc::Sender<DaemonMessage>, - task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>, -} - -impl TaskManagerInner { - /// Run a task to completion. - #[allow(clippy::too_many_arguments)] - async fn run_task( - &self, - task_id: Uuid, - task_name: String, - plan: String, - repo_source: Option<String>, - base_branch: Option<String>, - target_branch: Option<String>, - is_orchestrator: bool, - target_repo_path: Option<String>, - completion_action: Option<String>, - continue_from_task_id: Option<Uuid>, - copy_files: Option<Vec<String>>, - ) -> Result<(), DaemonError> { - tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, "=== RUN_TASK START ==="); - - // Determine working directory - let working_dir = if let Some(ref source) = repo_source { - if is_new_repo_request(source) { - // Explicit new repo request: new:// or new://project-name - tracing::info!( - task_id = %task_id, - source = %source, - "Creating new git repository" - ); - - let msg = DaemonMessage::task_output( - task_id, - format!("Initializing new git repository...\n"), - false, - ); - let _ = self.ws_tx.send(msg).await; - - let worktree_info = self.worktree_manager - .init_new_repo(task_id, source) - .await - .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; - - tracing::info!( - task_id = %task_id, - path = %worktree_info.path.display(), - "New repository created" - ); - - // Store worktree info - { - let mut tasks = self.tasks.write().await; - if let Some(task) = tasks.get_mut(&task_id) { - task.worktree = Some(worktree_info.clone()); - } - } - - let msg = DaemonMessage::task_output( - task_id, - format!("Repository ready at {}\n", worktree_info.path.display()), - false, - ); - let _ = self.ws_tx.send(msg).await; - - worktree_info.path - } else { - // Send progress message - let msg = DaemonMessage::task_output( - task_id, - format!("Setting up worktree from {}...\n", source), - false, - ); - let _ = self.ws_tx.send(msg).await; - - // Ensure source repo exists (clone if URL, verify if path) - let source_repo = self.worktree_manager.ensure_repo(source).await - .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; - - // Detect or use provided base branch - let branch = if let Some(ref b) = base_branch { - b.clone() - } else { - self.worktree_manager.detect_default_branch(&source_repo).await - .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? - }; - - tracing::info!( - task_id = %task_id, - source = %source, - branch = %branch, - continue_from_task_id = ?continue_from_task_id, - "Setting up worktree" - ); - - // Create worktree - either from scratch or copying from another task - let task_name = format!("task-{}", &task_id.to_string()[..8]); - let worktree_info = if let Some(from_task_id) = continue_from_task_id { - // Find the source task's worktree path - let source_worktree = self.find_worktree_for_task(from_task_id).await - .map_err(|e| DaemonError::Task(TaskError::SetupFailed( - format!("Cannot continue from task {}: {}", from_task_id, e) - )))?; - - let msg = DaemonMessage::task_output( - task_id, - format!("Continuing from task {} worktree...\n", &from_task_id.to_string()[..8]), - false, - ); - let _ = self.ws_tx.send(msg).await; - - // Create worktree by copying from source task - self.worktree_manager - .create_worktree_from_task(&source_worktree, task_id, &task_name) - .await - .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? - } else { - // Create fresh worktree from repo - self.worktree_manager - .create_worktree(&source_repo, task_id, &task_name, &branch) - .await - .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? - }; - - tracing::info!( - task_id = %task_id, - worktree_path = %worktree_info.path.display(), - branch = %worktree_info.branch, - continued_from = ?continue_from_task_id, - "Worktree created" - ); - - // Store worktree info - { - let mut tasks = self.tasks.write().await; - if let Some(task) = tasks.get_mut(&task_id) { - task.worktree = Some(worktree_info.clone()); - } - } - - let msg = DaemonMessage::task_output( - task_id, - format!("Worktree ready at {}\n", worktree_info.path.display()), - false, - ); - let _ = self.ws_tx.send(msg).await; - - worktree_info.path - } - } else { - // No repo specified - use managed temp directory in ~/.makima/temp/ - tracing::info!(task_id = %task_id, "Creating managed temp directory (no repo)"); - - let msg = DaemonMessage::task_output( - task_id, - "Creating temporary working directory...\n".to_string(), - false, - ); - let _ = self.ws_tx.send(msg).await; - - let temp_dir = self.temp_manager.create_task_dir(task_id).await?; - - let msg = DaemonMessage::task_output( - task_id, - format!("Working directory ready at {}\n", temp_dir.display()), - false, - ); - let _ = self.ws_tx.send(msg).await; - - temp_dir - }; - - // Copy files from parent task's worktree if specified - if let Some(ref files) = copy_files { - if !files.is_empty() { - // Get the parent task ID to find its worktree - let parent_task_id = { - let tasks = self.tasks.read().await; - tasks.get(&task_id).and_then(|t| t.parent_task_id) - }; - - if let Some(parent_id) = parent_task_id { - match self.find_worktree_for_task(parent_id).await { - Ok(parent_worktree) => { - let msg = DaemonMessage::task_output( - task_id, - format!("Copying {} files from orchestrator...\n", files.len()), - false, - ); - let _ = self.ws_tx.send(msg).await; - - for file_path in files { - let source = parent_worktree.join(file_path); - let dest = working_dir.join(file_path); - - // Create parent directories if needed - if let Some(parent) = dest.parent() { - if let Err(e) = tokio::fs::create_dir_all(parent).await { - tracing::warn!( - task_id = %task_id, - file = %file_path, - error = %e, - "Failed to create parent directory for file" - ); - continue; - } - } - - // Copy the file - match tokio::fs::copy(&source, &dest).await { - Ok(_) => { - tracing::info!( - task_id = %task_id, - source = %source.display(), - dest = %dest.display(), - "Copied file from orchestrator" - ); - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - source = %source.display(), - dest = %dest.display(), - error = %e, - "Failed to copy file from orchestrator" - ); - // Notify but don't fail - the file might be optional - let msg = DaemonMessage::task_output( - task_id, - format!("Warning: Could not copy {}: {}\n", file_path, e), - false, - ); - let _ = self.ws_tx.send(msg).await; - } - } - } - - let msg = DaemonMessage::task_output( - task_id, - "Files copied from orchestrator.\n".to_string(), - false, - ); - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - parent_id = %parent_id, - error = %e, - "Could not find parent task worktree for file copying" - ); - } - } - } else { - tracing::warn!( - task_id = %task_id, - "copy_files specified but no parent_task_id" - ); - } - } - } - - // Update state to Starting - tracing::info!(task_id = %task_id, "Updating state: Initializing -> Starting"); - self.update_state(task_id, TaskState::Starting).await; - self.send_status_change(task_id, "initializing", "starting").await; - - // Check Claude is available - match self.process_manager.check_claude_available().await { - Ok(version) => { - tracing::info!(task_id = %task_id, version = %version, "Claude Code available"); - let msg = DaemonMessage::task_output( - task_id, - format!("Claude Code {} ready\n", version), - false, - ); - let _ = self.ws_tx.send(msg).await; - } - Err(e) => { - let err_msg = format!("Claude Code not available: {}", e); - tracing::error!(task_id = %task_id, error = %err_msg); - return Err(DaemonError::Task(TaskError::SetupFailed(err_msg))); - } - } - - // Set up orchestrator mode if needed - let (extra_env, full_plan) = if is_orchestrator { - tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up orchestrator mode"); - - let msg = DaemonMessage::task_output( - task_id, - "Setting up orchestrator environment...\n".to_string(), - false, - ); - let _ = self.ws_tx.send(msg).await; - - // Generate tool key for API access - let tool_key = generate_tool_key(); - tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for orchestrator"); - - // Register tool key with server - let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); - if self.ws_tx.send(register_msg).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to register tool key"); - } else { - tracing::info!(task_id = %task_id, "Tool key registration message sent to server"); - } - - // Create .makima directory and helper script - let makima_dir = working_dir.join(".makima"); - if let Err(e) = tokio::fs::create_dir_all(&makima_dir).await { - tracing::warn!(task_id = %task_id, makima_dir = %makima_dir.display(), "Failed to create .makima directory: {}", e); - } else { - tracing::info!(task_id = %task_id, makima_dir = %makima_dir.display(), "Created .makima directory"); - } - - let script_path = makima_dir.join("orchestrate.sh"); - if let Err(e) = tokio::fs::write(&script_path, ORCHESTRATE_SCRIPT).await { - tracing::warn!(task_id = %task_id, script_path = %script_path.display(), "Failed to write orchestrate.sh: {}", e); - } else { - tracing::info!(task_id = %task_id, script_path = %script_path.display(), script_size = ORCHESTRATE_SCRIPT.len(), "Wrote orchestrate.sh"); - // Make script executable - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - if let Err(e) = std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)) { - tracing::warn!(task_id = %task_id, "Failed to set script permissions: {}", e); - } else { - tracing::info!(task_id = %task_id, "Set orchestrate.sh executable (0o755)"); - } - } - } - - // Set up environment variables - let mut env = HashMap::new(); - // TODO: Make API URL configurable - env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string()); - env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone()); - env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); - - tracing::info!( - task_id = %task_id, - api_url = "http://localhost:8080", - tool_key_preview = &tool_key[..8.min(tool_key.len())], - "Set orchestrator environment variables" - ); - - // Prepend orchestrator instructions to the plan - let orchestrator_plan = format!( - "{}\n\n---\n\nYour task:\n{}", - ORCHESTRATOR_SYSTEM_PROMPT, - plan - ); - - let msg = DaemonMessage::task_output( - task_id, - format!("Orchestrator environment ready (script at {})\n", script_path.display()), - false, - ); - let _ = self.ws_tx.send(msg).await; - - (Some(env), orchestrator_plan) - } else { - tracing::info!(task_id = %task_id, "Running as regular subtask (not orchestrator)"); - // Prepend subtask instructions to ensure worktree isolation - let subtask_plan = format!( - "{}\nYour task:\n{}", - SUBTASK_SYSTEM_PROMPT, - plan - ); - (None, subtask_plan) - }; - - // Spawn Claude process - let plan_bytes = full_plan.len(); - let plan_chars = full_plan.chars().count(); - // Rough token estimate: ~4 chars per token for English - let estimated_tokens = plan_chars / 4; - - tracing::info!( - task_id = %task_id, - working_dir = %working_dir.display(), - is_orchestrator = is_orchestrator, - plan_bytes = plan_bytes, - plan_chars = plan_chars, - estimated_tokens = estimated_tokens, - "Spawning Claude process" - ); - - // Warn if plan is very large (Claude's context is typically 100k-200k tokens) - if estimated_tokens > 50_000 { - tracing::warn!(task_id = %task_id, estimated_tokens = estimated_tokens, "Plan is very large - may hit context limits!"); - let msg = DaemonMessage::task_output( - task_id, - format!("Warning: Plan is very large (~{} tokens). This may cause issues.\n", estimated_tokens), - false, - ); - let _ = self.ws_tx.send(msg).await; - } - - let msg = DaemonMessage::task_output( - task_id, - if is_orchestrator { - format!("Starting Claude Code (orchestrator mode, ~{} tokens)...\n", estimated_tokens) - } else { - format!("Starting Claude Code (~{} tokens)...\n", estimated_tokens) - }, - false, - ); - let _ = self.ws_tx.send(msg).await; - - tracing::debug!(task_id = %task_id, "Calling process_manager.spawn()..."); - let mut process = self.process_manager - .spawn(&working_dir, &full_plan, extra_env) - .await - .map_err(|e| { - tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process"); - DaemonError::Task(TaskError::SetupFailed(e.to_string())) - })?; - tracing::info!(task_id = %task_id, "Claude process spawned successfully"); - - // Set up input channel for this task so we can send messages to its stdin - tracing::debug!(task_id = %task_id, "Setting up input channel..."); - let (input_tx, mut input_rx) = mpsc::channel::<String>(100); - tracing::debug!(task_id = %task_id, "Acquiring task_inputs write lock..."); - self.task_inputs.write().await.insert(task_id, input_tx); - tracing::debug!(task_id = %task_id, "Input channel registered"); - - // Get stdin handle for input forwarding and completion signaling - let stdin_handle = process.stdin_handle(); - let stdin_handle_for_completion = stdin_handle.clone(); - - tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)"); - tokio::spawn(async move { - tracing::info!(task_id = %task_id, "Stdin forwarder task started, waiting for messages..."); - while let Some(msg) = input_rx.recv().await { - tracing::info!(task_id = %task_id, msg_len = msg.len(), msg_preview = %if msg.len() > 50 { &msg[..50] } else { &msg }, "Received message from input channel"); - - // Format as JSON user message for stream-json input protocol - let json_msg = ClaudeInputMessage::user(&msg); - let json_line = match json_msg.to_json_line() { - Ok(line) => line, - Err(e) => { - tracing::error!(task_id = %task_id, error = %e, "Failed to serialize input message"); - continue; - } - }; - - tracing::debug!(task_id = %task_id, json_line = %json_line.trim(), "Formatted JSON line for stdin"); - - let mut stdin_guard = stdin_handle.lock().await; - if let Some(ref mut stdin) = *stdin_guard { - tracing::debug!(task_id = %task_id, "Acquired stdin lock, writing..."); - if stdin.write_all(json_line.as_bytes()).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to write to stdin, breaking"); - break; - } - if stdin.flush().await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to flush stdin, breaking"); - break; - } - tracing::info!(task_id = %task_id, json_len = json_line.len(), "Successfully wrote user message to Claude stdin"); - } else { - tracing::warn!(task_id = %task_id, "Stdin is None (already closed), cannot send message"); - break; - } - } - tracing::info!(task_id = %task_id, "Stdin forwarder task ended (channel closed or stdin unavailable)"); - }); - - // Update state to Running - { - tracing::debug!(task_id = %task_id, "Acquiring tasks write lock for Running state update"); - let mut tasks = self.tasks.write().await; - if let Some(task) = tasks.get_mut(&task_id) { - task.state = TaskState::Running; - task.started_at = Some(Instant::now()); - } - tracing::debug!(task_id = %task_id, "Released tasks write lock"); - } - tracing::info!(task_id = %task_id, "Updating state: Starting -> Running"); - self.send_status_change(task_id, "starting", "running").await; - tracing::debug!(task_id = %task_id, "Sent status change notification"); - - // Stream output with startup timeout check - tracing::info!(task_id = %task_id, "Starting output stream - waiting for Claude output..."); - tracing::debug!(task_id = %task_id, "Output will be forwarded via WebSocket to server"); - let ws_tx = self.ws_tx.clone(); - - let mut output_count = 0u64; - let mut output_bytes = 0usize; - let startup_timeout = tokio::time::Duration::from_secs(30); - let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); - startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let startup_deadline = tokio::time::Instant::now() + startup_timeout; - - loop { - tokio::select! { - maybe_line = process.next_output() => { - match maybe_line { - Some(line) => { - output_count += 1; - output_bytes += line.content.len(); - - if output_count == 1 { - tracing::info!(task_id = %task_id, "Received first output line from Claude"); - } - if output_count % 100 == 0 { - tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); - } - - // Log output details for debugging - tracing::trace!( - task_id = %task_id, - line_num = output_count, - content_len = line.content.len(), - is_stdout = line.is_stdout, - json_type = ?line.json_type, - "Forwarding output to WebSocket" - ); - - // Check if this is a "result" message indicating task completion - // With --input-format=stream-json, Claude waits for more input after completion - // We close stdin to signal EOF and let the process exit - if line.json_type.as_deref() == Some("result") { - tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion"); - let mut stdin_guard = stdin_handle_for_completion.lock().await; - if let Some(mut stdin) = stdin_guard.take() { - let _ = stdin.shutdown().await; - } - } - - let msg = DaemonMessage::task_output(task_id, line.content, false); - if ws_tx.send(msg).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); - break; - } - } - None => { - tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); - break; - } - } - } - _ = startup_check.tick(), if output_count == 0 => { - // Check if process is still alive - match process.try_wait() { - Ok(Some(exit_code)) => { - tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); - let msg = DaemonMessage::task_output( - task_id, - format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), - false, - ); - let _ = ws_tx.send(msg).await; - break; - } - Ok(None) => { - // Still running but no output - if tokio::time::Instant::now() > startup_deadline { - tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); - let msg = DaemonMessage::task_output( - task_id, - "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), - false, - ); - let _ = ws_tx.send(msg).await; - } else { - tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); - } - } - Err(e) => { - tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); - } - } - } - } - } - - // Wait for process to exit - let exit_code = process.wait().await.unwrap_or(-1); - - // Clean up input channel for this task - self.task_inputs.write().await.remove(&task_id); - tracing::debug!(task_id = %task_id, "Removed task input channel"); - - // Update state based on exit code - let success = exit_code == 0; - let new_state = if success { - TaskState::Completed - } else { - TaskState::Failed - }; - - tracing::info!( - task_id = %task_id, - exit_code = exit_code, - success = success, - new_state = ?new_state, - "Claude process exited, updating task state" - ); - - { - let mut tasks = self.tasks.write().await; - if let Some(task) = tasks.get_mut(&task_id) { - task.state = new_state; - task.completed_at = Some(Instant::now()); - if !success { - task.error = Some(format!("Process exited with code {}", exit_code)); - } - } - } - - // Execute completion action if task succeeded - let completion_result = if success { - if let Some(ref action) = completion_action { - if action != "none" { - self.execute_completion_action( - task_id, - &task_name, - &working_dir, - action, - target_repo_path.as_deref(), - target_branch.as_deref(), - ).await - } else { - Ok(None) - } - } else { - Ok(None) - } - } else { - Ok(None) - }; - - // Log completion action result - match &completion_result { - Ok(Some(pr_url)) => { - tracing::info!(task_id = %task_id, pr_url = %pr_url, "Completion action created PR"); - } - Ok(None) => {} - Err(e) => { - tracing::warn!(task_id = %task_id, error = %e, "Completion action failed (task still marked as done)"); - } - } - - // Notify server - let error = if success { - None - } else { - Some(format!("Exit code: {}", exit_code)) - }; - tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion"); - let msg = DaemonMessage::task_complete(task_id, success, error); - let _ = self.ws_tx.send(msg).await; - - // Note: Worktrees are kept until explicitly deleted (per user preference) - // This allows inspection, PR creation, etc. - - tracing::info!(task_id = %task_id, "=== RUN_TASK END ==="); - Ok(()) - } - - /// Execute the completion action for a task. - async fn execute_completion_action( - &self, - task_id: Uuid, - task_name: &str, - worktree_path: &std::path::Path, - action: &str, - target_repo_path: Option<&str>, - target_branch: Option<&str>, - ) -> Result<Option<String>, String> { - let target_repo = match target_repo_path { - Some(path) => crate::worktree::expand_tilde(path), - None => { - tracing::warn!(task_id = %task_id, "No target_repo_path configured, skipping completion action"); - return Ok(None); - } - }; - - if !target_repo.exists() { - return Err(format!("Target repo not found: {} (expanded from {:?})", target_repo.display(), target_repo_path)); - } - - // Get the branch name: makima/{task-name-with-dashes}-{short-id} - let branch_name = format!( - "makima/{}-{}", - crate::worktree::sanitize_name(task_name), - crate::worktree::short_uuid(task_id) - ); - - // Determine target branch - use provided value or detect default branch of target repo - let target_branch = match target_branch { - Some(branch) => branch.to_string(), - None => { - // Detect default branch (main, master, develop, etc.) - self.worktree_manager - .detect_default_branch(&target_repo) - .await - .unwrap_or_else(|_| "master".to_string()) - } - }; - - let msg = DaemonMessage::task_output( - task_id, - format!("Executing completion action: {}...\n", action), - false, - ); - let _ = self.ws_tx.send(msg).await; - - match action { - "branch" => { - // Just push the branch to target repo - self.worktree_manager - .push_to_target_repo(worktree_path, &target_repo, &branch_name, task_name) - .await - .map_err(|e| e.to_string())?; - - let msg = DaemonMessage::task_output( - task_id, - format!("Branch '{}' pushed to {}\n", branch_name, target_repo.display()), - false, - ); - let _ = self.ws_tx.send(msg).await; - Ok(None) - } - "merge" => { - // Push and merge into target branch - let commit_sha = self.worktree_manager - .merge_to_target(worktree_path, &target_repo, &branch_name, &target_branch, task_name) - .await - .map_err(|e| e.to_string())?; - - let msg = DaemonMessage::task_output( - task_id, - format!("Branch merged into {} (commit: {})\n", target_branch, commit_sha), - false, - ); - let _ = self.ws_tx.send(msg).await; - Ok(None) - } - "pr" => { - // Push and create PR - let title = task_name.to_string(); - let body = format!( - "Automated PR from makima task.\n\nTask ID: `{}`", - task_id - ); - let pr_url = self.worktree_manager - .create_pull_request( - worktree_path, - &target_repo, - &branch_name, - &target_branch, - &title, - &body, - ) - .await - .map_err(|e| e.to_string())?; - - let msg = DaemonMessage::task_output( - task_id, - format!("Pull request created: {}\n", pr_url), - false, - ); - let _ = self.ws_tx.send(msg).await; - Ok(Some(pr_url)) - } - _ => { - tracing::warn!(task_id = %task_id, action = %action, "Unknown completion action"); - Ok(None) - } - } - } - - /// Find worktree path for a task ID. - /// First checks in-memory tasks, then scans the worktrees directory. - async fn find_worktree_for_task(&self, task_id: Uuid) -> Result<PathBuf, String> { - // First try to get from in-memory tasks - { - let tasks = self.tasks.read().await; - if let Some(task) = tasks.get(&task_id) { - if let Some(ref worktree) = task.worktree { - return Ok(worktree.path.clone()); - } - } - } - - // Task not in memory - scan worktrees directory for matching task ID - let short_id = &task_id.to_string()[..8]; - let worktrees_dir = self.worktree_manager.base_dir(); - - if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { - while let Ok(Some(entry)) = entries.next_entry().await { - let name = entry.file_name(); - let name_str = name.to_string_lossy(); - if name_str.starts_with(short_id) { - let path = entry.path(); - // Verify it's a valid git directory - if path.join(".git").exists() { - tracing::info!( - task_id = %task_id, - worktree_path = %path.display(), - "Found worktree by scanning directory" - ); - return Ok(path); - } - } - } - } - - Err(format!( - "No worktree found for task {}. The worktree may have been cleaned up.", - task_id - )) - } - - async fn update_state(&self, task_id: Uuid, state: TaskState) { - let mut tasks = self.tasks.write().await; - if let Some(task) = tasks.get_mut(&task_id) { - task.state = state; - } - } - - async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { - let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); - let _ = self.ws_tx.send(msg).await; - } - - /// Mark task as failed. - async fn mark_failed(&self, task_id: Uuid, error: &str) { - { - let mut tasks = self.tasks.write().await; - if let Some(task) = tasks.get_mut(&task_id) { - task.state = TaskState::Failed; - task.error = Some(error.to_string()); - task.completed_at = Some(Instant::now()); - } - } - - // Notify server - let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string())); - let _ = self.ws_tx.send(msg).await; - } -} - -impl Clone for TaskManagerInner { - fn clone(&self) -> Self { - Self { - worktree_manager: self.worktree_manager.clone(), - process_manager: self.process_manager.clone(), - temp_manager: self.temp_manager.clone(), - tasks: self.tasks.clone(), - ws_tx: self.ws_tx.clone(), - task_inputs: self.task_inputs.clone(), - } - } -} diff --git a/makima/daemon/src/task/mod.rs b/makima/daemon/src/task/mod.rs deleted file mode 100644 index 29c261e..0000000 --- a/makima/daemon/src/task/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -//! Task management and execution. - -pub mod manager; -pub mod state; - -pub use manager::{ManagedTask, TaskConfig, TaskManager}; -pub use state::TaskState; diff --git a/makima/daemon/src/task/state.rs b/makima/daemon/src/task/state.rs deleted file mode 100644 index fe73de1..0000000 --- a/makima/daemon/src/task/state.rs +++ /dev/null @@ -1,161 +0,0 @@ -//! Task state machine. - -use std::fmt; - -/// Task execution state. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub enum TaskState { - /// Task received, preparing overlay. - Initializing, - /// Overlay ready, starting container. - Starting, - /// Container running. - Running, - /// Container paused. - Paused, - /// Waiting for sibling or resource. - Blocked, - /// Task completed successfully. - Completed, - /// Task failed with error. - Failed, - /// Task interrupted by user. - Interrupted, -} - -impl TaskState { - /// Check if a state transition is valid. - pub fn can_transition_to(&self, target: TaskState) -> bool { - use TaskState::*; - - matches!( - (self, target), - // From Initializing - (Initializing, Starting) - | (Initializing, Failed) - | (Initializing, Interrupted) - // From Starting - | (Starting, Running) - | (Starting, Failed) - | (Starting, Interrupted) - // From Running - | (Running, Paused) - | (Running, Blocked) - | (Running, Completed) - | (Running, Failed) - | (Running, Interrupted) - // From Paused - | (Paused, Running) - | (Paused, Interrupted) - | (Paused, Failed) - // From Blocked - | (Blocked, Running) - | (Blocked, Failed) - | (Blocked, Interrupted) - ) - } - - /// Check if this state is terminal (no more transitions possible). - pub fn is_terminal(&self) -> bool { - matches!( - self, - TaskState::Completed | TaskState::Failed | TaskState::Interrupted - ) - } - - /// Check if the task is currently active (running or paused). - pub fn is_active(&self) -> bool { - matches!( - self, - TaskState::Initializing - | TaskState::Starting - | TaskState::Running - | TaskState::Paused - | TaskState::Blocked - ) - } - - /// Check if the task is running. - pub fn is_running(&self) -> bool { - matches!(self, TaskState::Running) - } - - /// Convert to string for protocol messages. - pub fn as_str(&self) -> &'static str { - match self { - TaskState::Initializing => "initializing", - TaskState::Starting => "starting", - TaskState::Running => "running", - TaskState::Paused => "paused", - TaskState::Blocked => "blocked", - TaskState::Completed => "done", - TaskState::Failed => "failed", - TaskState::Interrupted => "interrupted", - } - } - - /// Parse from string. - pub fn from_str(s: &str) -> Option<Self> { - match s.to_lowercase().as_str() { - "initializing" => Some(TaskState::Initializing), - "starting" => Some(TaskState::Starting), - "running" => Some(TaskState::Running), - "paused" => Some(TaskState::Paused), - "blocked" => Some(TaskState::Blocked), - "done" | "completed" => Some(TaskState::Completed), - "failed" => Some(TaskState::Failed), - "interrupted" => Some(TaskState::Interrupted), - _ => None, - } - } -} - -impl fmt::Display for TaskState { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.as_str()) - } -} - -impl Default for TaskState { - fn default() -> Self { - TaskState::Initializing - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_valid_transitions() { - use TaskState::*; - - // Valid transitions - assert!(Initializing.can_transition_to(Starting)); - assert!(Starting.can_transition_to(Running)); - assert!(Running.can_transition_to(Completed)); - assert!(Running.can_transition_to(Paused)); - assert!(Paused.can_transition_to(Running)); - - // Invalid transitions - assert!(!Completed.can_transition_to(Running)); - assert!(!Failed.can_transition_to(Running)); - assert!(!Running.can_transition_to(Initializing)); - } - - #[test] - fn test_terminal_states() { - assert!(TaskState::Completed.is_terminal()); - assert!(TaskState::Failed.is_terminal()); - assert!(TaskState::Interrupted.is_terminal()); - assert!(!TaskState::Running.is_terminal()); - assert!(!TaskState::Paused.is_terminal()); - } - - #[test] - fn test_parse() { - assert_eq!(TaskState::from_str("running"), Some(TaskState::Running)); - assert_eq!(TaskState::from_str("done"), Some(TaskState::Completed)); - assert_eq!(TaskState::from_str("invalid"), None); - } -} diff --git a/makima/daemon/src/temp.rs b/makima/daemon/src/temp.rs deleted file mode 100644 index 015b21b..0000000 --- a/makima/daemon/src/temp.rs +++ /dev/null @@ -1,224 +0,0 @@ -//! Managed temporary directory for tasks without repositories. -//! -//! Tasks that don't have a repository URL and aren't subtasks (which inherit -//! from parent) use a managed temp directory in ~/.makima/temp/. The directory -//! is automatically cleaned up when it exceeds a size limit. - -use std::path::PathBuf; - -use tokio::fs; -use uuid::Uuid; - -/// Maximum size of the temp directory before cleanup (5GB). -const MAX_TEMP_SIZE_BYTES: u64 = 5 * 1024 * 1024 * 1024; - -/// Manages temporary directories for tasks without repositories. -pub struct TempManager { - /// Base directory for temp task directories (~/.makima/temp/). - temp_dir: PathBuf, -} - -impl TempManager { - /// Create a new TempManager. - pub fn new() -> Self { - let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from(".")); - Self { - temp_dir: home.join(".makima").join("temp"), - } - } - - /// Create a new TempManager with a custom base directory. - #[allow(dead_code)] - pub fn with_base_dir(base_dir: PathBuf) -> Self { - Self { temp_dir: base_dir } - } - - /// Get the base temp directory path. - pub fn temp_dir(&self) -> &PathBuf { - &self.temp_dir - } - - /// Create a temp directory for a task. - /// - /// This creates a directory at ~/.makima/temp/task-{id}/ and triggers - /// cleanup if the total size exceeds the limit. - pub async fn create_task_dir(&self, task_id: Uuid) -> Result<PathBuf, std::io::Error> { - // Ensure base directory exists - fs::create_dir_all(&self.temp_dir).await?; - - // Check size and cleanup if needed - if let Err(e) = self.cleanup_if_needed().await { - tracing::warn!("Temp directory cleanup failed: {}", e); - // Continue anyway, cleanup is best-effort - } - - // Create task-specific directory - let task_dir = self.temp_dir.join(format!("task-{}", task_id)); - fs::create_dir_all(&task_dir).await?; - - tracing::info!( - task_id = %task_id, - path = %task_dir.display(), - "Created temp directory for task" - ); - - Ok(task_dir) - } - - /// Calculate total size of temp directory recursively. - async fn get_total_size(&self) -> Result<u64, std::io::Error> { - if !self.temp_dir.exists() { - return Ok(0); - } - - let mut total = 0u64; - let mut stack = vec![self.temp_dir.clone()]; - - while let Some(dir) = stack.pop() { - let mut entries = match fs::read_dir(&dir).await { - Ok(e) => e, - Err(_) => continue, - }; - - while let Ok(Some(entry)) = entries.next_entry().await { - let metadata = match entry.metadata().await { - Ok(m) => m, - Err(_) => continue, - }; - - if metadata.is_dir() { - stack.push(entry.path()); - } else { - total += metadata.len(); - } - } - } - - Ok(total) - } - - /// Remove oldest directories if total size exceeds limit. - async fn cleanup_if_needed(&self) -> Result<(), std::io::Error> { - let size = self.get_total_size().await?; - if size <= MAX_TEMP_SIZE_BYTES { - return Ok(()); - } - - tracing::info!( - current_size_mb = size / 1024 / 1024, - limit_mb = MAX_TEMP_SIZE_BYTES / 1024 / 1024, - "Temp directory exceeds size limit, starting cleanup" - ); - - // Get all task dirs with modification times - let mut dirs: Vec<(PathBuf, std::time::SystemTime, u64)> = vec![]; - let mut entries = fs::read_dir(&self.temp_dir).await?; - - while let Ok(Some(entry)) = entries.next_entry().await { - let path = entry.path(); - if !path.is_dir() { - continue; - } - - let metadata = match entry.metadata().await { - Ok(m) => m, - Err(_) => continue, - }; - - let modified = metadata.modified().unwrap_or(std::time::UNIX_EPOCH); - let dir_size = self.get_dir_size(&path).await.unwrap_or(0); - dirs.push((path, modified, dir_size)); - } - - // Sort by oldest first - dirs.sort_by(|a, b| a.1.cmp(&b.1)); - - // Remove oldest until under limit - let mut current_size = size; - for (path, _, dir_size) in dirs { - if current_size <= MAX_TEMP_SIZE_BYTES { - break; - } - - tracing::info!( - path = %path.display(), - size_mb = dir_size / 1024 / 1024, - "Removing old temp directory" - ); - - if let Err(e) = fs::remove_dir_all(&path).await { - tracing::warn!(path = %path.display(), error = %e, "Failed to remove temp directory"); - continue; - } - - current_size = current_size.saturating_sub(dir_size); - } - - tracing::info!( - new_size_mb = current_size / 1024 / 1024, - "Temp directory cleanup complete" - ); - - Ok(()) - } - - /// Calculate size of a directory recursively. - async fn get_dir_size(&self, path: &PathBuf) -> Result<u64, std::io::Error> { - let mut total = 0u64; - let mut stack = vec![path.clone()]; - - while let Some(dir) = stack.pop() { - let mut entries = match fs::read_dir(&dir).await { - Ok(e) => e, - Err(_) => continue, - }; - - while let Ok(Some(entry)) = entries.next_entry().await { - let metadata = match entry.metadata().await { - Ok(m) => m, - Err(_) => continue, - }; - - if metadata.is_dir() { - stack.push(entry.path()); - } else { - total += metadata.len(); - } - } - } - - Ok(total) - } - - /// Remove a specific task's temp directory. - #[allow(dead_code)] - pub async fn remove_task_dir(&self, task_id: Uuid) -> Result<(), std::io::Error> { - let task_dir = self.temp_dir.join(format!("task-{}", task_id)); - if task_dir.exists() { - fs::remove_dir_all(&task_dir).await?; - tracing::info!( - task_id = %task_id, - path = %task_dir.display(), - "Removed temp directory for task" - ); - } - Ok(()) - } -} - -impl Default for TempManager { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_temp_manager_default_dir() { - let manager = TempManager::new(); - assert!(manager.temp_dir().ends_with(".makima/temp")); - } -} diff --git a/makima/daemon/src/worktree/manager.rs b/makima/daemon/src/worktree/manager.rs deleted file mode 100644 index 266b970..0000000 --- a/makima/daemon/src/worktree/manager.rs +++ /dev/null @@ -1,1623 +0,0 @@ -//! Worktree manager implementation. - -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::LazyLock; - -use tokio::process::Command; -use tokio::sync::Mutex; -use uuid::Uuid; - -/// Errors that can occur during worktree operations. -#[derive(Debug, thiserror::Error)] -pub enum WorktreeError { - #[error("Git command failed: {0}")] - GitCommand(String), - - #[error("Repository not found: {0}")] - RepoNotFound(String), - - #[error("Failed to create directory: {0}")] - CreateDir(#[from] std::io::Error), - - #[error("Invalid repository path: {0}")] - InvalidPath(String), - - #[error("Worktree already exists: {0}")] - AlreadyExists(String), - - #[error("Clone failed: {0}")] - CloneFailed(String), - - #[error("Merge in progress")] - MergeInProgress, - - #[error("No merge in progress")] - NoMergeInProgress, - - #[error("Merge has conflicts: {0}")] - MergeConflicts(String), -} - -/// Strategy for resolving a merge conflict. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ConflictResolution { - /// Use our version (the branch being merged into). - Ours, - /// Use their version (the branch being merged). - Theirs, -} - -/// State of an in-progress merge. -#[derive(Debug, Clone)] -pub struct MergeState { - /// The branch being merged. - pub source_branch: String, - /// Files with unresolved conflicts. - pub conflicted_files: Vec<String>, - /// Whether a merge is currently in progress. - pub in_progress: bool, -} - -/// Information about a task branch. -#[derive(Debug, Clone)] -pub struct TaskBranchInfo { - /// Full branch name. - pub name: String, - /// Task ID extracted from branch name (if parseable). - pub task_id: Option<Uuid>, - /// Whether this branch has been merged into the current branch. - pub is_merged: bool, - /// Short SHA of the last commit. - pub last_commit: String, - /// Subject line of the last commit. - pub last_commit_message: String, -} - -/// Information about a created worktree. -#[derive(Debug, Clone)] -pub struct WorktreeInfo { - /// Path to the worktree directory. - pub path: PathBuf, - /// Git branch name for this worktree. - pub branch: String, - /// Source repository path. - pub source_repo: PathBuf, -} - -/// Manages git worktrees for task isolation. -pub struct WorktreeManager { - /// Base directory for all worktrees (~/.makima/worktrees). - base_dir: PathBuf, - /// Base directory for cloned repos (~/.makima/repos). - repos_dir: PathBuf, - /// Branch prefix for task branches. - branch_prefix: String, -} - -/// Per-worktree locks to prevent concurrent creation issues. -static WORKTREE_LOCKS: LazyLock<Mutex<HashMap<String, std::sync::Arc<tokio::sync::Mutex<()>>>>> = - LazyLock::new(|| Mutex::new(HashMap::new())); - -impl WorktreeManager { - /// Create a new WorktreeManager with the given base directory. - pub fn new(base_dir: PathBuf) -> Self { - let repos_dir = base_dir.parent() - .map(|p| p.join("repos")) - .unwrap_or_else(|| base_dir.join("repos")); - - Self { - base_dir, - repos_dir, - branch_prefix: "makima/task-".to_string(), - } - } - - /// Get the default worktree base directory (~/.makima/worktrees). - pub fn default_base_dir() -> PathBuf { - dirs::home_dir() - .unwrap_or_else(|| PathBuf::from(".")) - .join(".makima") - .join("worktrees") - } - - /// Get the base directory for worktrees. - pub fn base_dir(&self) -> &Path { - &self.base_dir - } - - /// Detect the default branch of a repository. - /// Tries to find HEAD's target, falling back to common branch names. - pub async fn detect_default_branch(&self, repo_path: &Path) -> Result<String, WorktreeError> { - // Try to get the branch that HEAD points to - let output = Command::new("git") - .args(["symbolic-ref", "refs/remotes/origin/HEAD", "--short"]) - .current_dir(repo_path) - .output() - .await?; - - if output.status.success() { - let branch = String::from_utf8_lossy(&output.stdout).trim().to_string(); - // Remove "origin/" prefix if present - let branch = branch.strip_prefix("origin/").unwrap_or(&branch).to_string(); - if !branch.is_empty() { - return Ok(branch); - } - } - - // Try common branch names - for branch in ["main", "master", "develop", "trunk"] { - let output = Command::new("git") - .args(["rev-parse", "--verify", &format!("refs/heads/{}", branch)]) - .current_dir(repo_path) - .output() - .await?; - - if output.status.success() { - return Ok(branch.to_string()); - } - } - - // Fall back to getting the current branch - let output = Command::new("git") - .args(["rev-parse", "--abbrev-ref", "HEAD"]) - .current_dir(repo_path) - .output() - .await?; - - if output.status.success() { - let branch = String::from_utf8_lossy(&output.stdout).trim().to_string(); - if !branch.is_empty() && branch != "HEAD" { - return Ok(branch); - } - } - - Err(WorktreeError::GitCommand( - "Could not detect default branch".to_string(), - )) - } - - /// Ensure the source repository exists locally and is up-to-date. - /// If repo_source is a URL, clone it. If it's a path, verify it exists. - /// For both cases, fetch latest changes from remote if available. - pub async fn ensure_repo(&self, repo_source: &str) -> Result<PathBuf, WorktreeError> { - // Check if it's a URL (simple heuristic) - if repo_source.starts_with("http://") - || repo_source.starts_with("https://") - || repo_source.starts_with("git@") - || repo_source.starts_with("ssh://") - { - self.clone_or_fetch_repo(repo_source).await - } else { - // Treat as local path - expand tilde if present - let path = expand_tilde(repo_source); - if !path.exists() { - return Err(WorktreeError::RepoNotFound(repo_source.to_string())); - } - // Verify it's a git repo - let git_dir = path.join(".git"); - if !git_dir.exists() { - return Err(WorktreeError::InvalidPath(format!( - "{} is not a git repository", - repo_source - ))); - } - - // Fetch latest changes from remote if configured - tracing::info!("Fetching latest changes for local repo: {}", repo_source); - let output = Command::new("git") - .args(["fetch", "--all", "--prune"]) - .current_dir(&path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - // Don't fail - repo might not have a remote configured - tracing::debug!("Git fetch for local repo (may not have remote): {}", stderr); - } else { - tracing::info!("Fetched latest changes for {}", repo_source); - } - - Ok(path) - } - } - - /// Clone a repository or fetch if already cloned. - async fn clone_or_fetch_repo(&self, url: &str) -> Result<PathBuf, WorktreeError> { - // Extract repo name from URL - let repo_name = extract_repo_name(url); - let repo_path = self.repos_dir.join(&repo_name); - - // Create repos directory if needed - tokio::fs::create_dir_all(&self.repos_dir).await?; - - if repo_path.exists() { - // Fetch latest changes - tracing::info!("Fetching updates for existing repo: {}", repo_name); - let output = Command::new("git") - .args(["fetch", "--all", "--prune"]) - .current_dir(&repo_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - tracing::warn!("Git fetch warning: {}", stderr); - // Don't fail on fetch errors, repo might still be usable - } - } else { - // Clone the repository - tracing::info!("Cloning repository: {} -> {}", url, repo_path.display()); - let output = Command::new("git") - .args(["clone", "--bare", url]) - .arg(&repo_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::CloneFailed(stderr.to_string())); - } - } - - Ok(repo_path) - } - - /// Create a worktree for a task. - /// - /// This creates a unique directory with a git worktree checked out to a new branch. - pub async fn create_worktree( - &self, - source_repo: &Path, - task_id: Uuid, - task_name: &str, - base_branch: &str, - ) -> Result<WorktreeInfo, WorktreeError> { - // Generate unique directory name and branch - let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name)); - let worktree_path = self.base_dir.join(&dir_name); - // Branch name: makima/{task-name-with-dashes}-{short-id} - let branch_name = format!("{}{}-{}", self.branch_prefix, sanitize_name(task_name), short_uuid(task_id)); - - // Acquire lock for this worktree path - let lock = { - let mut locks = WORKTREE_LOCKS.lock().await; - locks - .entry(worktree_path.to_string_lossy().to_string()) - .or_insert_with(|| std::sync::Arc::new(tokio::sync::Mutex::new(()))) - .clone() - }; - let _guard = lock.lock().await; - - // Check if worktree already exists - reuse it if so - if worktree_path.exists() { - tracing::info!( - task_id = %task_id, - worktree_path = %worktree_path.display(), - "Worktree already exists, reusing" - ); - - // Verify it's a valid git directory - let git_dir = worktree_path.join(".git"); - if git_dir.exists() { - // Get the current branch name - let output = Command::new("git") - .args(["rev-parse", "--abbrev-ref", "HEAD"]) - .current_dir(&worktree_path) - .output() - .await?; - - let current_branch = if output.status.success() { - String::from_utf8_lossy(&output.stdout).trim().to_string() - } else { - branch_name.clone() - }; - - return Ok(WorktreeInfo { - path: worktree_path, - branch: current_branch, - source_repo: source_repo.to_path_buf(), - }); - } else { - // Directory exists but isn't a git worktree - remove and recreate - tracing::warn!( - task_id = %task_id, - worktree_path = %worktree_path.display(), - "Directory exists but is not a git worktree, removing" - ); - tokio::fs::remove_dir_all(&worktree_path).await?; - } - } - - // Create base directory - tokio::fs::create_dir_all(&self.base_dir).await?; - - tracing::info!( - task_id = %task_id, - worktree_path = %worktree_path.display(), - branch = %branch_name, - base_branch = %base_branch, - "Creating worktree from local branch" - ); - - // Create the worktree with a new branch based on the local base_branch - let output = Command::new("git") - .args([ - "worktree", - "add", - "-b", - &branch_name, - ]) - .arg(&worktree_path) - .arg(base_branch) - .current_dir(source_repo) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to create worktree: {}", - stderr - ))); - } - - tracing::info!( - task_id = %task_id, - worktree_path = %worktree_path.display(), - "Worktree created successfully" - ); - - Ok(WorktreeInfo { - path: worktree_path, - branch: branch_name, - source_repo: source_repo.to_path_buf(), - }) - } - - /// Create a worktree for a task by copying from another task's worktree. - /// - /// This allows sequential subtasks where one continues from another's work, - /// including uncommitted changes. - pub async fn create_worktree_from_task( - &self, - source_worktree: &Path, - task_id: Uuid, - task_name: &str, - ) -> Result<WorktreeInfo, WorktreeError> { - // Verify source worktree exists - if !source_worktree.exists() { - return Err(WorktreeError::RepoNotFound(format!( - "Source worktree not found: {}", - source_worktree.display() - ))); - } - - // Get the source repo from the source worktree - let source_repo = self.get_worktree_source(source_worktree).await?; - - // Get the base branch from source worktree's current HEAD - let output = Command::new("git") - .args(["rev-parse", "HEAD"]) - .current_dir(source_worktree) - .output() - .await?; - - if !output.status.success() { - return Err(WorktreeError::GitCommand( - "Failed to get source worktree HEAD".to_string(), - )); - } - let source_commit = String::from_utf8_lossy(&output.stdout).trim().to_string(); - - // Generate unique directory name and branch for new worktree - let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name)); - let worktree_path = self.base_dir.join(&dir_name); - let branch_name = format!("{}{}", self.branch_prefix, task_id); - - // Acquire lock for this worktree path - let lock = { - let mut locks = WORKTREE_LOCKS.lock().await; - locks - .entry(worktree_path.to_string_lossy().to_string()) - .or_insert_with(|| std::sync::Arc::new(tokio::sync::Mutex::new(()))) - .clone() - }; - let _guard = lock.lock().await; - - // Remove existing worktree if present - if worktree_path.exists() { - tracing::info!( - task_id = %task_id, - worktree_path = %worktree_path.display(), - "Removing existing worktree before creating from source" - ); - tokio::fs::remove_dir_all(&worktree_path).await?; - } - - // Create base directory - tokio::fs::create_dir_all(&self.base_dir).await?; - - tracing::info!( - task_id = %task_id, - source_worktree = %source_worktree.display(), - worktree_path = %worktree_path.display(), - branch = %branch_name, - source_commit = %source_commit, - "Creating worktree from source task" - ); - - // Create a new worktree based on the source commit - let output = Command::new("git") - .args([ - "worktree", - "add", - "-b", - &branch_name, - ]) - .arg(&worktree_path) - .arg(&source_commit) - .current_dir(&source_repo) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to create worktree: {}", - stderr - ))); - } - - // Now copy uncommitted changes from source worktree - // Use rsync to copy all files except .git - let output = Command::new("rsync") - .args([ - "-a", - "--exclude", ".git", - "--exclude", ".makima", - &format!("{}/", source_worktree.display()), - &format!("{}/", worktree_path.display()), - ]) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - tracing::warn!( - task_id = %task_id, - "rsync warning (continuing anyway): {}", - stderr - ); - } - - tracing::info!( - task_id = %task_id, - worktree_path = %worktree_path.display(), - "Worktree created from source task successfully" - ); - - Ok(WorktreeInfo { - path: worktree_path, - branch: branch_name, - source_repo: source_repo.to_path_buf(), - }) - } - - /// Remove a worktree and optionally its branch. - pub async fn remove_worktree( - &self, - worktree_path: &Path, - delete_branch: bool, - ) -> Result<(), WorktreeError> { - if !worktree_path.exists() { - return Ok(()); // Already gone - } - - // Get the branch name before removing - let branch_name = if delete_branch { - self.get_worktree_branch(worktree_path).await.ok() - } else { - None - }; - - // Find the source repo from worktree - let source_repo = self.get_worktree_source(worktree_path).await?; - - tracing::info!( - worktree_path = %worktree_path.display(), - delete_branch = delete_branch, - "Removing worktree" - ); - - // Remove the worktree - let output = Command::new("git") - .args(["worktree", "remove", "--force"]) - .arg(worktree_path) - .current_dir(&source_repo) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - // Try force removal of directory if git worktree remove fails - if worktree_path.exists() { - tokio::fs::remove_dir_all(worktree_path).await?; - } - tracing::warn!("Git worktree remove warning: {}", stderr); - } - - // Prune worktree references - let _ = Command::new("git") - .args(["worktree", "prune"]) - .current_dir(&source_repo) - .output() - .await; - - // Delete the branch if requested - if let Some(branch) = branch_name { - let output = Command::new("git") - .args(["branch", "-D", &branch]) - .current_dir(&source_repo) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - tracing::warn!("Failed to delete branch {}: {}", branch, stderr); - } - } - - Ok(()) - } - - /// Get the branch name of a worktree. - async fn get_worktree_branch(&self, worktree_path: &Path) -> Result<String, WorktreeError> { - let output = Command::new("git") - .args(["rev-parse", "--abbrev-ref", "HEAD"]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to get branch: {}", - stderr - ))); - } - - Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) - } - - /// Get the source repository path for a worktree. - async fn get_worktree_source(&self, worktree_path: &Path) -> Result<PathBuf, WorktreeError> { - // Read the .git file in the worktree which contains the path to the main repo - let git_file = worktree_path.join(".git"); - - if git_file.is_file() { - let content = tokio::fs::read_to_string(&git_file).await?; - // Format: "gitdir: /path/to/repo/.git/worktrees/name" - if let Some(gitdir) = content.strip_prefix("gitdir: ") { - let gitdir = gitdir.trim(); - // Navigate from worktrees/name back to the main repo - let path = PathBuf::from(gitdir); - if let Some(worktrees_dir) = path.parent() { - if let Some(git_dir) = worktrees_dir.parent() { - if let Some(repo_dir) = git_dir.parent() { - return Ok(repo_dir.to_path_buf()); - } - } - } - } - } - - // Fallback: try to find it in our repos directory - Err(WorktreeError::InvalidPath(format!( - "Could not determine source repo for worktree: {}", - worktree_path.display() - ))) - } - - /// List all worktrees in the base directory. - pub async fn list_worktrees(&self) -> Result<Vec<PathBuf>, WorktreeError> { - let mut worktrees = Vec::new(); - - if !self.base_dir.exists() { - return Ok(worktrees); - } - - let mut entries = tokio::fs::read_dir(&self.base_dir).await?; - while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if path.is_dir() && path.join(".git").exists() { - worktrees.push(path); - } - } - - Ok(worktrees) - } - - /// Initialize a new git repository for a task. - /// - /// This creates a fresh git repo (not a worktree) for tasks that don't need - /// an existing codebase. Use this when `repository_url` is `new://` or `new://project-name`. - pub async fn init_new_repo( - &self, - task_id: Uuid, - repo_source: &str, - ) -> Result<WorktreeInfo, WorktreeError> { - let project_name = extract_new_repo_name(repo_source); - let dir_name = match project_name { - Some(name) => format!("{}-{}", short_uuid(task_id), sanitize_name(name)), - None => format!("{}-new", short_uuid(task_id)), - }; - let repo_path = self.repos_dir.join(&dir_name); - - tracing::info!( - task_id = %task_id, - path = %repo_path.display(), - project_name = ?project_name, - "Initializing new git repository" - ); - - // Create directory - tokio::fs::create_dir_all(&repo_path).await?; - - // git init - let output = Command::new("git") - .args(["init"]) - .current_dir(&repo_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to init repository: {}", - stderr - ))); - } - - // Configure git user (needed for commits) - let _ = Command::new("git") - .args(["config", "user.email", "makima@localhost"]) - .current_dir(&repo_path) - .output() - .await; - let _ = Command::new("git") - .args(["config", "user.name", "Makima"]) - .current_dir(&repo_path) - .output() - .await; - - // Initial commit (required for worktrees to work later if needed) - let output = Command::new("git") - .args(["commit", "--allow-empty", "-m", "Initial commit"]) - .current_dir(&repo_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to create initial commit: {}", - stderr - ))); - } - - tracing::info!( - task_id = %task_id, - path = %repo_path.display(), - "New git repository initialized" - ); - - Ok(WorktreeInfo { - path: repo_path.clone(), - branch: "main".to_string(), - source_repo: repo_path, - }) - } - - // ========== Merge Operations ========== - - /// List all task branches in a repository. - /// - /// Returns branches matching the pattern `makima/task-*`. - pub async fn list_task_branches( - &self, - repo_path: &Path, - ) -> Result<Vec<TaskBranchInfo>, WorktreeError> { - // Get all branches matching our prefix - let output = Command::new("git") - .args([ - "branch", - "--list", - &format!("{}*", self.branch_prefix), - "--format=%(refname:short)|%(objectname:short)|%(subject)", - ]) - .current_dir(repo_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to list branches: {}", - stderr - ))); - } - - // Get list of merged branches - let merged_output = Command::new("git") - .args(["branch", "--merged", "HEAD", "--format=%(refname:short)"]) - .current_dir(repo_path) - .output() - .await?; - - let merged_branches: std::collections::HashSet<String> = if merged_output.status.success() { - String::from_utf8_lossy(&merged_output.stdout) - .lines() - .map(|s| s.trim().to_string()) - .collect() - } else { - std::collections::HashSet::new() - }; - - let stdout = String::from_utf8_lossy(&output.stdout); - let mut branches = Vec::new(); - - for line in stdout.lines() { - let parts: Vec<&str> = line.split('|').collect(); - if parts.len() >= 3 { - let name = parts[0].trim().to_string(); - let last_commit = parts[1].trim().to_string(); - let last_commit_message = parts[2].trim().to_string(); - - // Try to extract task ID from branch name - let task_id = name - .strip_prefix(&self.branch_prefix) - .and_then(|s| Uuid::parse_str(s).ok()); - - let is_merged = merged_branches.contains(&name); - - branches.push(TaskBranchInfo { - name, - task_id, - is_merged, - last_commit, - last_commit_message, - }); - } - } - - Ok(branches) - } - - /// Start a merge of a branch into the current worktree. - /// - /// Uses `--no-commit` to allow conflict resolution before committing. - /// Returns Ok(None) if merge succeeds without conflicts, or Ok(Some(files)) - /// with the list of conflicted files. - pub async fn merge_branch( - &self, - worktree_path: &Path, - source_branch: &str, - ) -> Result<Option<Vec<String>>, WorktreeError> { - // Check if there's already a merge in progress - if self.is_merge_in_progress(worktree_path).await? { - return Err(WorktreeError::MergeInProgress); - } - - tracing::info!( - worktree = %worktree_path.display(), - source_branch = %source_branch, - "Starting merge" - ); - - // Attempt the merge with --no-commit --no-ff - let output = Command::new("git") - .args(["merge", "--no-commit", "--no-ff", source_branch]) - .current_dir(worktree_path) - .output() - .await?; - - if output.status.success() { - tracing::info!("Merge completed without conflicts"); - return Ok(None); - } - - // Check if there are conflicts - let conflicts = self.get_conflicted_files(worktree_path).await?; - if !conflicts.is_empty() { - tracing::info!( - conflicts = ?conflicts, - "Merge has conflicts" - ); - return Ok(Some(conflicts)); - } - - // Other error - let stderr = String::from_utf8_lossy(&output.stderr); - Err(WorktreeError::GitCommand(format!( - "Merge failed: {}", - stderr - ))) - } - - /// Check if a merge is currently in progress. - pub async fn is_merge_in_progress(&self, worktree_path: &Path) -> Result<bool, WorktreeError> { - // Check for MERGE_HEAD file - let merge_head = worktree_path.join(".git").join("MERGE_HEAD"); - if merge_head.exists() { - return Ok(true); - } - - // Also check in .git file (for worktrees) - let git_file = worktree_path.join(".git"); - if git_file.is_file() { - if let Ok(content) = tokio::fs::read_to_string(&git_file).await { - if let Some(gitdir) = content.strip_prefix("gitdir: ") { - let gitdir = PathBuf::from(gitdir.trim()); - let merge_head = gitdir.join("MERGE_HEAD"); - if merge_head.exists() { - return Ok(true); - } - } - } - } - - Ok(false) - } - - /// Get the list of files with unresolved conflicts. - pub async fn get_conflicted_files( - &self, - worktree_path: &Path, - ) -> Result<Vec<String>, WorktreeError> { - let output = Command::new("git") - .args(["diff", "--name-only", "--diff-filter=U"]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - // No conflicts or not in merge state - return Ok(Vec::new()); - } - - let stdout = String::from_utf8_lossy(&output.stdout); - let files: Vec<String> = stdout - .lines() - .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty()) - .collect(); - - Ok(files) - } - - /// Get the current merge state. - pub async fn get_merge_state( - &self, - worktree_path: &Path, - ) -> Result<MergeState, WorktreeError> { - let in_progress = self.is_merge_in_progress(worktree_path).await?; - - if !in_progress { - return Ok(MergeState { - source_branch: String::new(), - conflicted_files: Vec::new(), - in_progress: false, - }); - } - - // Get the branch being merged from MERGE_HEAD - let source_branch = self.get_merge_source_branch(worktree_path).await?; - let conflicted_files = self.get_conflicted_files(worktree_path).await?; - - Ok(MergeState { - source_branch, - conflicted_files, - in_progress: true, - }) - } - - /// Get the branch name being merged (from MERGE_HEAD). - async fn get_merge_source_branch(&self, worktree_path: &Path) -> Result<String, WorktreeError> { - // Get MERGE_HEAD commit - let output = Command::new("git") - .args(["rev-parse", "MERGE_HEAD"]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - return Ok("unknown".to_string()); - } - - let commit = String::from_utf8_lossy(&output.stdout).trim().to_string(); - - // Try to find branch name for this commit - let output = Command::new("git") - .args(["name-rev", "--name-only", &commit]) - .current_dir(worktree_path) - .output() - .await?; - - if output.status.success() { - let name = String::from_utf8_lossy(&output.stdout).trim().to_string(); - // Clean up the name (remove ~N suffixes, etc.) - let name = name.split('~').next().unwrap_or(&name); - let name = name.split('^').next().unwrap_or(name); - return Ok(name.to_string()); - } - - Ok(commit[..8.min(commit.len())].to_string()) - } - - /// Resolve a conflict in a specific file. - pub async fn resolve_conflict( - &self, - worktree_path: &Path, - file_path: &str, - resolution: ConflictResolution, - ) -> Result<(), WorktreeError> { - if !self.is_merge_in_progress(worktree_path).await? { - return Err(WorktreeError::NoMergeInProgress); - } - - let strategy = match resolution { - ConflictResolution::Ours => "--ours", - ConflictResolution::Theirs => "--theirs", - }; - - tracing::info!( - worktree = %worktree_path.display(), - file = %file_path, - strategy = %strategy, - "Resolving conflict" - ); - - // Checkout the chosen version - let output = Command::new("git") - .args(["checkout", strategy, "--", file_path]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to resolve conflict: {}", - stderr - ))); - } - - // Stage the resolved file - let output = Command::new("git") - .args(["add", file_path]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to stage resolved file: {}", - stderr - ))); - } - - Ok(()) - } - - /// Abort the current merge. - pub async fn abort_merge(&self, worktree_path: &Path) -> Result<(), WorktreeError> { - if !self.is_merge_in_progress(worktree_path).await? { - return Err(WorktreeError::NoMergeInProgress); - } - - tracing::info!( - worktree = %worktree_path.display(), - "Aborting merge" - ); - - let output = Command::new("git") - .args(["merge", "--abort"]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to abort merge: {}", - stderr - ))); - } - - Ok(()) - } - - /// Commit the current merge. - pub async fn commit_merge( - &self, - worktree_path: &Path, - message: &str, - ) -> Result<String, WorktreeError> { - // Check for remaining conflicts - let conflicts = self.get_conflicted_files(worktree_path).await?; - if !conflicts.is_empty() { - return Err(WorktreeError::MergeConflicts(conflicts.join(", "))); - } - - tracing::info!( - worktree = %worktree_path.display(), - message = %message, - "Committing merge" - ); - - let output = Command::new("git") - .args(["commit", "-m", message]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to commit merge: {}", - stderr - ))); - } - - // Get the new commit SHA - let output = Command::new("git") - .args(["rev-parse", "HEAD"]) - .current_dir(worktree_path) - .output() - .await?; - - if output.status.success() { - let sha = String::from_utf8_lossy(&output.stdout).trim().to_string(); - return Ok(sha); - } - - Ok("unknown".to_string()) - } - - // ========== Completion Action Operations ========== - - /// Push task branch from worktree to an external target repository. - /// - /// This stages and commits any uncommitted changes, then pushes to the target repo. - pub async fn push_to_target_repo( - &self, - worktree_path: &Path, - target_repo: &Path, - branch_name: &str, - task_name: &str, - ) -> Result<(), WorktreeError> { - tracing::info!( - worktree = %worktree_path.display(), - target_repo = %target_repo.display(), - branch = %branch_name, - "Pushing branch to target repository" - ); - - // First, stage all changes (including new files) - let output = Command::new("git") - .args(["add", "-A"]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to stage changes: {}", - stderr - ))); - } - - // Check if there are staged changes to commit - let output = Command::new("git") - .args(["diff", "--cached", "--quiet"]) - .current_dir(worktree_path) - .output() - .await?; - - // Exit code 1 means there are staged changes - if !output.status.success() { - tracing::info!("Committing staged changes before push"); - - let commit_message = format!("feat: {}", task_name); - let output = Command::new("git") - .args(["commit", "-m", &commit_message]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to commit changes: {}", - stderr - ))); - } - } - - // Ensure there are commits to push - let output = Command::new("git") - .args(["log", "--oneline", "-1"]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - return Err(WorktreeError::GitCommand( - "No commits in worktree".to_string(), - )); - } - - // Add target repo as a remote in the worktree (if not already) - let remote_name = "target"; - let target_path_str = target_repo.to_string_lossy(); - - // Remove existing remote if any (ignore errors) - let _ = Command::new("git") - .args(["remote", "remove", remote_name]) - .current_dir(worktree_path) - .output() - .await; - - // Add the target as a remote - let output = Command::new("git") - .args(["remote", "add", remote_name, &target_path_str]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to add remote: {}", - stderr - ))); - } - - // Push the branch to the target - let output = Command::new("git") - .args(["push", "-u", remote_name, &format!("HEAD:{}", branch_name)]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to push to target: {}", - stderr - ))); - } - - tracing::info!( - branch = %branch_name, - target_repo = %target_repo.display(), - "Branch pushed successfully" - ); - - // Detach HEAD in the worktree to release the branch - // This allows the branch to be checked out in the target repo - let output = Command::new("git") - .args(["checkout", "--detach", "HEAD"]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - // Non-fatal: log but don't fail the push - let stderr = String::from_utf8_lossy(&output.stderr); - tracing::warn!( - "Failed to detach HEAD in worktree (branch may not be checkable in target): {}", - stderr - ); - } else { - tracing::info!("Detached HEAD in worktree to release branch"); - } - - Ok(()) - } - - /// Merge a branch into the target branch in the target repository. - /// - /// This pushes the branch first (if needed), then performs a merge in the target repo. - pub async fn merge_to_target( - &self, - worktree_path: &Path, - target_repo: &Path, - source_branch: &str, - target_branch: &str, - task_name: &str, - ) -> Result<String, WorktreeError> { - tracing::info!( - worktree = %worktree_path.display(), - target_repo = %target_repo.display(), - source_branch = %source_branch, - target_branch = %target_branch, - "Merging branch to target" - ); - - // First, push the branch to target repo - self.push_to_target_repo(worktree_path, target_repo, source_branch, task_name) - .await?; - - // In target repo, checkout the target branch - let output = Command::new("git") - .args(["checkout", target_branch]) - .current_dir(target_repo) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to checkout target branch: {}", - stderr - ))); - } - - // Pull latest changes first - let _ = Command::new("git") - .args(["pull", "--ff-only"]) - .current_dir(target_repo) - .output() - .await; - - // Merge the source branch - let merge_message = format!("feat: {}", task_name); - let output = Command::new("git") - .args(["merge", "--no-ff", source_branch, "-m", &merge_message]) - .current_dir(target_repo) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - - // Check if it's a conflict - let conflicts = self.get_conflicted_files(target_repo).await?; - if !conflicts.is_empty() { - // Abort the merge - let _ = Command::new("git") - .args(["merge", "--abort"]) - .current_dir(target_repo) - .output() - .await; - - return Err(WorktreeError::MergeConflicts(format!( - "Merge conflicts in: {}. Consider creating a PR instead.", - conflicts.join(", ") - ))); - } - - return Err(WorktreeError::GitCommand(format!( - "Failed to merge: {}", - stderr - ))); - } - - // Get the merge commit SHA - let output = Command::new("git") - .args(["rev-parse", "HEAD"]) - .current_dir(target_repo) - .output() - .await?; - - let commit_sha = if output.status.success() { - String::from_utf8_lossy(&output.stdout).trim().to_string() - } else { - "unknown".to_string() - }; - - tracing::info!( - commit_sha = %commit_sha, - "Merge completed successfully" - ); - - Ok(commit_sha) - } - - /// Create a GitHub pull request using the gh CLI. - /// - /// This pushes the branch first, then creates a PR. - pub async fn create_pull_request( - &self, - worktree_path: &Path, - target_repo: &Path, - source_branch: &str, - target_branch: &str, - title: &str, - body: &str, - ) -> Result<String, WorktreeError> { - tracing::info!( - worktree = %worktree_path.display(), - target_repo = %target_repo.display(), - source_branch = %source_branch, - target_branch = %target_branch, - title = %title, - "Creating pull request" - ); - - // First, push the branch to the target repo's remote - // For PRs, we need to push to origin (the GitHub remote) - - // Get the worktree's current branch - let output = Command::new("git") - .args(["rev-parse", "--abbrev-ref", "HEAD"]) - .current_dir(worktree_path) - .output() - .await?; - - let current_branch = if output.status.success() { - String::from_utf8_lossy(&output.stdout).trim().to_string() - } else { - source_branch.to_string() - }; - - // Push to the target repo's origin - // First, check if target_repo has an origin remote - let output = Command::new("git") - .args(["remote", "get-url", "origin"]) - .current_dir(target_repo) - .output() - .await?; - - if !output.status.success() { - return Err(WorktreeError::GitCommand( - "Target repository has no origin remote configured".to_string(), - )); - } - - let origin_url = String::from_utf8_lossy(&output.stdout).trim().to_string(); - - // Push the branch from worktree to the remote - // First add the remote to worktree - let _ = Command::new("git") - .args(["remote", "remove", "pr-origin"]) - .current_dir(worktree_path) - .output() - .await; - - let output = Command::new("git") - .args(["remote", "add", "pr-origin", &origin_url]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to add remote: {}", - stderr - ))); - } - - // Push to the remote - let output = Command::new("git") - .args(["push", "-u", "pr-origin", &format!("{}:{}", current_branch, source_branch)]) - .current_dir(worktree_path) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to push branch: {}", - stderr - ))); - } - - // Create PR using gh CLI in the target repo - let output = Command::new("gh") - .args([ - "pr", - "create", - "--title", title, - "--body", body, - "--head", source_branch, - "--base", target_branch, - ]) - .current_dir(target_repo) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::GitCommand(format!( - "Failed to create PR: {}", - stderr - ))); - } - - // The gh CLI outputs the PR URL - let pr_url = String::from_utf8_lossy(&output.stdout).trim().to_string(); - - tracing::info!( - pr_url = %pr_url, - "Pull request created successfully" - ); - - Ok(pr_url) - } - - /// Clone/copy the worktree contents to a target directory. - /// - /// This creates a new git repository at the target path with the same contents - /// as the worktree. Returns (success, message). - pub async fn clone_worktree_to_directory( - &self, - worktree_path: &Path, - target_dir: &Path, - ) -> Result<String, WorktreeError> { - tracing::info!( - worktree = %worktree_path.display(), - target = %target_dir.display(), - "Cloning worktree to target directory" - ); - - // Check if target directory already exists - if target_dir.exists() { - return Err(WorktreeError::AlreadyExists(format!( - "Target directory already exists: {}", - target_dir.display() - ))); - } - - // Get parent directory to ensure it exists - if let Some(parent) = target_dir.parent() { - if !parent.exists() { - tokio::fs::create_dir_all(parent).await?; - } - } - - // Use git clone --local to efficiently copy the repository - // This is more efficient than cp -r for git repos - let output = Command::new("git") - .args([ - "clone", - "--local", - "--no-hardlinks", - &worktree_path.to_string_lossy(), - &target_dir.to_string_lossy(), - ]) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WorktreeError::CloneFailed(format!( - "Failed to clone worktree: {}", - stderr - ))); - } - - // Remove the 'origin' remote that points back to the worktree - let _ = Command::new("git") - .args(["remote", "remove", "origin"]) - .current_dir(target_dir) - .output() - .await; - - tracing::info!( - target = %target_dir.display(), - "Worktree cloned successfully" - ); - - Ok(format!("Cloned to {}", target_dir.display())) - } - - /// Check if a target directory exists. - pub async fn target_directory_exists(&self, target_dir: &Path) -> bool { - target_dir.exists() - } -} - -/// Check if repo_source is a "new repo" request. -/// -/// Accepts `new://` or `new://project-name` to create a fresh git repository. -pub fn is_new_repo_request(source: &str) -> bool { - source == "new" || source == "new://" || source.starts_with("new://") -} - -/// Extract optional project name from new:// URL. -fn extract_new_repo_name(source: &str) -> Option<&str> { - source.strip_prefix("new://").filter(|s| !s.is_empty()) -} - -/// Extract repository name from URL. -fn extract_repo_name(url: &str) -> String { - // Handle various URL formats: - // https://github.com/user/repo.git -> repo - // git@github.com:user/repo.git -> repo - // https://github.com/user/repo -> repo - - let url = url.trim_end_matches('/'); - let url = url.trim_end_matches(".git"); - - url.rsplit('/') - .next() - .or_else(|| url.rsplit(':').next()) - .unwrap_or("repo") - .to_string() -} - -/// Create a short UUID string for directory naming. -pub fn short_uuid(id: Uuid) -> String { - id.to_string()[..8].to_string() -} - -/// Expand tilde (~) in path to home directory. -pub fn expand_tilde(path: &str) -> PathBuf { - if let Some(rest) = path.strip_prefix("~/") { - if let Some(home) = dirs::home_dir() { - return home.join(rest); - } - } else if path == "~" { - if let Some(home) = dirs::home_dir() { - return home; - } - } - PathBuf::from(path) -} - -/// Sanitize a name for use in directory/branch names. -pub fn sanitize_name(name: &str) -> String { - name.chars() - .map(|c| { - if c.is_alphanumeric() || c == '-' || c == '_' { - c.to_ascii_lowercase() - } else { - '-' - } - }) - .collect::<String>() - .chars() - .take(50) // Limit length - .collect() -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_extract_repo_name() { - assert_eq!( - extract_repo_name("https://github.com/user/repo.git"), - "repo" - ); - assert_eq!( - extract_repo_name("https://github.com/user/repo"), - "repo" - ); - assert_eq!( - extract_repo_name("git@github.com:user/repo.git"), - "repo" - ); - } - - #[test] - fn test_sanitize_name() { - assert_eq!(sanitize_name("Hello World!"), "hello-world-"); - assert_eq!(sanitize_name("test_name-123"), "test_name-123"); - assert_eq!(sanitize_name("A".repeat(100).as_str()).len(), 50); - } - - #[test] - fn test_short_uuid() { - let id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap(); - assert_eq!(short_uuid(id), "550e8400"); - } -} diff --git a/makima/daemon/src/worktree/mod.rs b/makima/daemon/src/worktree/mod.rs deleted file mode 100644 index eb9f031..0000000 --- a/makima/daemon/src/worktree/mod.rs +++ /dev/null @@ -1,11 +0,0 @@ -//! Git worktree management for task isolation. -//! -//! Each task gets a unique git worktree with its own branch, -//! providing isolation without the overhead of Docker containers. - -mod manager; - -pub use manager::{ - expand_tilde, is_new_repo_request, sanitize_name, short_uuid, ConflictResolution, MergeState, - TaskBranchInfo, WorktreeError, WorktreeInfo, WorktreeManager, -}; diff --git a/makima/daemon/src/ws/client.rs b/makima/daemon/src/ws/client.rs deleted file mode 100644 index ba1263f..0000000 --- a/makima/daemon/src/ws/client.rs +++ /dev/null @@ -1,290 +0,0 @@ -//! WebSocket client for connecting to the makima server. - -use std::sync::Arc; -use std::time::Duration; - -use backoff::backoff::Backoff; -use backoff::ExponentialBackoff; -use futures::{SinkExt, StreamExt}; -use tokio::sync::{mpsc, RwLock}; -use tokio_tungstenite::{connect_async, tungstenite::{client::IntoClientRequest, Message}}; -use uuid::Uuid; - -use super::protocol::{DaemonCommand, DaemonMessage}; -use crate::config::ServerConfig; -use crate::error::{DaemonError, Result}; - -/// WebSocket client state. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ConnectionState { - /// Not connected to server. - Disconnected, - /// Currently connecting. - Connecting, - /// Connected and authenticated. - Connected, - /// Connection failed, will retry. - Reconnecting, - /// Permanently failed (e.g., auth failure). - Failed, -} - -/// WebSocket client for daemon-server communication. -pub struct WsClient { - config: ServerConfig, - machine_id: String, - hostname: String, - max_concurrent_tasks: i32, - state: Arc<RwLock<ConnectionState>>, - daemon_id: Arc<RwLock<Option<Uuid>>>, - /// Channel to receive messages to send to server. - outgoing_rx: mpsc::Receiver<DaemonMessage>, - /// Sender for outgoing messages (clone this to send messages). - outgoing_tx: mpsc::Sender<DaemonMessage>, - /// Channel to send received commands to the task manager. - incoming_tx: mpsc::Sender<DaemonCommand>, -} - -impl WsClient { - /// Create a new WebSocket client. - pub fn new( - config: ServerConfig, - machine_id: String, - hostname: String, - max_concurrent_tasks: i32, - incoming_tx: mpsc::Sender<DaemonCommand>, - ) -> Self { - let (outgoing_tx, outgoing_rx) = mpsc::channel(256); - - Self { - config, - machine_id, - hostname, - max_concurrent_tasks, - state: Arc::new(RwLock::new(ConnectionState::Disconnected)), - daemon_id: Arc::new(RwLock::new(None)), - outgoing_rx, - outgoing_tx, - incoming_tx, - } - } - - /// Get a sender for outgoing messages. - pub fn sender(&self) -> mpsc::Sender<DaemonMessage> { - self.outgoing_tx.clone() - } - - /// Get current connection state. - pub async fn state(&self) -> ConnectionState { - *self.state.read().await - } - - /// Get daemon ID if authenticated. - pub async fn daemon_id(&self) -> Option<Uuid> { - *self.daemon_id.read().await - } - - /// Run the WebSocket client with automatic reconnection. - pub async fn run(&mut self) -> Result<()> { - let mut backoff = ExponentialBackoff { - initial_interval: Duration::from_secs(self.config.reconnect_interval_secs), - max_interval: Duration::from_secs(60), - max_elapsed_time: if self.config.max_reconnect_attempts > 0 { - Some(Duration::from_secs( - self.config.reconnect_interval_secs * self.config.max_reconnect_attempts as u64 * 10, - )) - } else { - None // Infinite retries - }, - ..Default::default() - }; - - loop { - *self.state.write().await = ConnectionState::Connecting; - tracing::info!("Connecting to server: {}", self.config.url); - - match self.connect_and_run().await { - Ok(()) => { - // Clean shutdown - tracing::info!("WebSocket connection closed cleanly"); - break; - } - Err(DaemonError::AuthFailed(msg)) => { - tracing::error!("Authentication failed: {}", msg); - *self.state.write().await = ConnectionState::Failed; - return Err(DaemonError::AuthFailed(msg)); - } - Err(e) => { - tracing::warn!("Connection error: {}", e); - *self.state.write().await = ConnectionState::Reconnecting; - - if let Some(delay) = backoff.next_backoff() { - tracing::info!("Reconnecting in {:?}...", delay); - tokio::time::sleep(delay).await; - } else { - tracing::error!("Max reconnection attempts reached"); - *self.state.write().await = ConnectionState::Failed; - return Err(DaemonError::ConnectionLost); - } - } - } - } - - Ok(()) - } - - /// Connect to server and run the message loop. - async fn connect_and_run(&mut self) -> Result<()> { - // Build WebSocket URL - let ws_url = format!("{}/api/v1/mesh/daemons/connect", self.config.url); - tracing::debug!("Connecting to WebSocket: {}", ws_url); - - // Build request with API key header - let mut request = ws_url.into_client_request()?; - request.headers_mut().insert( - "x-makima-api-key", - self.config.api_key.parse().map_err(|_| { - DaemonError::AuthFailed("Invalid API key format".into()) - })?, - ); - - // Connect with API key in headers - let (ws_stream, _response) = connect_async(request).await?; - let (mut write, mut read) = ws_stream.split(); - - // Send daemon info after connection (server authenticated us via header) - let info_msg = DaemonMessage::authenticate( - &self.config.api_key, - &self.machine_id, - &self.hostname, - self.max_concurrent_tasks, - ); - let info_json = serde_json::to_string(&info_msg)?; - write.send(Message::Text(info_json)).await?; - - // Wait for authentication response - let auth_response = read - .next() - .await - .ok_or(DaemonError::ConnectionLost)??; - - let auth_text = match auth_response { - Message::Text(text) => text, - Message::Close(_) => return Err(DaemonError::ConnectionLost), - _ => return Err(DaemonError::AuthFailed("Unexpected response type".into())), - }; - - let command: DaemonCommand = serde_json::from_str(&auth_text)?; - match command { - DaemonCommand::Authenticated { daemon_id } => { - tracing::info!("Authenticated with daemon ID: {}", daemon_id); - *self.daemon_id.write().await = Some(daemon_id); - *self.state.write().await = ConnectionState::Connected; - - // Send daemon directories info to server - let working_directory = std::env::current_dir() - .map(|p| p.to_string_lossy().to_string()) - .unwrap_or_else(|_| ".".to_string()); - let home_directory = dirs::home_dir() - .map(|h| h.join(".makima").join("home")) - .unwrap_or_else(|| std::path::PathBuf::from("~/.makima/home")); - // Create home directory if it doesn't exist - if let Err(e) = std::fs::create_dir_all(&home_directory) { - tracing::warn!("Failed to create home directory {:?}: {}", home_directory, e); - } - let home_directory_str = home_directory.to_string_lossy().to_string(); - let worktrees_directory = dirs::home_dir() - .map(|h| h.join(".makima").join("worktrees").to_string_lossy().to_string()) - .unwrap_or_else(|| "~/.makima/worktrees".to_string()); - - let dirs_msg = DaemonMessage::DaemonDirectories { - working_directory, - home_directory: home_directory_str, - worktrees_directory, - }; - let dirs_json = serde_json::to_string(&dirs_msg)?; - write.send(Message::Text(dirs_json)).await?; - tracing::info!("Sent daemon directories info to server"); - } - DaemonCommand::Error { code, message } => { - return Err(DaemonError::AuthFailed(format!("{}: {}", code, message))); - } - _ => { - return Err(DaemonError::AuthFailed( - "Unexpected response to authentication".into(), - )); - } - } - - // Start main message loop - let heartbeat_interval = Duration::from_secs(self.config.heartbeat_interval_secs); - let mut heartbeat_timer = tokio::time::interval(heartbeat_interval); - - loop { - tokio::select! { - // Handle incoming server commands - msg = read.next() => { - match msg { - Some(Ok(Message::Text(text))) => { - tracing::info!("Received WebSocket message: {} bytes", text.len()); - match serde_json::from_str::<DaemonCommand>(&text) { - Ok(command) => { - tracing::info!("Parsed command: {:?}", command); - tracing::info!("Sending command to task manager channel..."); - if self.incoming_tx.send(command).await.is_err() { - tracing::warn!("Command receiver dropped, shutting down"); - break; - } - tracing::info!("Command sent to task manager successfully"); - } - Err(e) => { - tracing::warn!("Failed to parse server message: {}", e); - tracing::debug!("Raw message: {}", text); - } - } - } - Some(Ok(Message::Ping(data))) => { - write.send(Message::Pong(data)).await?; - } - Some(Ok(Message::Close(_))) | None => { - tracing::info!("Server closed connection"); - return Err(DaemonError::ConnectionLost); - } - Some(Err(e)) => { - tracing::warn!("WebSocket error: {}", e); - return Err(e.into()); - } - _ => {} - } - } - - // Handle outgoing messages - msg = self.outgoing_rx.recv() => { - match msg { - Some(message) => { - let json = serde_json::to_string(&message)?; - tracing::trace!("Sending message: {}", json); - write.send(Message::Text(json)).await?; - } - None => { - // Sender dropped, shutdown - tracing::info!("Outgoing channel closed, shutting down"); - break; - } - } - } - - // Send heartbeat - _ = heartbeat_timer.tick() => { - // Get active task IDs from task manager - // For now, send empty list - will be connected to task manager - let heartbeat = DaemonMessage::heartbeat(vec![]); - let json = serde_json::to_string(&heartbeat)?; - write.send(Message::Text(json)).await?; - } - } - } - - Ok(()) - } -} diff --git a/makima/daemon/src/ws/mod.rs b/makima/daemon/src/ws/mod.rs deleted file mode 100644 index 5a0e9d1..0000000 --- a/makima/daemon/src/ws/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -//! WebSocket client and protocol types for daemon-server communication. - -pub mod client; -pub mod protocol; - -pub use client::{ConnectionState, WsClient}; -pub use protocol::{BranchInfo, DaemonCommand, DaemonMessage}; diff --git a/makima/daemon/src/ws/protocol.rs b/makima/daemon/src/ws/protocol.rs deleted file mode 100644 index 7c2ad6d..0000000 --- a/makima/daemon/src/ws/protocol.rs +++ /dev/null @@ -1,511 +0,0 @@ -//! Protocol types for daemon-server communication. -//! -//! These types mirror the server's protocol exactly for compatibility. - -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -/// Message from daemon to server. -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", rename_all = "camelCase")] -pub enum DaemonMessage { - /// Authentication request (first message required). - Authenticate { - #[serde(rename = "apiKey")] - api_key: String, - #[serde(rename = "machineId")] - machine_id: String, - hostname: String, - #[serde(rename = "maxConcurrentTasks")] - max_concurrent_tasks: i32, - }, - - /// Periodic heartbeat with current status. - Heartbeat { - #[serde(rename = "activeTasks")] - active_tasks: Vec<Uuid>, - }, - - /// Task output streaming (stdout/stderr from Claude Code). - TaskOutput { - #[serde(rename = "taskId")] - task_id: Uuid, - output: String, - #[serde(rename = "isPartial")] - is_partial: bool, - }, - - /// Task status change notification. - TaskStatusChange { - #[serde(rename = "taskId")] - task_id: Uuid, - #[serde(rename = "oldStatus")] - old_status: String, - #[serde(rename = "newStatus")] - new_status: String, - }, - - /// Task progress update with summary. - TaskProgress { - #[serde(rename = "taskId")] - task_id: Uuid, - summary: String, - }, - - /// Task completion notification. - TaskComplete { - #[serde(rename = "taskId")] - task_id: Uuid, - success: bool, - error: Option<String>, - }, - - /// Register a tool key for orchestrator API access. - RegisterToolKey { - #[serde(rename = "taskId")] - task_id: Uuid, - /// The API key for this orchestrator to use when calling mesh endpoints. - key: String, - }, - - /// Revoke a tool key when task completes. - RevokeToolKey { - #[serde(rename = "taskId")] - task_id: Uuid, - }, - - // ========================================================================= - // Merge Response Messages (sent by daemon after processing merge commands) - // ========================================================================= - - /// Response to ListBranches command. - BranchList { - #[serde(rename = "taskId")] - task_id: Uuid, - branches: Vec<BranchInfo>, - }, - - /// Response to MergeStatus command. - MergeStatusResponse { - #[serde(rename = "taskId")] - task_id: Uuid, - #[serde(rename = "inProgress")] - in_progress: bool, - #[serde(rename = "sourceBranch")] - source_branch: Option<String>, - #[serde(rename = "conflictedFiles")] - conflicted_files: Vec<String>, - }, - - /// Response to merge operations (MergeStart, MergeResolve, MergeCommit, MergeAbort). - MergeResult { - #[serde(rename = "taskId")] - task_id: Uuid, - success: bool, - message: String, - #[serde(rename = "commitSha")] - commit_sha: Option<String>, - /// Present only when conflicts occurred. - conflicts: Option<Vec<String>>, - }, - - /// Response to CheckMergeComplete command. - MergeCompleteCheck { - #[serde(rename = "taskId")] - task_id: Uuid, - #[serde(rename = "canComplete")] - can_complete: bool, - #[serde(rename = "unmergedBranches")] - unmerged_branches: Vec<String>, - #[serde(rename = "mergedCount")] - merged_count: u32, - #[serde(rename = "skippedCount")] - skipped_count: u32, - }, - - // ========================================================================= - // Completion Action Response Messages - // ========================================================================= - - /// Response to RetryCompletionAction command. - CompletionActionResult { - #[serde(rename = "taskId")] - task_id: Uuid, - success: bool, - message: String, - /// PR URL if action was "pr" and successful. - #[serde(rename = "prUrl")] - pr_url: Option<String>, - }, - - /// Report daemon's available directories for task output. - DaemonDirectories { - /// Current working directory of the daemon. - #[serde(rename = "workingDirectory")] - working_directory: String, - /// Path to ~/.makima/home directory (for cloning completed work). - #[serde(rename = "homeDirectory")] - home_directory: String, - /// Path to worktrees directory (~/.makima/worktrees). - #[serde(rename = "worktreesDirectory")] - worktrees_directory: String, - }, - - /// Response to CloneWorktree command. - CloneWorktreeResult { - #[serde(rename = "taskId")] - task_id: Uuid, - success: bool, - message: String, - /// The path where the worktree was cloned. - #[serde(rename = "targetDir")] - target_dir: Option<String>, - }, - - /// Response to CheckTargetExists command. - CheckTargetExistsResult { - #[serde(rename = "taskId")] - task_id: Uuid, - /// Whether the target directory exists. - exists: bool, - /// The path that was checked. - #[serde(rename = "targetDir")] - target_dir: String, - }, -} - -/// Information about a branch (used in BranchList message). -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct BranchInfo { - /// Full branch name. - pub name: String, - /// Task ID extracted from branch name (if parseable). - #[serde(rename = "taskId")] - pub task_id: Option<Uuid>, - /// Whether this branch has been merged. - #[serde(rename = "isMerged")] - pub is_merged: bool, - /// Short SHA of the last commit. - #[serde(rename = "lastCommit")] - pub last_commit: String, - /// Subject line of the last commit. - #[serde(rename = "lastCommitMessage")] - pub last_commit_message: String, -} - -/// Command from server to daemon. -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", rename_all = "camelCase")] -pub enum DaemonCommand { - /// Confirm successful authentication. - Authenticated { - #[serde(rename = "daemonId")] - daemon_id: Uuid, - }, - - /// Spawn a new task in a container. - SpawnTask { - #[serde(rename = "taskId")] - task_id: Uuid, - /// Human-readable task name (used for commit messages). - #[serde(rename = "taskName")] - task_name: String, - plan: String, - #[serde(rename = "repoUrl")] - repo_url: Option<String>, - #[serde(rename = "baseBranch")] - base_branch: Option<String>, - /// Target branch to merge into (used for completion actions). - #[serde(rename = "targetBranch")] - target_branch: Option<String>, - /// Parent task ID if this is a subtask. - #[serde(rename = "parentTaskId")] - parent_task_id: Option<Uuid>, - /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask). - depth: i32, - /// Whether this task should run as an orchestrator (true if depth==0 and has subtasks). - #[serde(rename = "isOrchestrator")] - is_orchestrator: bool, - /// Path to user's local repository (outside ~/.makima) for completion actions. - #[serde(rename = "targetRepoPath")] - target_repo_path: Option<String>, - /// Action on completion: "none", "branch", "merge", "pr". - #[serde(rename = "completionAction")] - completion_action: Option<String>, - /// Task ID to continue from (copy worktree from this task). - #[serde(rename = "continueFromTaskId")] - continue_from_task_id: Option<Uuid>, - /// Files to copy from parent task's worktree. - #[serde(rename = "copyFiles")] - copy_files: Option<Vec<String>>, - }, - - /// Pause a running task. - PauseTask { - #[serde(rename = "taskId")] - task_id: Uuid, - }, - - /// Resume a paused task. - ResumeTask { - #[serde(rename = "taskId")] - task_id: Uuid, - }, - - /// Interrupt a task (gracefully or forced). - InterruptTask { - #[serde(rename = "taskId")] - task_id: Uuid, - graceful: bool, - }, - - /// Send a message to a running task. - SendMessage { - #[serde(rename = "taskId")] - task_id: Uuid, - message: String, - }, - - /// Inject context about sibling task progress. - InjectSiblingContext { - #[serde(rename = "taskId")] - task_id: Uuid, - #[serde(rename = "siblingTaskId")] - sibling_task_id: Uuid, - #[serde(rename = "siblingName")] - sibling_name: String, - #[serde(rename = "siblingStatus")] - sibling_status: String, - #[serde(rename = "progressSummary")] - progress_summary: Option<String>, - #[serde(rename = "changedFiles")] - changed_files: Vec<String>, - }, - - // ========================================================================= - // Merge Commands (for orchestrators to merge subtask branches) - // ========================================================================= - - /// List all subtask branches for a task. - ListBranches { - #[serde(rename = "taskId")] - task_id: Uuid, - }, - - /// Start merging a subtask branch. - MergeStart { - #[serde(rename = "taskId")] - task_id: Uuid, - #[serde(rename = "sourceBranch")] - source_branch: String, - }, - - /// Get current merge status. - MergeStatus { - #[serde(rename = "taskId")] - task_id: Uuid, - }, - - /// Resolve a merge conflict. - MergeResolve { - #[serde(rename = "taskId")] - task_id: Uuid, - file: String, - /// "ours" or "theirs" - strategy: String, - }, - - /// Commit the current merge. - MergeCommit { - #[serde(rename = "taskId")] - task_id: Uuid, - message: String, - }, - - /// Abort the current merge. - MergeAbort { - #[serde(rename = "taskId")] - task_id: Uuid, - }, - - /// Skip merging a subtask branch (mark as intentionally not merged). - MergeSkip { - #[serde(rename = "taskId")] - task_id: Uuid, - #[serde(rename = "subtaskId")] - subtask_id: Uuid, - reason: String, - }, - - /// Check if all subtask branches have been merged or skipped (completion gate). - CheckMergeComplete { - #[serde(rename = "taskId")] - task_id: Uuid, - }, - - // ========================================================================= - // Completion Action Commands - // ========================================================================= - - /// Retry a completion action for a completed task. - RetryCompletionAction { - #[serde(rename = "taskId")] - task_id: Uuid, - /// Human-readable task name (used for commit messages). - #[serde(rename = "taskName")] - task_name: String, - /// The action to execute: "branch", "merge", or "pr". - action: String, - /// Path to the target repository. - #[serde(rename = "targetRepoPath")] - target_repo_path: String, - /// Target branch to merge into (for merge/pr actions). - #[serde(rename = "targetBranch")] - target_branch: Option<String>, - }, - - /// Clone worktree to a target directory. - CloneWorktree { - #[serde(rename = "taskId")] - task_id: Uuid, - /// Path to the target directory. - #[serde(rename = "targetDir")] - target_dir: String, - }, - - /// Check if a target directory exists. - CheckTargetExists { - #[serde(rename = "taskId")] - task_id: Uuid, - /// Path to check. - #[serde(rename = "targetDir")] - target_dir: String, - }, - - /// Error response. - Error { - code: String, - message: String, - }, -} - -impl DaemonMessage { - /// Create an authentication message. - pub fn authenticate( - api_key: &str, - machine_id: &str, - hostname: &str, - max_concurrent_tasks: i32, - ) -> Self { - Self::Authenticate { - api_key: api_key.to_string(), - machine_id: machine_id.to_string(), - hostname: hostname.to_string(), - max_concurrent_tasks, - } - } - - /// Create a heartbeat message. - pub fn heartbeat(active_tasks: Vec<Uuid>) -> Self { - Self::Heartbeat { active_tasks } - } - - /// Create a task output message. - pub fn task_output(task_id: Uuid, output: String, is_partial: bool) -> Self { - Self::TaskOutput { - task_id, - output, - is_partial, - } - } - - /// Create a task status change message. - pub fn task_status_change(task_id: Uuid, old_status: &str, new_status: &str) -> Self { - Self::TaskStatusChange { - task_id, - old_status: old_status.to_string(), - new_status: new_status.to_string(), - } - } - - /// Create a task progress message. - pub fn task_progress(task_id: Uuid, summary: String) -> Self { - Self::TaskProgress { task_id, summary } - } - - /// Create a task complete message. - pub fn task_complete(task_id: Uuid, success: bool, error: Option<String>) -> Self { - Self::TaskComplete { - task_id, - success, - error, - } - } - - /// Create a register tool key message. - pub fn register_tool_key(task_id: Uuid, key: String) -> Self { - Self::RegisterToolKey { task_id, key } - } - - /// Create a revoke tool key message. - pub fn revoke_tool_key(task_id: Uuid) -> Self { - Self::RevokeToolKey { task_id } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_daemon_message_serialization() { - let msg = DaemonMessage::authenticate("key123", "machine-abc", "worker-1", 4); - let json = serde_json::to_string(&msg).unwrap(); - assert!(json.contains("\"type\":\"authenticate\"")); - assert!(json.contains("\"apiKey\":\"key123\"")); - assert!(json.contains("\"machineId\":\"machine-abc\"")); - } - - #[test] - fn test_daemon_command_deserialization() { - let json = r#"{"type":"spawnTask","taskId":"550e8400-e29b-41d4-a716-446655440000","plan":"Build the feature","repoUrl":"https://github.com/test/repo","baseBranch":"main","parentTaskId":null,"depth":0,"isOrchestrator":false}"#; - let cmd: DaemonCommand = serde_json::from_str(json).unwrap(); - match cmd { - DaemonCommand::SpawnTask { - plan, - repo_url, - base_branch, - parent_task_id, - depth, - is_orchestrator, - .. - } => { - assert_eq!(plan, "Build the feature"); - assert_eq!(repo_url, Some("https://github.com/test/repo".to_string())); - assert_eq!(base_branch, Some("main".to_string())); - assert_eq!(parent_task_id, None); - assert_eq!(depth, 0); - assert!(!is_orchestrator); - } - _ => panic!("Expected SpawnTask"), - } - } - - #[test] - fn test_orchestrator_spawn_deserialization() { - let json = r#"{"type":"spawnTask","taskId":"550e8400-e29b-41d4-a716-446655440000","plan":"Coordinate subtasks","repoUrl":"https://github.com/test/repo","baseBranch":"main","parentTaskId":null,"depth":0,"isOrchestrator":true}"#; - let cmd: DaemonCommand = serde_json::from_str(json).unwrap(); - match cmd { - DaemonCommand::SpawnTask { - is_orchestrator, - depth, - .. - } => { - assert!(is_orchestrator); - assert_eq!(depth, 0); - } - _ => panic!("Expected SpawnTask"), - } - } -} |
