summaryrefslogtreecommitdiff
path: root/makima/daemon
diff options
context:
space:
mode:
Diffstat (limited to 'makima/daemon')
-rw-r--r--makima/daemon/Cargo.toml48
-rw-r--r--makima/daemon/README.md353
-rw-r--r--makima/daemon/src/cli.rs45
-rw-r--r--makima/daemon/src/config.rs550
-rw-r--r--makima/daemon/src/db/local.rs391
-rw-r--r--makima/daemon/src/db/mod.rs5
-rw-r--r--makima/daemon/src/error.rs75
-rw-r--r--makima/daemon/src/lib.rs21
-rw-r--r--makima/daemon/src/main.rs313
-rw-r--r--makima/daemon/src/process/claude.rs481
-rw-r--r--makima/daemon/src/process/claude_protocol.rs59
-rw-r--r--makima/daemon/src/process/mod.rs10
-rw-r--r--makima/daemon/src/task/manager.rs2248
-rw-r--r--makima/daemon/src/task/mod.rs7
-rw-r--r--makima/daemon/src/task/state.rs161
-rw-r--r--makima/daemon/src/temp.rs224
-rw-r--r--makima/daemon/src/worktree/manager.rs1623
-rw-r--r--makima/daemon/src/worktree/mod.rs11
-rw-r--r--makima/daemon/src/ws/client.rs290
-rw-r--r--makima/daemon/src/ws/mod.rs7
-rw-r--r--makima/daemon/src/ws/protocol.rs511
21 files changed, 0 insertions, 7433 deletions
diff --git a/makima/daemon/Cargo.toml b/makima/daemon/Cargo.toml
deleted file mode 100644
index 02ecbb3..0000000
--- a/makima/daemon/Cargo.toml
+++ /dev/null
@@ -1,48 +0,0 @@
-[package]
-name = "makima-daemon"
-version = "0.1.0"
-edition = "2024"
-
-[[bin]]
-name = "makima-daemon"
-path = "src/main.rs"
-
-[dependencies]
-# Async runtime
-tokio = { version = "1.0", features = ["full", "signal", "process"] }
-futures = "0.3"
-async-trait = "0.1"
-
-# WebSocket client
-tokio-tungstenite = { version = "0.24", features = ["native-tls"] }
-
-# Serialization
-serde = { version = "1.0", features = ["derive"] }
-serde_json = "1.0"
-
-# Configuration
-config = "0.14"
-clap = { version = "4.4", features = ["derive", "env"] }
-
-# Database (local state)
-rusqlite = { version = "0.32", features = ["bundled"] }
-
-# Error handling
-thiserror = "2.0"
-anyhow = "1.0"
-
-# Logging
-tracing = "0.1"
-tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
-
-# Utilities
-uuid = { version = "1.0", features = ["v4", "serde"] }
-chrono = { version = "0.4", features = ["serde"] }
-dashmap = "6.0"
-backoff = { version = "0.4", features = ["tokio"] }
-hostname = "0.4"
-sha2 = "0.10"
-hex = "0.4"
-shell-escape = "0.1"
-dirs = "5.0"
-rand = "0.9"
diff --git a/makima/daemon/README.md b/makima/daemon/README.md
deleted file mode 100644
index 7c577c5..0000000
--- a/makima/daemon/README.md
+++ /dev/null
@@ -1,353 +0,0 @@
-# Makima Daemon
-
-The Makima daemon connects to the Makima server and executes tasks using Claude Code in isolated git worktrees.
-
-## Installation
-
-```bash
-cd makima/daemon
-cargo build --release
-```
-
-The binary will be at `target/release/makima-daemon`.
-
-## Quick Start
-
-```bash
-# Set required environment variables
-export MAKIMA_API_KEY="your-api-key"
-export MAKIMA_DAEMON_SERVER_URL="ws://localhost:8080"
-
-# Run the daemon
-makima-daemon
-```
-
-## Configuration
-
-Configuration is loaded from multiple sources in order of precedence (highest first):
-
-1. CLI arguments
-2. Environment variables
-3. `./makima-daemon.toml` (current directory)
-4. `~/.config/makima-daemon/config.toml` (user config)
-5. `/etc/makima-daemon/config.toml` (system config, Linux only)
-
-### Environment Variables
-
-| Variable | Description |
-|----------|-------------|
-| `MAKIMA_API_KEY` | API key for authentication (preferred) |
-| `MAKIMA_DAEMON_SERVER_URL` | WebSocket URL of the Makima server |
-| `MAKIMA_DAEMON_SERVER_APIKEY` | Alternative to MAKIMA_API_KEY |
-| `MAKIMA_DAEMON_PROCESS_MAXCONCURRENTTASKS` | Max concurrent tasks |
-
-### CLI Arguments
-
-```
-makima-daemon [OPTIONS]
-
-Options:
- -c, --config <PATH> Path to config file
- -s, --server-url <URL> WebSocket URL of makima server
- -k, --api-key <KEY> API key for authentication
- -m, --max-tasks <N> Maximum concurrent tasks
- -l, --log-level <LEVEL> Log level (trace, debug, info, warn, error)
- --repos-dir <PATH> Directory for cloned repositories
- --worktrees-dir <PATH> Directory for worktrees
- -h, --help Print help
- -V, --version Print version
-```
-
----
-
-## Configuration File Reference
-
-Below is a complete configuration file with all options and their defaults.
-
-```toml
-# =============================================================================
-# Server Connection
-# =============================================================================
-[server]
-# WebSocket URL of the Makima server (required)
-url = "ws://localhost:8080"
-
-# API key for authentication (required)
-# Can also be set via MAKIMA_API_KEY environment variable
-api_key = "your-api-key"
-
-# Heartbeat interval in seconds (default: 30)
-heartbeat_interval_secs = 30
-
-# Reconnect interval after connection loss in seconds (default: 5)
-reconnect_interval_secs = 5
-
-# Maximum reconnect attempts before giving up (default: 0 = infinite)
-max_reconnect_attempts = 0
-
-# =============================================================================
-# Worktree Settings
-# =============================================================================
-[worktree]
-# Base directory for task worktrees (default: ~/.makima/worktrees)
-base_dir = "/home/user/.makima/worktrees"
-
-# Directory for cached repository clones (default: ~/.makima/repos)
-repos_dir = "/home/user/.makima/repos"
-
-# Branch prefix for task branches (default: "makima/task-")
-branch_prefix = "makima/task-"
-
-# Clean up old worktrees on daemon start (default: false)
-cleanup_on_start = false
-
-# Default target repository for pushing completed branches
-# Used when task.target_repo_path is not set
-default_target_repo = "/home/user/projects/my-repo"
-
-# =============================================================================
-# Process Settings (Claude Code)
-# =============================================================================
-[process]
-# Path or command for Claude Code CLI (default: "claude")
-claude_command = "claude"
-
-# Additional arguments to pass to Claude Code (after default arguments)
-# Default arguments are: --output-format=stream-json --input-format=stream-json
-# --verbose --dangerously-skip-permissions
-claude_args = ["--model", "opus"]
-
-# Arguments to pass before default arguments (for overriding defaults)
-claude_pre_args = []
-
-# Enable Claude's permission system (default: false)
-# When true, removes --dangerously-skip-permissions flag
-enable_permissions = false
-
-# Disable verbose output (default: false)
-# When true, removes --verbose flag
-disable_verbose = false
-
-# Maximum concurrent tasks (default: 4)
-max_concurrent_tasks = 4
-
-# Default timeout for tasks in seconds (default: 0 = no timeout)
-default_timeout_secs = 0
-
-# Additional environment variables to pass to Claude Code
-[process.env_vars]
-ANTHROPIC_API_KEY = "sk-ant-..."
-CUSTOM_VAR = "value"
-
-# =============================================================================
-# Repository Auto-Clone
-# =============================================================================
-[repos]
-# Directory to clone repositories into (default: ~/.makima/home)
-home_dir = "/home/user/.makima/home"
-
-# List of repositories to auto-clone on startup
-# Repositories that already exist are skipped
-
-# Simple format - just URLs
-auto_clone = [
- "https://github.com/user/repo1.git",
- "https://github.com/user/repo2.git",
-]
-
-# Shorthand format supported:
-# github:user/repo -> https://github.com/user/repo.git
-# gitlab:user/repo -> https://gitlab.com/user/repo.git
-auto_clone = [
- "github:anthropics/claude-code",
- "gitlab:company/project",
-]
-
-# Detailed format with options (use [[repos.auto_clone]] for each entry)
-[[repos.auto_clone]]
-url = "github:user/repo"
-name = "custom-directory-name" # Optional: override directory name
-branch = "develop" # Optional: checkout specific branch
-shallow = true # Optional: shallow clone (--depth 1)
-
-[[repos.auto_clone]]
-url = "https://github.com/org/large-repo.git"
-shallow = true # Faster clone for large repos
-
-# =============================================================================
-# Local Database
-# =============================================================================
-[local_db]
-# Path to local SQLite database (default: ~/.makima/daemon.db)
-path = "/home/user/.makima/daemon.db"
-
-# =============================================================================
-# Logging
-# =============================================================================
-[logging]
-# Log level: trace, debug, info, warn, error (default: "info")
-level = "info"
-
-# Log format: "pretty" or "json" (default: "pretty")
-format = "pretty"
-```
-
----
-
-## Examples
-
-### Minimal Configuration
-
-```toml
-[server]
-url = "ws://localhost:8080"
-api_key = "your-api-key"
-```
-
-### Production Configuration
-
-```toml
-[server]
-url = "wss://api.makima.example.com/daemon"
-api_key = "prod-api-key"
-heartbeat_interval_secs = 30
-reconnect_interval_secs = 10
-max_reconnect_attempts = 0
-
-[worktree]
-base_dir = "/var/lib/makima/worktrees"
-repos_dir = "/var/lib/makima/repos"
-cleanup_on_start = true
-
-[process]
-max_concurrent_tasks = 8
-default_timeout_secs = 3600 # 1 hour timeout
-
-[logging]
-level = "info"
-format = "json"
-```
-
-### Development with Custom Claude
-
-```toml
-[server]
-url = "ws://localhost:8080"
-api_key = "dev-key"
-
-[process]
-# Use a specific claude binary
-claude_command = "/usr/local/bin/claude-dev"
-
-# Add custom arguments
-claude_args = ["--model", "sonnet", "--max-turns", "50"]
-
-# Enable permission prompts for testing
-enable_permissions = true
-
-[process.env_vars]
-ANTHROPIC_API_KEY = "sk-ant-dev-..."
-DEBUG = "1"
-
-[logging]
-level = "debug"
-```
-
-### Auto-Clone Team Repositories
-
-```toml
-[server]
-url = "ws://localhost:8080"
-api_key = "team-key"
-
-[repos]
-home_dir = "/home/dev/.makima/projects"
-
-# Clone all team repos on startup
-[[repos.auto_clone]]
-url = "github:myteam/frontend"
-branch = "main"
-
-[[repos.auto_clone]]
-url = "github:myteam/backend"
-branch = "main"
-
-[[repos.auto_clone]]
-url = "github:myteam/shared-libs"
-shallow = true # Only need latest commit
-```
-
----
-
-## Directory Structure
-
-After running, the daemon creates the following directories:
-
-```
-~/.makima/
-├── daemon.db # Local state database
-├── worktrees/ # Task worktrees (temporary)
-│ └── task-abc123/ # Individual task worktree
-├── repos/ # Cached repository clones
-│ └── repo-name/ # Bare clone for worktree creation
-└── home/ # Auto-cloned repositories
- └── my-repo/ # Full repository clone
-```
-
----
-
-## Troubleshooting
-
-### Connection Issues
-
-```bash
-# Check server connectivity
-curl -I http://localhost:8080/health
-
-# Run with debug logging
-makima-daemon --log-level debug
-```
-
-### Claude Code Not Found
-
-```bash
-# Verify claude is installed and in PATH
-which claude
-claude --version
-
-# Or specify full path in config
-[process]
-claude_command = "/full/path/to/claude"
-```
-
-### Permission Errors
-
-If Claude Code requires permissions, either:
-
-1. Use `--dangerously-skip-permissions` (default behavior)
-2. Set `enable_permissions = true` and handle permission prompts
-3. Pre-configure Claude Code permissions in `~/.claude/`
-
-### Task Timeouts
-
-Set an appropriate timeout for long-running tasks:
-
-```toml
-[process]
-default_timeout_secs = 7200 # 2 hours
-```
-
----
-
-## Environment Variable Reference
-
-All configuration options can be set via environment variables using the pattern:
-`MAKIMA_DAEMON_<SECTION>_<KEY>`
-
-Examples:
-- `MAKIMA_DAEMON_SERVER_URL` -> `server.url`
-- `MAKIMA_DAEMON_PROCESS_MAXCONCURRENTTASKS` -> `process.max_concurrent_tasks`
-- `MAKIMA_DAEMON_LOGGING_LEVEL` -> `logging.level`
-
-Special case:
-- `MAKIMA_API_KEY` -> `server.api_key` (preferred method)
diff --git a/makima/daemon/src/cli.rs b/makima/daemon/src/cli.rs
deleted file mode 100644
index ca84017..0000000
--- a/makima/daemon/src/cli.rs
+++ /dev/null
@@ -1,45 +0,0 @@
-//! Command-line argument parsing for makima-daemon.
-
-use clap::Parser;
-use std::path::PathBuf;
-
-/// Makima daemon for managing Claude Code instances in isolated worktrees.
-#[derive(Parser, Debug)]
-#[command(name = "makima-daemon")]
-#[command(version, about, long_about = None)]
-pub struct Cli {
- /// Path to custom config file
- #[arg(short, long)]
- pub config: Option<PathBuf>,
-
- /// Directory where repositories are cloned
- #[arg(long, env = "MAKIMA_DAEMON_REPOS_DIR")]
- pub repos_dir: Option<PathBuf>,
-
- /// Directory where worktrees are created
- #[arg(long, env = "MAKIMA_DAEMON_WORKTREES_DIR")]
- pub worktrees_dir: Option<PathBuf>,
-
- /// WebSocket server URL to connect to
- #[arg(long, env = "MAKIMA_DAEMON_SERVER_URL")]
- pub server_url: Option<String>,
-
- /// API key for server authentication
- #[arg(long, env = "MAKIMA_DAEMON_SERVER_APIKEY")]
- pub api_key: Option<String>,
-
- /// Maximum number of concurrent tasks
- #[arg(long)]
- pub max_tasks: Option<u32>,
-
- /// Log level (trace, debug, info, warn, error)
- #[arg(short, long, default_value = "info")]
- pub log_level: String,
-}
-
-impl Cli {
- /// Parse command-line arguments
- pub fn parse_args() -> Self {
- Self::parse()
- }
-}
diff --git a/makima/daemon/src/config.rs b/makima/daemon/src/config.rs
deleted file mode 100644
index 94d1e8a..0000000
--- a/makima/daemon/src/config.rs
+++ /dev/null
@@ -1,550 +0,0 @@
-//! Configuration management for the makima daemon.
-
-use config::{Config, Environment, File};
-use serde::Deserialize;
-use std::collections::HashMap;
-use std::path::PathBuf;
-
-/// Root daemon configuration.
-#[derive(Debug, Clone, Deserialize)]
-pub struct DaemonConfig {
- /// Server connection settings.
- #[serde(default)]
- pub server: ServerConfig,
-
- /// Worktree settings.
- #[serde(default)]
- pub worktree: WorktreeConfig,
-
- /// Process settings.
- #[serde(default)]
- pub process: ProcessConfig,
-
- /// Local database settings.
- #[serde(default)]
- pub local_db: LocalDbConfig,
-
- /// Logging settings.
- #[serde(default)]
- pub logging: LoggingConfig,
-
- /// Repositories to auto-clone on startup.
- #[serde(default)]
- pub repos: ReposConfig,
-}
-
-/// Server connection configuration.
-#[derive(Debug, Clone, Deserialize)]
-#[serde(default)]
-pub struct ServerConfig {
- /// WebSocket URL of makima server (e.g., ws://localhost:8080 or wss://makima.example.com).
- /// Defaults to wss://api.makima.jp.
- #[serde(default = "default_server_url")]
- pub url: String,
-
- /// API key for authentication.
- #[serde(default, alias = "apikey")]
- pub api_key: String,
-
- /// Heartbeat interval in seconds.
- #[serde(default = "default_heartbeat_interval", alias = "heartbeatintervalsecs")]
- pub heartbeat_interval_secs: u64,
-
- /// Reconnect interval in seconds after connection loss.
- #[serde(default = "default_reconnect_interval", alias = "reconnectintervalsecs")]
- pub reconnect_interval_secs: u64,
-
- /// Maximum reconnect attempts before giving up (0 = infinite).
- #[serde(default, alias = "maxreconnectattempts")]
- pub max_reconnect_attempts: u32,
-}
-
-fn default_heartbeat_interval() -> u64 {
- 30
-}
-
-fn default_reconnect_interval() -> u64 {
- 5
-}
-
-fn default_server_url() -> String {
- "wss://api.makima.jp".to_string()
-}
-
-impl Default for ServerConfig {
- fn default() -> Self {
- Self {
- url: default_server_url(),
- api_key: String::new(),
- heartbeat_interval_secs: default_heartbeat_interval(),
- reconnect_interval_secs: default_reconnect_interval(),
- max_reconnect_attempts: 0,
- }
- }
-}
-
-/// Worktree configuration for task isolation.
-#[derive(Debug, Clone, Deserialize)]
-#[serde(default)]
-pub struct WorktreeConfig {
- /// Base directory for worktrees (~/.makima/worktrees).
- #[serde(default = "default_worktree_base_dir", alias = "basedir")]
- pub base_dir: PathBuf,
-
- /// Base directory for cloned repositories (~/.makima/repos).
- #[serde(default = "default_repos_base_dir", alias = "reposdir")]
- pub repos_dir: PathBuf,
-
- /// Branch prefix for task branches.
- #[serde(default = "default_branch_prefix", alias = "branchprefix")]
- pub branch_prefix: String,
-
- /// Clean up worktrees on daemon start.
- #[serde(default, alias = "cleanuponstart")]
- pub cleanup_on_start: bool,
-
- /// Default target repository path for pushing completed branches.
- /// Used when task.target_repo_path is not set.
- #[serde(default, alias = "defaulttargetrepo")]
- pub default_target_repo: Option<PathBuf>,
-}
-
-fn default_worktree_base_dir() -> PathBuf {
- dirs::home_dir()
- .unwrap_or_else(|| PathBuf::from("."))
- .join(".makima")
- .join("worktrees")
-}
-
-fn default_repos_base_dir() -> PathBuf {
- dirs::home_dir()
- .unwrap_or_else(|| PathBuf::from("."))
- .join(".makima")
- .join("repos")
-}
-
-fn default_branch_prefix() -> String {
- "makima/task-".to_string()
-}
-
-impl Default for WorktreeConfig {
- fn default() -> Self {
- Self {
- base_dir: default_worktree_base_dir(),
- repos_dir: default_repos_base_dir(),
- branch_prefix: default_branch_prefix(),
- cleanup_on_start: false,
- default_target_repo: None,
- }
- }
-}
-
-/// Process configuration for Claude Code subprocess execution.
-#[derive(Debug, Clone, Deserialize)]
-#[serde(default)]
-pub struct ProcessConfig {
- /// Path or command for Claude Code CLI.
- #[serde(default = "default_claude_command", alias = "claudecommand")]
- pub claude_command: String,
-
- /// Additional arguments to pass to Claude Code.
- /// These are added after the default arguments.
- #[serde(default, alias = "claudeargs")]
- pub claude_args: Vec<String>,
-
- /// Arguments to pass before the default arguments.
- /// Useful for overriding defaults.
- #[serde(default, alias = "claudepreargs")]
- pub claude_pre_args: Vec<String>,
-
- /// Skip the --dangerously-skip-permissions flag (default: false).
- /// Set to true if you want to use Claude's permission system.
- #[serde(default, alias = "enablepermissions")]
- pub enable_permissions: bool,
-
- /// Skip the --verbose flag (default: false).
- #[serde(default, alias = "disableverbose")]
- pub disable_verbose: bool,
-
- /// Maximum concurrent tasks.
- #[serde(default = "default_max_tasks", alias = "maxconcurrenttasks")]
- pub max_concurrent_tasks: u32,
-
- /// Default timeout for tasks in seconds (0 = no timeout).
- #[serde(default, alias = "defaulttimeoutsecs")]
- pub default_timeout_secs: u64,
-
- /// Additional environment variables to pass to Claude Code.
- #[serde(default, alias = "envvars")]
- pub env_vars: HashMap<String, String>,
-}
-
-fn default_claude_command() -> String {
- "claude".to_string()
-}
-
-fn default_max_tasks() -> u32 {
- 4
-}
-
-impl Default for ProcessConfig {
- fn default() -> Self {
- Self {
- claude_command: default_claude_command(),
- claude_args: Vec::new(),
- claude_pre_args: Vec::new(),
- enable_permissions: false,
- disable_verbose: false,
- max_concurrent_tasks: default_max_tasks(),
- default_timeout_secs: 0,
- env_vars: HashMap::new(),
- }
- }
-}
-
-/// Local database configuration.
-#[derive(Debug, Clone, Deserialize)]
-#[serde(default)]
-pub struct LocalDbConfig {
- /// Path to local SQLite database.
- #[serde(default = "default_db_path")]
- pub path: PathBuf,
-}
-
-impl Default for LocalDbConfig {
- fn default() -> Self {
- Self {
- path: default_db_path(),
- }
- }
-}
-
-fn default_db_path() -> PathBuf {
- dirs::home_dir()
- .unwrap_or_else(|| PathBuf::from("."))
- .join(".makima")
- .join("daemon.db")
-}
-
-/// Logging configuration.
-#[derive(Debug, Clone, Deserialize, Default)]
-pub struct LoggingConfig {
- /// Log level: "trace", "debug", "info", "warn", "error".
- #[serde(default = "default_log_level")]
- pub level: String,
-
- /// Log format: "pretty" or "json".
- #[serde(default = "default_log_format")]
- pub format: String,
-}
-
-fn default_log_level() -> String {
- "info".to_string()
-}
-
-fn default_log_format() -> String {
- "pretty".to_string()
-}
-
-/// Repository auto-clone configuration.
-#[derive(Debug, Clone, Deserialize, Default)]
-pub struct ReposConfig {
- /// Directory to clone repositories into (default: ~/.makima/home).
- #[serde(default = "default_home_dir")]
- pub home_dir: PathBuf,
-
- /// List of repositories to auto-clone on startup.
- /// Each entry can be a URL (e.g., "https://github.com/user/repo.git")
- /// or a shorthand (e.g., "github:user/repo").
- #[serde(default, alias = "autoclone")]
- pub auto_clone: Vec<RepoEntry>,
-}
-
-/// A repository entry for auto-cloning.
-#[derive(Debug, Clone, Deserialize)]
-#[serde(untagged)]
-pub enum RepoEntry {
- /// Simple URL string.
- Url(String),
- /// Detailed configuration.
- Config {
- /// Repository URL.
- url: String,
- /// Custom directory name (defaults to repo name from URL).
- #[serde(default)]
- name: Option<String>,
- /// Branch to checkout after cloning (defaults to default branch).
- #[serde(default)]
- branch: Option<String>,
- /// Whether to do a shallow clone (default: false).
- #[serde(default)]
- shallow: bool,
- },
-}
-
-impl RepoEntry {
- /// Get the URL for this repo entry.
- pub fn url(&self) -> &str {
- match self {
- RepoEntry::Url(url) => url,
- RepoEntry::Config { url, .. } => url,
- }
- }
-
- /// Get the custom name, if any.
- pub fn name(&self) -> Option<&str> {
- match self {
- RepoEntry::Url(_) => None,
- RepoEntry::Config { name, .. } => name.as_deref(),
- }
- }
-
- /// Get the branch to checkout, if any.
- pub fn branch(&self) -> Option<&str> {
- match self {
- RepoEntry::Url(_) => None,
- RepoEntry::Config { branch, .. } => branch.as_deref(),
- }
- }
-
- /// Whether to do a shallow clone.
- pub fn shallow(&self) -> bool {
- match self {
- RepoEntry::Url(_) => false,
- RepoEntry::Config { shallow, .. } => *shallow,
- }
- }
-
- /// Get the directory name to use (either custom name or derived from URL).
- pub fn dir_name(&self) -> Option<String> {
- if let Some(name) = self.name() {
- return Some(name.to_string());
- }
-
- // Derive from URL
- let url = self.url();
-
- // Handle shorthand formats
- let url = if url.starts_with("github:") {
- url.strip_prefix("github:").unwrap_or(url)
- } else if url.starts_with("gitlab:") {
- url.strip_prefix("gitlab:").unwrap_or(url)
- } else {
- url
- };
-
- // Extract repo name from URL
- url.trim_end_matches('/')
- .trim_end_matches(".git")
- .rsplit('/')
- .next()
- .map(|s| s.to_string())
- }
-
- /// Expand the URL (e.g., convert shorthand to full URL).
- pub fn expanded_url(&self) -> String {
- let url = self.url();
-
- if url.starts_with("github:") {
- format!("https://github.com/{}.git", url.strip_prefix("github:").unwrap_or(""))
- } else if url.starts_with("gitlab:") {
- format!("https://gitlab.com/{}.git", url.strip_prefix("gitlab:").unwrap_or(""))
- } else {
- url.to_string()
- }
- }
-}
-
-fn default_home_dir() -> PathBuf {
- dirs::home_dir()
- .unwrap_or_else(|| PathBuf::from("."))
- .join(".makima")
- .join("home")
-}
-
-impl DaemonConfig {
- /// Load configuration from files and environment variables.
- ///
- /// Configuration sources (in order of precedence):
- /// 1. Environment variables (MAKIMA_API_KEY, MAKIMA_DAEMON_SERVER_URL, etc.)
- /// 2. ./makima-daemon.toml (current directory)
- /// 3. ~/.config/makima-daemon/config.toml
- /// 4. /etc/makima-daemon/config.toml (Linux only)
- ///
- /// Environment variable examples:
- /// - MAKIMA_API_KEY=your-api-key (preferred)
- /// - MAKIMA_DAEMON_SERVER_URL=ws://localhost:8080
- /// - MAKIMA_DAEMON_PROCESS_MAXCONCURRENTTASKS=4
- pub fn load() -> Result<Self, config::ConfigError> {
- Self::load_from_path(None)
- }
-
- /// Load configuration from a specific path plus standard sources.
- fn load_from_path(config_path: Option<&std::path::Path>) -> Result<Self, config::ConfigError> {
- let mut builder = Config::builder();
-
- // System-wide config (Linux only)
- #[cfg(target_os = "linux")]
- {
- builder = builder.add_source(
- File::with_name("/etc/makima-daemon/config").required(false),
- );
- }
-
- // User config
- if let Some(config_dir) = dirs::config_dir() {
- let user_config = config_dir.join("makima-daemon").join("config");
- builder = builder.add_source(
- File::with_name(user_config.to_str().unwrap_or("")).required(false),
- );
- }
-
- // Local config
- builder = builder.add_source(File::with_name("makima-daemon").required(false));
-
- // Custom config file (if provided)
- if let Some(path) = config_path {
- builder = builder.add_source(
- File::with_name(path.to_str().unwrap_or("")).required(true),
- );
- }
-
- // Environment variables with underscore separator for nesting
- // e.g., MAKIMA_DAEMON_SERVER_URL -> server.url
- // MAKIMA_DAEMON_SERVER_APIKEY -> server.api_key
- builder = builder.add_source(
- Environment::with_prefix("MAKIMA_DAEMON")
- .separator("_")
- .try_parsing(true),
- );
-
- let config = builder.build()?;
- let mut config: DaemonConfig = config.try_deserialize()?;
-
- // Check for MAKIMA_API_KEY environment variable (preferred over MAKIMA_DAEMON_SERVER_APIKEY)
- if let Ok(api_key) = std::env::var("MAKIMA_API_KEY") {
- config.server.api_key = api_key;
- }
-
- // Validate required fields (don't validate here - let load_with_cli do final validation)
- Ok(config)
- }
-
- /// Validate that required configuration fields are set.
- pub fn validate(&self) -> Result<(), config::ConfigError> {
- if self.server.api_key.is_empty() {
- return Err(config::ConfigError::Message(
- "API key is required. Set via MAKIMA_API_KEY, config file, or --api-key".to_string()
- ));
- }
- Ok(())
- }
-
- /// Load configuration with CLI argument overrides.
- ///
- /// Configuration sources (in order of precedence, highest first):
- /// 1. CLI arguments
- /// 2. Environment variables
- /// 3. Custom config file (if --config specified)
- /// 4. ./makima-daemon.toml (current directory)
- /// 5. ~/.config/makima-daemon/config.toml
- /// 6. /etc/makima-daemon/config.toml (Linux only)
- /// 7. Default values
- pub fn load_with_cli(cli: &crate::cli::Cli) -> Result<Self, config::ConfigError> {
- // Load base config (with optional custom config file)
- let mut config = Self::load_from_path(cli.config.as_deref())?;
-
- // Apply CLI overrides (highest priority)
- if let Some(ref repos_dir) = cli.repos_dir {
- config.worktree.repos_dir = repos_dir.clone();
- }
- if let Some(ref worktrees_dir) = cli.worktrees_dir {
- config.worktree.base_dir = worktrees_dir.clone();
- }
- if let Some(ref server_url) = cli.server_url {
- config.server.url = server_url.clone();
- }
- if let Some(ref api_key) = cli.api_key {
- config.server.api_key = api_key.clone();
- }
- if let Some(max_tasks) = cli.max_tasks {
- config.process.max_concurrent_tasks = max_tasks;
- }
- // Log level is always set (has default)
- config.logging.level = cli.log_level.clone();
-
- // Validate required fields after all sources are merged
- config.validate()?;
-
- Ok(config)
- }
-
- /// Create a minimal config for testing.
- #[cfg(test)]
- pub fn test_config() -> Self {
- Self {
- server: ServerConfig {
- url: "ws://localhost:8080".to_string(),
- api_key: "test-key".to_string(),
- heartbeat_interval_secs: 30,
- reconnect_interval_secs: 5,
- max_reconnect_attempts: 0,
- },
- worktree: WorktreeConfig {
- base_dir: PathBuf::from("/tmp/makima-daemon-test/worktrees"),
- repos_dir: PathBuf::from("/tmp/makima-daemon-test/repos"),
- branch_prefix: "makima/task-".to_string(),
- cleanup_on_start: true,
- default_target_repo: None,
- },
- process: ProcessConfig {
- claude_command: "claude".to_string(),
- claude_args: Vec::new(),
- claude_pre_args: Vec::new(),
- enable_permissions: false,
- disable_verbose: false,
- max_concurrent_tasks: 2,
- default_timeout_secs: 0,
- env_vars: HashMap::new(),
- },
- local_db: LocalDbConfig {
- path: PathBuf::from("/tmp/makima-daemon-test/state.db"),
- },
- logging: LoggingConfig::default(),
- repos: ReposConfig::default(),
- }
- }
-}
-
-/// Helper module for dirs crate (minimal subset).
-mod dirs {
- use std::path::PathBuf;
-
- pub fn home_dir() -> Option<PathBuf> {
- std::env::var("HOME").ok().map(PathBuf::from)
- }
-
- pub fn config_dir() -> Option<PathBuf> {
- #[cfg(target_os = "macos")]
- {
- std::env::var("HOME")
- .ok()
- .map(|h| PathBuf::from(h).join("Library").join("Application Support"))
- }
- #[cfg(target_os = "linux")]
- {
- std::env::var("XDG_CONFIG_HOME")
- .ok()
- .map(PathBuf::from)
- .or_else(|| std::env::var("HOME").ok().map(|h| PathBuf::from(h).join(".config")))
- }
- #[cfg(target_os = "windows")]
- {
- std::env::var("APPDATA").ok().map(PathBuf::from)
- }
- #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
- {
- None
- }
- }
-}
diff --git a/makima/daemon/src/db/local.rs b/makima/daemon/src/db/local.rs
deleted file mode 100644
index 5adbf98..0000000
--- a/makima/daemon/src/db/local.rs
+++ /dev/null
@@ -1,391 +0,0 @@
-//! Local SQLite database for crash recovery and state persistence.
-
-use std::path::Path;
-
-use chrono::{DateTime, Utc};
-use rusqlite::{params, Connection, Result as SqliteResult};
-use uuid::Uuid;
-
-use crate::task::TaskState;
-
-/// Local task record for persistence.
-#[derive(Debug, Clone)]
-pub struct LocalTask {
- pub id: Uuid,
- pub server_task_id: Uuid,
- pub state: TaskState,
- pub container_id: Option<String>,
- pub overlay_path: Option<String>,
- pub repo_url: Option<String>,
- pub base_branch: Option<String>,
- pub plan: String,
- pub created_at: DateTime<Utc>,
- pub started_at: Option<DateTime<Utc>>,
- pub completed_at: Option<DateTime<Utc>>,
- pub error_message: Option<String>,
-}
-
-/// Buffered output for reliable delivery.
-#[derive(Debug, Clone)]
-pub struct BufferedOutput {
- pub id: i64,
- pub task_id: Uuid,
- pub output: String,
- pub is_partial: bool,
- pub timestamp: DateTime<Utc>,
-}
-
-/// Local database for daemon state persistence.
-pub struct LocalDb {
- conn: Connection,
-}
-
-impl LocalDb {
- /// Open or create the local database.
- pub fn open(path: &Path) -> SqliteResult<Self> {
- // Create parent directory if needed
- if let Some(parent) = path.parent() {
- std::fs::create_dir_all(parent).ok();
- }
-
- let conn = Connection::open(path)?;
-
- // Initialize schema
- conn.execute_batch(Self::schema())?;
-
- Ok(Self { conn })
- }
-
- /// Open an in-memory database (for testing).
- #[cfg(test)]
- pub fn open_memory() -> SqliteResult<Self> {
- let conn = Connection::open_in_memory()?;
- conn.execute_batch(Self::schema())?;
- Ok(Self { conn })
- }
-
- /// Database schema.
- fn schema() -> &'static str {
- r#"
- -- Local task state for crash recovery
- CREATE TABLE IF NOT EXISTS tasks (
- id TEXT PRIMARY KEY,
- server_task_id TEXT NOT NULL,
- state TEXT NOT NULL,
- container_id TEXT,
- overlay_path TEXT,
- repo_url TEXT,
- base_branch TEXT,
- plan TEXT NOT NULL,
- created_at TEXT NOT NULL,
- started_at TEXT,
- completed_at TEXT,
- error_message TEXT
- );
-
- -- Buffered output for reliable delivery
- CREATE TABLE IF NOT EXISTS output_buffer (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- task_id TEXT NOT NULL,
- output TEXT NOT NULL,
- is_partial INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- sent INTEGER NOT NULL DEFAULT 0
- );
-
- -- Daemon state key-value store
- CREATE TABLE IF NOT EXISTS daemon_state (
- key TEXT PRIMARY KEY,
- value TEXT NOT NULL,
- updated_at TEXT NOT NULL
- );
-
- -- Indexes
- CREATE INDEX IF NOT EXISTS idx_tasks_state ON tasks(state);
- CREATE INDEX IF NOT EXISTS idx_output_buffer_sent ON output_buffer(sent, id);
- CREATE INDEX IF NOT EXISTS idx_output_buffer_task ON output_buffer(task_id);
- "#
- }
-
- /// Save a task.
- pub fn save_task(&self, task: &LocalTask) -> SqliteResult<()> {
- self.conn.execute(
- r#"
- INSERT OR REPLACE INTO tasks
- (id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message)
- VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
- "#,
- params![
- task.id.to_string(),
- task.server_task_id.to_string(),
- task.state.as_str(),
- task.container_id,
- task.overlay_path,
- task.repo_url,
- task.base_branch,
- task.plan,
- task.created_at.to_rfc3339(),
- task.started_at.map(|t| t.to_rfc3339()),
- task.completed_at.map(|t| t.to_rfc3339()),
- task.error_message,
- ],
- )?;
- Ok(())
- }
-
- /// Get a task by ID.
- pub fn get_task(&self, id: Uuid) -> SqliteResult<Option<LocalTask>> {
- let mut stmt = self.conn.prepare(
- "SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message FROM tasks WHERE id = ?1",
- )?;
-
- let mut rows = stmt.query(params![id.to_string()])?;
-
- if let Some(row) = rows.next()? {
- Ok(Some(Self::task_from_row(row)?))
- } else {
- Ok(None)
- }
- }
-
- /// Get all running/active tasks (for recovery).
- pub fn get_active_tasks(&self) -> SqliteResult<Vec<LocalTask>> {
- let mut stmt = self.conn.prepare(
- r#"
- SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message
- FROM tasks
- WHERE state IN ('initializing', 'starting', 'running', 'paused', 'blocked')
- "#,
- )?;
-
- let rows = stmt.query_map([], |row| Self::task_from_row(row))?;
-
- rows.collect()
- }
-
- /// Delete a task.
- pub fn delete_task(&self, id: Uuid) -> SqliteResult<()> {
- self.conn.execute(
- "DELETE FROM tasks WHERE id = ?1",
- params![id.to_string()],
- )?;
- Ok(())
- }
-
- /// Update task state.
- pub fn update_task_state(&self, id: Uuid, state: TaskState) -> SqliteResult<()> {
- self.conn.execute(
- "UPDATE tasks SET state = ?2 WHERE id = ?1",
- params![id.to_string(), state.as_str()],
- )?;
- Ok(())
- }
-
- /// Buffer output for reliable delivery.
- pub fn buffer_output(&self, task_id: Uuid, output: &str, is_partial: bool) -> SqliteResult<i64> {
- self.conn.execute(
- r#"
- INSERT INTO output_buffer (task_id, output, is_partial, timestamp, sent)
- VALUES (?1, ?2, ?3, datetime('now'), 0)
- "#,
- params![task_id.to_string(), output, is_partial as i32],
- )?;
- Ok(self.conn.last_insert_rowid())
- }
-
- /// Get unsent outputs.
- pub fn get_unsent_outputs(&self, limit: i64) -> SqliteResult<Vec<BufferedOutput>> {
- let mut stmt = self.conn.prepare(
- r#"
- SELECT id, task_id, output, is_partial, timestamp
- FROM output_buffer
- WHERE sent = 0
- ORDER BY id
- LIMIT ?1
- "#,
- )?;
-
- let rows = stmt.query_map(params![limit], |row| {
- let id: i64 = row.get(0)?;
- let task_id_str: String = row.get(1)?;
- let task_id = Uuid::parse_str(&task_id_str).unwrap_or_default();
- let output: String = row.get(2)?;
- let is_partial: i32 = row.get(3)?;
- let timestamp_str: String = row.get(4)?;
- let timestamp = DateTime::parse_from_rfc3339(&timestamp_str)
- .map(|dt| dt.with_timezone(&Utc))
- .unwrap_or_else(|_| Utc::now());
-
- Ok(BufferedOutput {
- id,
- task_id,
- output,
- is_partial: is_partial != 0,
- timestamp,
- })
- })?;
-
- rows.collect()
- }
-
- /// Mark outputs as sent.
- pub fn mark_outputs_sent(&self, ids: &[i64]) -> SqliteResult<()> {
- if ids.is_empty() {
- return Ok(());
- }
-
- let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
- let sql = format!(
- "UPDATE output_buffer SET sent = 1 WHERE id IN ({})",
- placeholders.join(",")
- );
-
- let params: Vec<rusqlite::types::Value> = ids
- .iter()
- .map(|id| rusqlite::types::Value::Integer(*id))
- .collect();
-
- self.conn.execute(&sql, rusqlite::params_from_iter(params))?;
- Ok(())
- }
-
- /// Clean up old sent outputs.
- pub fn cleanup_sent_outputs(&self, older_than_hours: i64) -> SqliteResult<usize> {
- let result = self.conn.execute(
- r#"
- DELETE FROM output_buffer
- WHERE sent = 1 AND timestamp < datetime('now', ?1 || ' hours')
- "#,
- params![format!("-{}", older_than_hours)],
- )?;
- Ok(result)
- }
-
- /// Get daemon state value.
- pub fn get_state(&self, key: &str) -> SqliteResult<Option<String>> {
- let mut stmt = self.conn.prepare(
- "SELECT value FROM daemon_state WHERE key = ?1",
- )?;
-
- let mut rows = stmt.query(params![key])?;
-
- if let Some(row) = rows.next()? {
- let value: String = row.get(0)?;
- Ok(Some(value))
- } else {
- Ok(None)
- }
- }
-
- /// Set daemon state value.
- pub fn set_state(&self, key: &str, value: &str) -> SqliteResult<()> {
- self.conn.execute(
- r#"
- INSERT OR REPLACE INTO daemon_state (key, value, updated_at)
- VALUES (?1, ?2, datetime('now'))
- "#,
- params![key, value],
- )?;
- Ok(())
- }
-
- /// Parse a task from a database row.
- fn task_from_row(row: &rusqlite::Row) -> SqliteResult<LocalTask> {
- let id_str: String = row.get(0)?;
- let server_task_id_str: String = row.get(1)?;
- let state_str: String = row.get(2)?;
- let container_id: Option<String> = row.get(3)?;
- let overlay_path: Option<String> = row.get(4)?;
- let repo_url: Option<String> = row.get(5)?;
- let base_branch: Option<String> = row.get(6)?;
- let plan: String = row.get(7)?;
- let created_at_str: String = row.get(8)?;
- let started_at_str: Option<String> = row.get(9)?;
- let completed_at_str: Option<String> = row.get(10)?;
- let error_message: Option<String> = row.get(11)?;
-
- let id = Uuid::parse_str(&id_str).unwrap_or_default();
- let server_task_id = Uuid::parse_str(&server_task_id_str).unwrap_or_default();
- let state = TaskState::from_str(&state_str).unwrap_or_default();
- let created_at = DateTime::parse_from_rfc3339(&created_at_str)
- .map(|dt| dt.with_timezone(&Utc))
- .unwrap_or_else(|_| Utc::now());
- let started_at = started_at_str
- .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
- .map(|dt| dt.with_timezone(&Utc));
- let completed_at = completed_at_str
- .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
- .map(|dt| dt.with_timezone(&Utc));
-
- Ok(LocalTask {
- id,
- server_task_id,
- state,
- container_id,
- overlay_path,
- repo_url,
- base_branch,
- plan,
- created_at,
- started_at,
- completed_at,
- error_message,
- })
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_open_memory() {
- let db = LocalDb::open_memory().unwrap();
- assert!(db.get_active_tasks().unwrap().is_empty());
- }
-
- #[test]
- fn test_save_and_get_task() {
- let db = LocalDb::open_memory().unwrap();
-
- let task = LocalTask {
- id: Uuid::new_v4(),
- server_task_id: Uuid::new_v4(),
- state: TaskState::Running,
- container_id: Some("abc123".to_string()),
- overlay_path: Some("/tmp/overlay".to_string()),
- repo_url: Some("https://github.com/test/repo".to_string()),
- base_branch: Some("main".to_string()),
- plan: "Build the feature".to_string(),
- created_at: Utc::now(),
- started_at: Some(Utc::now()),
- completed_at: None,
- error_message: None,
- };
-
- db.save_task(&task).unwrap();
-
- let loaded = db.get_task(task.id).unwrap().unwrap();
- assert_eq!(loaded.id, task.id);
- assert_eq!(loaded.state, TaskState::Running);
- assert_eq!(loaded.plan, "Build the feature");
- }
-
- #[test]
- fn test_output_buffer() {
- let db = LocalDb::open_memory().unwrap();
- let task_id = Uuid::new_v4();
-
- db.buffer_output(task_id, "line 1", false).unwrap();
- db.buffer_output(task_id, "line 2", false).unwrap();
-
- let unsent = db.get_unsent_outputs(10).unwrap();
- assert_eq!(unsent.len(), 2);
-
- let ids: Vec<i64> = unsent.iter().map(|o| o.id).collect();
- db.mark_outputs_sent(&ids).unwrap();
-
- let unsent = db.get_unsent_outputs(10).unwrap();
- assert!(unsent.is_empty());
- }
-}
diff --git a/makima/daemon/src/db/mod.rs b/makima/daemon/src/db/mod.rs
deleted file mode 100644
index 2c6e0f3..0000000
--- a/makima/daemon/src/db/mod.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-//! Local database for daemon state persistence.
-
-pub mod local;
-
-pub use local::{BufferedOutput, LocalDb, LocalTask};
diff --git a/makima/daemon/src/error.rs b/makima/daemon/src/error.rs
deleted file mode 100644
index 00e5140..0000000
--- a/makima/daemon/src/error.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-//! Error types for the makima daemon.
-
-use thiserror::Error;
-use uuid::Uuid;
-
-/// Top-level daemon error type.
-#[derive(Error, Debug)]
-pub enum DaemonError {
- #[error("WebSocket error: {0}")]
- WebSocket(#[from] tokio_tungstenite::tungstenite::Error),
-
- #[error("Worktree error: {0}")]
- Worktree(#[from] crate::worktree::WorktreeError),
-
- #[error("Process error: {0}")]
- Process(#[from] crate::process::ClaudeProcessError),
-
- #[error("Task error: {0}")]
- Task(#[from] TaskError),
-
- #[error("Configuration error: {0}")]
- Config(#[from] config::ConfigError),
-
- #[error("Database error: {0}")]
- Database(#[from] rusqlite::Error),
-
- #[error("IO error: {0}")]
- Io(#[from] std::io::Error),
-
- #[error("JSON error: {0}")]
- Json(#[from] serde_json::Error),
-
- #[error("Authentication failed: {0}")]
- AuthFailed(String),
-
- #[error("Connection lost")]
- ConnectionLost,
-
- #[error("Server error: {code} - {message}")]
- ServerError { code: String, message: String },
-}
-
-/// Task management errors.
-#[derive(Error, Debug)]
-pub enum TaskError {
- #[error("Task not found: {0}")]
- NotFound(Uuid),
-
- #[error("Invalid state transition from {from} to {to}")]
- InvalidStateTransition { from: String, to: String },
-
- #[error("Concurrency limit reached")]
- ConcurrencyLimit,
-
- #[error("Task already exists: {0}")]
- AlreadyExists(Uuid),
-
- #[error("Task not running: {0}")]
- NotRunning(Uuid),
-
- #[error("Failed to send message to task: {0}")]
- MessageFailed(String),
-
- #[error("Task setup failed: {0}")]
- SetupFailed(String),
-
- #[error("Task execution failed: {0}")]
- ExecutionFailed(String),
-}
-
-/// Result type alias for daemon operations.
-pub type Result<T> = std::result::Result<T, DaemonError>;
-
-/// Result type alias for task operations.
-pub type TaskResult<T> = std::result::Result<T, TaskError>;
diff --git a/makima/daemon/src/lib.rs b/makima/daemon/src/lib.rs
deleted file mode 100644
index 9555681..0000000
--- a/makima/daemon/src/lib.rs
+++ /dev/null
@@ -1,21 +0,0 @@
-//! Makima Daemon - Git worktree orchestration for Claude Code.
-//!
-//! This daemon runs on worker machines and:
-//! - Connects to the makima server via WebSocket
-//! - Creates git worktrees for task isolation
-//! - Runs Claude Code CLI as subprocesses in worktrees
-//! - Streams JSON output back to server
-
-pub mod cli;
-pub mod config;
-pub mod db;
-pub mod error;
-pub mod process;
-pub mod task;
-pub mod temp;
-pub mod worktree;
-pub mod ws;
-
-pub use cli::Cli;
-pub use config::DaemonConfig;
-pub use error::{DaemonError, Result};
diff --git a/makima/daemon/src/main.rs b/makima/daemon/src/main.rs
deleted file mode 100644
index e4ca5d4..0000000
--- a/makima/daemon/src/main.rs
+++ /dev/null
@@ -1,313 +0,0 @@
-//! Makima Daemon - Git worktree orchestration for Claude Code.
-
-use std::sync::Arc;
-
-use std::path::Path;
-
-use clap::Parser;
-use makima_daemon::cli::Cli;
-use makima_daemon::config::{DaemonConfig, RepoEntry};
-use makima_daemon::db::LocalDb;
-use makima_daemon::error::DaemonError;
-use makima_daemon::task::{TaskConfig, TaskManager};
-use makima_daemon::ws::{DaemonCommand, WsClient};
-use tokio::process::Command;
-use tokio::sync::mpsc;
-use tracing_subscriber::{fmt, prelude::*, EnvFilter};
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
- eprintln!("=== Makima Daemon Starting ===");
-
- // Parse command-line arguments
- let cli = Cli::parse();
-
- // Load configuration with CLI overrides
- eprintln!("[1/5] Loading configuration...");
- let config = match DaemonConfig::load_with_cli(&cli) {
- Ok(cfg) => {
- eprintln!(" Config loaded: server={}", cfg.server.url);
- cfg
- }
- Err(e) => {
- eprintln!("Failed to load configuration: {}", e);
- eprintln!();
- eprintln!("Use CLI flags:");
- eprintln!(" makima-daemon --server-url ws://localhost:8080 --api-key your-api-key");
- eprintln!();
- eprintln!("Or set environment variables:");
- eprintln!(" MAKIMA_DAEMON_SERVER_URL=ws://localhost:8080");
- eprintln!(" MAKIMA_API_KEY=your-api-key");
- eprintln!();
- eprintln!("Or create a config file: makima-daemon.toml");
- eprintln!();
- eprintln!("Run 'makima-daemon --help' for all options.");
- std::process::exit(1);
- }
- };
-
- // Initialize logging
- init_logging(&config.logging.level, &config.logging.format);
- eprintln!("[2/5] Logging initialized");
-
- // Initialize local database
- eprintln!("[3/5] Opening local database: {}", config.local_db.path.display());
- let _local_db = LocalDb::open(&config.local_db.path)?;
- eprintln!(" Database opened");
-
- // Initialize worktree directories
- eprintln!("[4/5] Setting up directories...");
- tokio::fs::create_dir_all(&config.worktree.base_dir).await?;
- tokio::fs::create_dir_all(&config.worktree.repos_dir).await?;
- tokio::fs::create_dir_all(&config.repos.home_dir).await?;
- eprintln!(" Worktree base: {}", config.worktree.base_dir.display());
- eprintln!(" Repos cache: {}", config.worktree.repos_dir.display());
- eprintln!(" Home dir: {}", config.repos.home_dir.display());
-
- // Auto-clone repositories if configured
- if !config.repos.auto_clone.is_empty() {
- eprintln!(" Auto-cloning {} repositories...", config.repos.auto_clone.len());
- for repo_entry in &config.repos.auto_clone {
- if let Err(e) = auto_clone_repo(repo_entry, &config.repos.home_dir).await {
- eprintln!(" WARNING: Failed to clone {}: {}", repo_entry.url(), e);
- }
- }
- }
-
- // Create channels for communication
- let (command_tx, mut command_rx) = mpsc::channel::<DaemonCommand>(64);
-
- // Get machine info
- let machine_id = get_machine_id();
- let hostname = get_hostname();
- eprintln!(" Machine ID: {}", machine_id);
- eprintln!(" Hostname: {}", hostname);
-
- // Create WebSocket client
- eprintln!("[5/5] Connecting to server: {}", config.server.url);
- let mut ws_client = WsClient::new(
- config.server.clone(),
- machine_id,
- hostname,
- config.process.max_concurrent_tasks as i32,
- command_tx,
- );
-
- // Get sender for task manager
- let ws_tx = ws_client.sender();
-
- // Create task configuration
- let task_config = TaskConfig {
- max_concurrent_tasks: config.process.max_concurrent_tasks,
- worktree_base_dir: config.worktree.base_dir.clone(),
- env_vars: config.process.env_vars.clone(),
- claude_command: config.process.claude_command.clone(),
- claude_args: config.process.claude_args.clone(),
- claude_pre_args: config.process.claude_pre_args.clone(),
- enable_permissions: config.process.enable_permissions,
- disable_verbose: config.process.disable_verbose,
- };
-
- // Create task manager
- let task_manager = Arc::new(TaskManager::new(task_config, ws_tx.clone()));
-
- // Spawn command handler
- let task_manager_clone = task_manager.clone();
- tokio::spawn(async move {
- tracing::info!("Command handler started, waiting for commands...");
- while let Some(command) = command_rx.recv().await {
- tracing::info!("Received command from channel: {:?}", command);
- if let Err(e) = task_manager_clone.handle_command(command).await {
- tracing::error!("Failed to handle command: {}", e);
- }
- }
- tracing::info!("Command handler stopped");
- });
-
- // Handle shutdown signals
- let shutdown_signal = async {
- tokio::signal::ctrl_c()
- .await
- .expect("Failed to install Ctrl+C handler");
- eprintln!("\nReceived shutdown signal");
- };
-
- eprintln!("=== Daemon running (Ctrl+C to stop) ===");
-
- // Run WebSocket client with shutdown handling
- tokio::select! {
- result = ws_client.run() => {
- match result {
- Ok(()) => eprintln!("WebSocket client exited cleanly"),
- Err(DaemonError::AuthFailed(msg)) => {
- eprintln!("ERROR: Authentication failed: {}", msg);
- std::process::exit(1);
- }
- Err(e) => {
- eprintln!("ERROR: WebSocket client error: {}", e);
- std::process::exit(1);
- }
- }
- }
- _ = shutdown_signal => {
- eprintln!("Shutting down...");
- }
- }
-
- // Cleanup
- tracing::info!("Daemon stopped");
-
- Ok(())
-}
-
-fn init_logging(level: &str, format: &str) {
- let filter = EnvFilter::try_from_default_env()
- .or_else(|_| EnvFilter::try_new(level))
- .unwrap_or_else(|_| EnvFilter::new("info"));
-
- let subscriber = tracing_subscriber::registry().with(filter);
-
- if format == "json" {
- subscriber.with(fmt::layer().json()).init();
- } else {
- subscriber.with(fmt::layer()).init();
- }
-}
-
-fn get_machine_id() -> String {
- // Try to read machine-id from standard locations
- #[cfg(target_os = "linux")]
- {
- if let Ok(id) = std::fs::read_to_string("/etc/machine-id") {
- return id.trim().to_string();
- }
- if let Ok(id) = std::fs::read_to_string("/var/lib/dbus/machine-id") {
- return id.trim().to_string();
- }
- }
-
- #[cfg(target_os = "macos")]
- {
- // Use IOPlatformSerialNumber
- if let Ok(output) = std::process::Command::new("ioreg")
- .args(["-rd1", "-c", "IOPlatformExpertDevice"])
- .output()
- {
- let stdout = String::from_utf8_lossy(&output.stdout);
- for line in stdout.lines() {
- if line.contains("IOPlatformUUID") {
- if let Some(uuid) = line.split('"').nth(3) {
- return uuid.to_string();
- }
- }
- }
- }
- }
-
- // Fallback: generate a random ID and persist it
- let state_dir = dirs_next::data_local_dir()
- .unwrap_or_else(|| std::path::PathBuf::from("."))
- .join("makima-daemon");
- let machine_id_file = state_dir.join("machine-id");
-
- if let Ok(id) = std::fs::read_to_string(&machine_id_file) {
- return id.trim().to_string();
- }
-
- // Generate new ID
- let new_id = uuid::Uuid::new_v4().to_string();
- std::fs::create_dir_all(&state_dir).ok();
- std::fs::write(&machine_id_file, &new_id).ok();
- new_id
-}
-
-fn get_hostname() -> String {
- hostname::get()
- .map(|h| h.to_string_lossy().to_string())
- .unwrap_or_else(|_| "unknown".to_string())
-}
-
-/// Auto-clone a repository to the home directory if it doesn't exist.
-async fn auto_clone_repo(
- repo_entry: &RepoEntry,
- home_dir: &Path,
-) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
- let dir_name = repo_entry
- .dir_name()
- .ok_or("Could not determine directory name from URL")?;
- let target_dir = home_dir.join(&dir_name);
-
- // Check if already cloned
- if target_dir.exists() {
- eprintln!(" [skip] {} (already exists)", dir_name);
- return Ok(());
- }
-
- let url = repo_entry.expanded_url();
- eprintln!(" [clone] {} -> {}", url, target_dir.display());
-
- // Build git clone command
- let mut args = vec!["clone".to_string()];
-
- // Add shallow clone if requested
- if repo_entry.shallow() {
- args.push("--depth".to_string());
- args.push("1".to_string());
- }
-
- // Add branch if specified
- if let Some(branch) = repo_entry.branch() {
- args.push("--branch".to_string());
- args.push(branch.to_string());
- }
-
- args.push(url.clone());
- args.push(target_dir.to_string_lossy().to_string());
-
- // Run git clone
- let output = Command::new("git")
- .args(&args)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(format!("git clone failed: {}", stderr).into());
- }
-
- eprintln!(" [done] {}", dir_name);
- Ok(())
-}
-
-/// dirs_next minimal replacement
-mod dirs_next {
- use std::path::PathBuf;
-
- pub fn data_local_dir() -> Option<PathBuf> {
- #[cfg(target_os = "macos")]
- {
- std::env::var("HOME")
- .ok()
- .map(|h| PathBuf::from(h).join("Library").join("Application Support"))
- }
- #[cfg(target_os = "linux")]
- {
- std::env::var("XDG_DATA_HOME")
- .ok()
- .map(PathBuf::from)
- .or_else(|| {
- std::env::var("HOME")
- .ok()
- .map(|h| PathBuf::from(h).join(".local").join("share"))
- })
- }
- #[cfg(target_os = "windows")]
- {
- std::env::var("LOCALAPPDATA").ok().map(PathBuf::from)
- }
- #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
- {
- None
- }
- }
-}
diff --git a/makima/daemon/src/process/claude.rs b/makima/daemon/src/process/claude.rs
deleted file mode 100644
index e06ee09..0000000
--- a/makima/daemon/src/process/claude.rs
+++ /dev/null
@@ -1,481 +0,0 @@
-//! Claude Code process management.
-
-use std::collections::HashMap;
-use std::path::Path;
-use std::process::Stdio;
-use std::sync::Arc;
-
-use futures::Stream;
-use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
-use tokio::process::{Child, ChildStdin, Command};
-use tokio::sync::{mpsc, Mutex};
-
-use super::claude_protocol::ClaudeInputMessage;
-
-/// Errors that can occur during Claude process management.
-#[derive(Debug, thiserror::Error)]
-pub enum ClaudeProcessError {
- #[error("Failed to spawn Claude process: {0}")]
- SpawnFailed(#[from] std::io::Error),
-
- #[error("Claude command not found: {0}")]
- CommandNotFound(String),
-
- #[error("Process already exited")]
- AlreadyExited,
-
- #[error("Failed to read output: {0}")]
- OutputRead(String),
-}
-
-/// A line of output from Claude Code.
-#[derive(Debug, Clone)]
-pub struct OutputLine {
- /// The raw content of the line.
- pub content: String,
- /// Whether this is from stdout (true) or stderr (false).
- pub is_stdout: bool,
- /// Parsed JSON type if available (e.g., "system", "assistant", "result").
- pub json_type: Option<String>,
-}
-
-impl OutputLine {
- /// Create a new stdout output line.
- pub fn stdout(content: String) -> Self {
- let json_type = extract_json_type(&content);
- Self {
- content,
- is_stdout: true,
- json_type,
- }
- }
-
- /// Create a new stderr output line.
- pub fn stderr(content: String) -> Self {
- Self {
- content,
- is_stdout: false,
- json_type: None,
- }
- }
-}
-
-/// Extract the "type" field from a JSON line if present.
-fn extract_json_type(line: &str) -> Option<String> {
- // Quick check for JSON
- if !line.starts_with('{') {
- return None;
- }
-
- // Try to parse and extract type
- if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) {
- json.get("type")
- .and_then(|v| v.as_str())
- .map(|s| s.to_string())
- } else {
- None
- }
-}
-
-/// Handle to a running Claude Code process.
-pub struct ClaudeProcess {
- /// The child process.
- child: Child,
- /// Receiver for output lines.
- output_rx: mpsc::Receiver<OutputLine>,
- /// Stdin handle for sending input to the process (thread-safe).
- stdin: Arc<Mutex<Option<ChildStdin>>>,
-}
-
-impl ClaudeProcess {
- /// Wait for the process to exit and return the exit code.
- pub async fn wait(&mut self) -> Result<i64, ClaudeProcessError> {
- let status = self.child.wait().await?;
- Ok(status.code().unwrap_or(-1) as i64)
- }
-
- /// Check if the process has exited.
- pub fn try_wait(&mut self) -> Result<Option<i64>, ClaudeProcessError> {
- match self.child.try_wait()? {
- Some(status) => Ok(Some(status.code().unwrap_or(-1) as i64)),
- None => Ok(None),
- }
- }
-
- /// Kill the process.
- pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> {
- self.child.kill().await?;
- Ok(())
- }
-
- /// Get the next output line, if available.
- pub async fn next_output(&mut self) -> Option<OutputLine> {
- self.output_rx.recv().await
- }
-
- /// Send a raw message to the process via stdin.
- ///
- /// This can be used to provide input when Claude Code is waiting for user input.
- pub async fn send_input(&self, message: &str) -> Result<(), ClaudeProcessError> {
- let mut stdin_guard = self.stdin.lock().await;
- if let Some(ref mut stdin) = *stdin_guard {
- stdin
- .write_all(message.as_bytes())
- .await
- .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?;
- stdin
- .write_all(b"\n")
- .await
- .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write newline: {}", e)))?;
- stdin
- .flush()
- .await
- .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?;
- Ok(())
- } else {
- Err(ClaudeProcessError::OutputRead("Stdin not available".to_string()))
- }
- }
-
- /// Send a user message to the process via stdin using JSON protocol.
- ///
- /// This is the preferred method when using `--input-format=stream-json`.
- /// The message is serialized as JSON and sent as a single line.
- pub async fn send_user_message(&self, content: &str) -> Result<(), ClaudeProcessError> {
- let message = ClaudeInputMessage::user(content);
- let json_line = message.to_json_line().map_err(|e| {
- ClaudeProcessError::OutputRead(format!("Failed to serialize message: {}", e))
- })?;
-
- tracing::debug!(content_len = content.len(), "Sending user message to Claude process");
-
- let mut stdin_guard = self.stdin.lock().await;
- if let Some(ref mut stdin) = *stdin_guard {
- stdin
- .write_all(json_line.as_bytes())
- .await
- .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?;
- stdin
- .flush()
- .await
- .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?;
- Ok(())
- } else {
- Err(ClaudeProcessError::OutputRead("Stdin not available".to_string()))
- }
- }
-
- /// Get a clone of the stdin handle for external use.
- pub fn stdin_handle(&self) -> Arc<Mutex<Option<ChildStdin>>> {
- Arc::clone(&self.stdin)
- }
-
- /// Close stdin, signaling EOF to the process.
- pub async fn close_stdin(&self) -> Result<(), ClaudeProcessError> {
- let mut stdin_guard = self.stdin.lock().await;
- if let Some(mut stdin) = stdin_guard.take() {
- let _ = stdin.shutdown().await;
- }
- Ok(())
- }
-
- /// Convert to a stream of output lines.
- pub fn into_stream(self) -> impl Stream<Item = OutputLine> {
- futures::stream::unfold(self.output_rx, |mut rx| async move {
- rx.recv().await.map(|line| (line, rx))
- })
- }
-}
-
-/// Manages Claude Code process spawning.
-pub struct ProcessManager {
- /// Path to the claude command.
- claude_command: String,
- /// Additional arguments to pass to Claude Code (after defaults).
- claude_args: Vec<String>,
- /// Arguments to pass before defaults.
- claude_pre_args: Vec<String>,
- /// Whether to enable Claude's permission system (skip --dangerously-skip-permissions).
- enable_permissions: bool,
- /// Whether to disable verbose output.
- disable_verbose: bool,
- /// Default environment variables to pass.
- default_env: HashMap<String, String>,
-}
-
-impl Default for ProcessManager {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl ProcessManager {
- /// Create a new ProcessManager with default settings.
- pub fn new() -> Self {
- Self {
- claude_command: "claude".to_string(),
- claude_args: Vec::new(),
- claude_pre_args: Vec::new(),
- enable_permissions: false,
- disable_verbose: false,
- default_env: HashMap::new(),
- }
- }
-
- /// Create a ProcessManager with a custom claude command path.
- pub fn with_command(command: String) -> Self {
- Self {
- claude_command: command,
- claude_args: Vec::new(),
- claude_pre_args: Vec::new(),
- enable_permissions: false,
- disable_verbose: false,
- default_env: HashMap::new(),
- }
- }
-
- /// Set additional arguments to pass after default arguments.
- pub fn with_args(mut self, args: Vec<String>) -> Self {
- self.claude_args = args;
- self
- }
-
- /// Set arguments to pass before default arguments.
- pub fn with_pre_args(mut self, args: Vec<String>) -> Self {
- self.claude_pre_args = args;
- self
- }
-
- /// Enable Claude's permission system (don't pass --dangerously-skip-permissions).
- pub fn with_permissions_enabled(mut self, enabled: bool) -> Self {
- self.enable_permissions = enabled;
- self
- }
-
- /// Disable verbose output.
- pub fn with_verbose_disabled(mut self, disabled: bool) -> Self {
- self.disable_verbose = disabled;
- self
- }
-
- /// Add default environment variables.
- pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
- self.default_env = env;
- self
- }
-
- /// Spawn a Claude Code process to execute a plan.
- ///
- /// The process runs in the specified working directory with stream-json output format.
- pub async fn spawn(
- &self,
- working_dir: &Path,
- plan: &str,
- extra_env: Option<HashMap<String, String>>,
- ) -> Result<ClaudeProcess, ClaudeProcessError> {
- tracing::info!(
- working_dir = %working_dir.display(),
- plan_len = plan.len(),
- plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan },
- "Spawning Claude Code process"
- );
-
- // Verify working directory exists
- if !working_dir.exists() {
- tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!");
- return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new(
- std::io::ErrorKind::NotFound,
- format!("Working directory does not exist: {}", working_dir.display()),
- )));
- }
-
- // Build environment
- let mut env = self.default_env.clone();
- if let Some(extra) = extra_env {
- env.extend(extra);
- }
-
- // Build arguments list
- let mut args = Vec::new();
-
- // Pre-args (before defaults)
- args.extend(self.claude_pre_args.clone());
-
- // Required arguments for stream-json protocol
- args.push("--output-format=stream-json".to_string());
- args.push("--input-format=stream-json".to_string());
-
- // Optional default arguments
- if !self.disable_verbose {
- args.push("--verbose".to_string());
- }
- if !self.enable_permissions {
- args.push("--dangerously-skip-permissions".to_string());
- }
-
- // Additional user-configured arguments
- args.extend(self.claude_args.clone());
-
- tracing::debug!(args = ?args, "Claude command arguments");
-
- // Spawn the process
- let mut child = Command::new(&self.claude_command)
- .args(&args)
- .current_dir(working_dir)
- .envs(env)
- .stdin(Stdio::piped())
- .stdout(Stdio::piped())
- .stderr(Stdio::piped())
- .kill_on_drop(true)
- .spawn()
- .map_err(|e| {
- if e.kind() == std::io::ErrorKind::NotFound {
- ClaudeProcessError::CommandNotFound(self.claude_command.clone())
- } else {
- ClaudeProcessError::SpawnFailed(e)
- }
- })?;
-
- // Create output channel
- let (tx, rx) = mpsc::channel(1000);
-
- // Take stdout, stderr, and stdin
- // With --input-format=stream-json, we keep stdin open for sending messages
- let stdin = child.stdin.take();
- let stdin = Arc::new(Mutex::new(stdin));
-
- let stdout = child.stdout.take().expect("stdout should be piped");
- let stderr = child.stderr.take().expect("stderr should be piped");
-
- // Spawn task to read stdout
- let tx_stdout = tx.clone();
- tokio::spawn(async move {
- use tokio::io::AsyncReadExt;
- let mut reader = BufReader::new(stdout);
- let mut buffer = vec![0u8; 4096];
- let mut line_buffer = String::new();
-
- loop {
- // Try to read with a timeout to detect if we're stuck
- match tokio::time::timeout(
- tokio::time::Duration::from_secs(5),
- reader.read(&mut buffer)
- ).await {
- Ok(Ok(0)) => {
- // EOF
- tracing::debug!("Claude stdout EOF");
- // Send any remaining content
- if !line_buffer.is_empty() {
- let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await;
- }
- break;
- }
- Ok(Ok(n)) => {
- let chunk = String::from_utf8_lossy(&buffer[..n]);
- tracing::debug!(bytes = n, chunk_preview = %if chunk.len() > 100 { &chunk[..100] } else { &chunk }, "Got stdout chunk from Claude");
-
- // Accumulate into line buffer and emit complete lines
- line_buffer.push_str(&chunk);
- while let Some(newline_pos) = line_buffer.find('\n') {
- let line = line_buffer[..newline_pos].to_string();
- line_buffer = line_buffer[newline_pos + 1..].to_string();
- if tx_stdout.send(OutputLine::stdout(line)).await.is_err() {
- return;
- }
- }
- }
- Ok(Err(e)) => {
- tracing::error!(error = %e, "Error reading Claude stdout");
- break;
- }
- Err(_) => {
- // Timeout - no data for 5 seconds
- tracing::warn!("No stdout data from Claude for 5 seconds");
- }
- }
- }
- tracing::debug!("Claude stdout reader task ended");
- });
-
- // Spawn task to read stderr
- let tx_stderr = tx;
- tokio::spawn(async move {
- let reader = BufReader::new(stderr);
- let mut lines = reader.lines();
- while let Ok(Some(line)) = lines.next_line().await {
- tracing::debug!(line = %line, "Claude stderr");
- if tx_stderr.send(OutputLine::stderr(line)).await.is_err() {
- break;
- }
- }
- tracing::debug!("Claude stderr reader task ended");
- });
-
- tracing::info!("Claude Code process spawned successfully");
-
- let process = ClaudeProcess {
- child,
- output_rx: rx,
- stdin,
- };
-
- // Send the initial plan as the first user message
- tracing::info!(plan_len = plan.len(), "Sending initial plan to Claude via stdin");
- process.send_user_message(plan).await?;
-
- Ok(process)
- }
-
- /// Check if the claude command is available.
- pub async fn check_claude_available(&self) -> Result<String, ClaudeProcessError> {
- let output = Command::new(&self.claude_command)
- .arg("--version")
- .output()
- .await
- .map_err(|e| {
- if e.kind() == std::io::ErrorKind::NotFound {
- ClaudeProcessError::CommandNotFound(self.claude_command.clone())
- } else {
- ClaudeProcessError::SpawnFailed(e)
- }
- })?;
-
- if output.status.success() {
- Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
- } else {
- Err(ClaudeProcessError::CommandNotFound(
- self.claude_command.clone(),
- ))
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_extract_json_type() {
- assert_eq!(
- extract_json_type(r#"{"type":"system","subtype":"init"}"#),
- Some("system".to_string())
- );
- assert_eq!(
- extract_json_type(r#"{"type":"assistant","message":{}}"#),
- Some("assistant".to_string())
- );
- assert_eq!(extract_json_type("not json"), None);
- assert_eq!(extract_json_type(r#"{"no_type": true}"#), None);
- }
-
- #[test]
- fn test_output_line_creation() {
- let line = OutputLine::stdout(r#"{"type":"result"}"#.to_string());
- assert!(line.is_stdout);
- assert_eq!(line.json_type, Some("result".to_string()));
-
- let line = OutputLine::stderr("error message".to_string());
- assert!(!line.is_stdout);
- assert_eq!(line.json_type, None);
- }
-}
diff --git a/makima/daemon/src/process/claude_protocol.rs b/makima/daemon/src/process/claude_protocol.rs
deleted file mode 100644
index 930152b..0000000
--- a/makima/daemon/src/process/claude_protocol.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-//! Claude Code JSON protocol types for stdin communication.
-//!
-//! When using `--input-format=stream-json`, Claude Code expects
-//! newline-delimited JSON messages on stdin.
-
-use serde::Serialize;
-
-/// Message sent to Claude Code via stdin.
-///
-/// Format based on Claude Code's stream-json input protocol.
-#[derive(Debug, Clone, Serialize)]
-#[serde(tag = "type", rename_all = "snake_case")]
-pub enum ClaudeInputMessage {
- /// A user message to send to Claude.
- User { message: UserMessage },
-}
-
-/// The inner user message structure.
-#[derive(Debug, Clone, Serialize)]
-pub struct UserMessage {
- /// Always "user" for user messages.
- pub role: String,
- /// The message content.
- pub content: String,
-}
-
-impl ClaudeInputMessage {
- /// Create a new user message.
- pub fn user(content: impl Into<String>) -> Self {
- Self::User {
- message: UserMessage {
- role: "user".to_string(),
- content: content.into(),
- },
- }
- }
-
- /// Serialize to a JSON string with trailing newline (NDJSON format).
- pub fn to_json_line(&self) -> Result<String, serde_json::Error> {
- let mut json = serde_json::to_string(self)?;
- json.push('\n');
- Ok(json)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_user_message_serialization() {
- let msg = ClaudeInputMessage::user("Hello, Claude!");
- let json = msg.to_json_line().unwrap();
-
- // Should produce: {"type":"user","message":{"role":"user","content":"Hello, Claude!"}}\n
- assert!(json.starts_with(r#"{"type":"user","message":{"role":"user","content":"Hello, Claude!"}}"#));
- assert!(json.ends_with('\n'));
- }
-}
diff --git a/makima/daemon/src/process/mod.rs b/makima/daemon/src/process/mod.rs
deleted file mode 100644
index 814a3c5..0000000
--- a/makima/daemon/src/process/mod.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-//! Process management for Claude Code subprocess execution.
-//!
-//! Spawns and manages Claude Code processes in worktree directories,
-//! streaming JSON output back to the daemon.
-
-mod claude;
-mod claude_protocol;
-
-pub use claude::{ClaudeProcess, ClaudeProcessError, OutputLine, ProcessManager};
-pub use claude_protocol::ClaudeInputMessage;
diff --git a/makima/daemon/src/task/manager.rs b/makima/daemon/src/task/manager.rs
deleted file mode 100644
index 4979ce7..0000000
--- a/makima/daemon/src/task/manager.rs
+++ /dev/null
@@ -1,2248 +0,0 @@
-//! Task lifecycle manager using git worktrees and Claude Code subprocesses.
-
-use std::collections::HashMap;
-use std::path::PathBuf;
-use std::sync::Arc;
-use std::time::Instant;
-
-use rand::Rng;
-use tokio::io::AsyncWriteExt;
-use tokio::sync::{mpsc, RwLock, Semaphore};
-use uuid::Uuid;
-
-use std::collections::HashSet;
-
-use super::state::TaskState;
-use crate::error::{DaemonError, TaskError, TaskResult};
-use crate::process::{ClaudeInputMessage, ProcessManager};
-use crate::temp::TempManager;
-use crate::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager};
-use crate::ws::{BranchInfo, DaemonCommand, DaemonMessage};
-
-/// Generate a secure random API key for orchestrator tool access.
-fn generate_tool_key() -> String {
- let mut rng = rand::rng();
- let bytes: [u8; 32] = rng.random();
- hex::encode(bytes)
-}
-
-/// System prompt for regular (non-orchestrator) subtasks.
-/// This ensures subtasks work only within their isolated worktree directory.
-const SUBTASK_SYSTEM_PROMPT: &str = r#"You are working in an isolated worktree directory that contains a snapshot of the codebase.
-
-## IMPORTANT: Directory Restrictions
-
-**You MUST only work within the current working directory (your worktree).**
-
-- DO NOT use `cd` to navigate to directories outside your worktree
-- DO NOT use absolute paths that point outside your worktree (e.g., don't write to ~/some/path, /tmp, or the original repository)
-- DO NOT modify files in parent directories or sibling directories
-- All your file operations should be relative to the current directory
-
-Your working directory is your sandboxed workspace. When you complete your task, your changes will be reviewed and integrated by the orchestrator.
-
-**Why?** Your worktree is isolated so that:
-1. Your changes don't affect other running tasks
-2. Changes can be reviewed before integration
-3. Multiple tasks can work on the codebase in parallel without conflicts
-
----
-
-"#;
-
-/// The orchestrator system prompt that tells Claude how to use the helper script.
-const ORCHESTRATOR_SYSTEM_PROMPT: &str = r#"You are an orchestrator task. Your job is to coordinate subtasks and integrate their work, NOT to write code directly.
-
-## FIRST STEP
-
-Start by checking if you have existing subtasks:
-
-```bash
-# List all subtasks to see what work needs to be done
-./.makima/orchestrate.sh list
-```
-
-If subtasks exist, start them. If you need additional subtasks or no subtasks exist yet, you can create them.
-
----
-
-## Creating Subtasks
-
-You can create new subtasks to break down work:
-
-```bash
-# Create a new subtask with a name and plan
-./.makima/orchestrate.sh create "Subtask Name" "Detailed plan for what the subtask should do..."
-
-# The command returns the new subtask ID - use it to start the subtask
-./.makima/orchestrate.sh start <new_subtask_id>
-```
-
-Create subtasks when you need to:
-- Break down complex work into smaller pieces
-- Run multiple tasks in parallel on different parts of the codebase
-- Delegate specific implementation work
-
-## Task Continuation (Sequential Dependencies)
-
-When subtasks need to build on each other's work (e.g., Task B depends on Task A's changes), use `--continue-from`:
-
-```bash
-# Create Task B that continues from Task A's worktree
-./.makima/orchestrate.sh create "Task B" "Build on Task A's work..." --continue-from <task_a_id>
-```
-
-This copies all files from Task A's worktree into Task B's worktree, so Task B starts with Task A's changes.
-
-**When to use continuation:**
-- Sequential work: Task B needs Task A's output files
-- Staged implementation: Building features incrementally
-- Fix-and-extend: One task fixes issues, another adds features on top
-
-**When NOT to use continuation:**
-- Parallel tasks working on different files
-- Independent subtasks that can be merged separately
-
-**Important for merging:** When tasks continue from each other, only merge the FINAL task in the chain. Earlier tasks' changes are already included in later tasks' worktrees.
-
-## Sharing Files with Subtasks
-
-Use `--files` to copy specific files from your orchestrator worktree to subtasks. This is useful for sharing plans, configs, or data files:
-
-```bash
-# Create subtask with specific files copied from orchestrator
-./.makima/orchestrate.sh create "Implement Feature" "Follow the plan in PLAN.md" --files "PLAN.md"
-
-# Copy multiple files (comma-separated)
-./.makima/orchestrate.sh create "API Work" "Use the spec..." --files "PLAN.md,api-spec.yaml,types.ts"
-
-# Combine with --continue-from to share files AND continue from another task
-./.makima/orchestrate.sh create "Step 2" "Continue..." --continue-from <task_a_id> --files "requirements.md"
-```
-
-**Use cases for --files:**
-- Share a PLAN.md with detailed implementation steps
-- Distribute configuration or spec files
-- Pass generated data or intermediate results
-
-## How Subtasks Work
-
-Each subtask runs in its own **worktree** - a separate directory with a copy of the codebase. When subtasks complete:
-- Their work remains in the worktree files (NOT committed to git)
-- **Subtasks do NOT auto-merge** - YOU must integrate their work into your worktree
-- You can view and copy files from subtask worktrees using their paths
-- The worktree path is returned when you get subtask status
-
-**IMPORTANT:** Subtasks never create PRs or merge to the target repository. Only the orchestrator (you) can trigger completion actions like PR creation or merging after integrating all subtask work.
-
-## Subtask Commands
-```bash
-# List all subtasks and their current status
-./.makima/orchestrate.sh list
-
-# Create a new subtask (returns the subtask ID)
-./.makima/orchestrate.sh create "Name" "Plan/description"
-
-# Create a subtask that continues from another task's worktree
-./.makima/orchestrate.sh create "Name" "Plan" --continue-from <other_task_id>
-
-# Create a subtask with specific files copied from orchestrator worktree
-./.makima/orchestrate.sh create "Name" "Plan" --files "file1.md,file2.yaml"
-
-# Start a specific subtask (it will run in its own Claude instance)
-./.makima/orchestrate.sh start <subtask_id>
-
-# Stop a running subtask
-./.makima/orchestrate.sh stop <subtask_id>
-
-# Get detailed status of a subtask (includes worktree_path when available)
-./.makima/orchestrate.sh status <subtask_id>
-
-# Get the output/logs of a subtask
-./.makima/orchestrate.sh output <subtask_id>
-
-# Get the worktree path for a subtask
-./.makima/orchestrate.sh worktree <subtask_id>
-```
-
-## Integrating Subtask Work
-
-When subtasks complete, their changes exist as files in their worktree directories:
-- Files are NOT committed to git branches
-- You must copy/integrate files from subtask worktrees into your worktree
-- Use standard file operations (cp, cat, etc.) to review and integrate changes
-
-### Handling Continuation Chains
-
-**CRITICAL:** When subtasks use `--continue-from`, they form a chain where each task includes all changes from previous tasks. You must ONLY integrate the FINAL task in each chain.
-
-Example chain: Task A → Task B (continues from A) → Task C (continues from B)
-- Task C's worktree contains ALL changes from A, B, and C
-- You should ONLY integrate Task C's worktree
-- DO NOT integrate Task A or Task B separately (their changes are already in C)
-
-**How to track continuation chains:**
-1. When you create tasks with `--continue-from`, note which task continues from which
-2. Build a mental model: Independent tasks (no continuation) + Continuation chains
-3. For each chain, only integrate the LAST task in the chain
-
-**Example with mixed independent and chained tasks:**
-```
-Independent tasks (integrate all):
-- Task X: API endpoints
-- Task Y: Database models
-
-Continuation chain (integrate ONLY the last one):
-- Task A: Core feature → Task B: Tests (continues from A) → Task C: Docs (continues from B)
- Only integrate Task C!
-```
-
-### Integration Examples
-
-For independent subtasks (no continuation):
-```bash
-# Get the worktree path for a completed subtask
-SUBTASK_PATH=$(./.makima/orchestrate.sh worktree <subtask_id>)
-
-# View what files were changed
-ls -la "$SUBTASK_PATH"
-diff -r . "$SUBTASK_PATH" --exclude=.git --exclude=.makima
-
-# Copy specific files from subtask
-cp "$SUBTASK_PATH/src/new_file.rs" ./src/
-cp "$SUBTASK_PATH/src/modified_file.rs" ./src/
-
-# Or use diff/patch for more control
-diff -u ./src/file.rs "$SUBTASK_PATH/src/file.rs" > changes.patch
-patch -p0 < changes.patch
-```
-
-For continuation chains (only integrate the final task):
-```bash
-# If you have: Task A → Task B → Task C (each continues from previous)
-# ONLY get and integrate Task C's worktree - it has everything!
-
-FINAL_TASK_PATH=$(./.makima/orchestrate.sh worktree <task_c_id>)
-
-# Copy all changes from the final task
-rsync -av --exclude='.git' --exclude='.makima' "$FINAL_TASK_PATH/" ./
-```
-
-## Completion
-```bash
-# Mark yourself as complete after integrating all subtask work
-./.makima/orchestrate.sh done "Summary of what was accomplished"
-```
-
-## Workflow
-1. **List existing subtasks**: Run `list` to see current subtasks
-2. **Create subtasks if needed**: Use `create` to add new subtasks for the work
- - For independent parallel work: create without `--continue-from`
- - For sequential dependencies: use `--continue-from <previous_task_id>`
- - Track which tasks continue from which (continuation chains)
-3. **Start subtasks**: Run `start` for each subtask
-4. **Monitor progress**: Check status and output as subtasks run
-5. **Integrate work**: When subtasks complete:
- - For independent tasks: integrate each one's worktree
- - For continuation chains: ONLY integrate the FINAL task (it has all changes)
- - Get worktree path with `worktree <subtask_id>`
- - Copy or merge files into your worktree
-6. **Complete**: Call `done` once all work is integrated
-
-## Important Notes
-- Subtask files are in worktrees, NOT committed git branches
-- **Subtasks do NOT auto-merge or create PRs** - you must integrate their work
-- You can read files from subtask worktrees using their paths
-- Use standard file tools (cp, diff, cat, rsync) to integrate changes
-- You should NOT edit files directly - that's what subtasks are for
-- DO NOT DO THE SUBTASKS' WORK! Your only job is to coordinate, not implement.
-- When you call `done`, YOUR worktree may be used for the final PR/merge
-"#;
-
-/// Content of the helper bash script that orchestrators use to call the API.
-const ORCHESTRATE_SCRIPT: &str = r#"#!/bin/bash
-# Makima Orchestrator Helper Script
-# Usage: ./orchestrate.sh <command> [args...]
-
-API_URL="${MAKIMA_API_URL:-http://localhost:8080}"
-API_KEY="${MAKIMA_API_KEY}"
-TASK_ID="${MAKIMA_TASK_ID}"
-
-if [ -z "$API_KEY" ]; then
- echo "Error: MAKIMA_API_KEY not set" >&2
- exit 1
-fi
-
-if [ -z "$TASK_ID" ]; then
- echo "Error: MAKIMA_TASK_ID not set" >&2
- exit 1
-fi
-
-# Helper function to make API calls and check for errors
-api_call() {
- local method="$1"
- local url="$2"
- local data="$3"
- local response
- local http_code
-
- if [ -n "$data" ]; then
- response=$(curl -s -w "\n%{http_code}" -X "$method" \
- -H "X-Makima-Tool-Key: $API_KEY" \
- -H "Content-Type: application/json" \
- -d "$data" \
- "$url")
- else
- response=$(curl -s -w "\n%{http_code}" -X "$method" \
- -H "X-Makima-Tool-Key: $API_KEY" \
- "$url")
- fi
-
- # Extract HTTP code (last line) and body (everything else)
- http_code=$(echo "$response" | tail -n1)
- body=$(echo "$response" | sed '$d')
-
- # Check for curl errors or non-2xx status
- if [ "$http_code" -lt 200 ] || [ "$http_code" -ge 300 ]; then
- echo "Error: API request failed with HTTP $http_code" >&2
- echo "URL: $url" >&2
- echo "Response: $body" >&2
- echo "$body"
- return 1
- fi
-
- echo "$body"
- return 0
-}
-
-case "$1" in
- list)
- api_call GET "$API_URL/api/v1/mesh/tasks/$TASK_ID/subtasks"
- ;;
- create)
- # Parse arguments: create "name" "plan" [--continue-from <task_id>] [--files "file1,file2"]
- if [ -z "$2" ] || [ -z "$3" ]; then
- echo "Usage: $0 create \"<name>\" \"<plan>\" [--continue-from <task_id>] [--files \"file1,file2\"]" >&2
- exit 1
- fi
- NAME="$2"
- PLAN="$3"
- CONTINUE_FROM=""
- COPY_FILES=""
-
- # Parse optional flags (can be in any order after name and plan)
- shift 3
- while [ $# -gt 0 ]; do
- case "$1" in
- --continue-from)
- CONTINUE_FROM="$2"
- shift 2
- ;;
- --files)
- COPY_FILES="$2"
- shift 2
- ;;
- *)
- echo "Unknown option: $1" >&2
- exit 1
- ;;
- esac
- done
-
- # Escape quotes in name and plan for JSON
- NAME_ESCAPED=$(echo "$NAME" | sed 's/"/\\"/g' | sed 's/\\/\\\\/g')
- PLAN_ESCAPED=$(echo "$PLAN" | sed 's/"/\\"/g' | sed 's/\\/\\\\/g')
-
- # Build JSON body
- JSON_BODY="{\"name\":\"$NAME_ESCAPED\",\"plan\":\"$PLAN_ESCAPED\",\"parentTaskId\":\"$TASK_ID\""
-
- if [ -n "$CONTINUE_FROM" ]; then
- echo "Creating subtask: $NAME (continuing from $CONTINUE_FROM)..." >&2
- JSON_BODY="$JSON_BODY,\"continueFromTaskId\":\"$CONTINUE_FROM\""
- else
- echo "Creating subtask: $NAME..." >&2
- fi
-
- if [ -n "$COPY_FILES" ]; then
- # Convert comma-separated file list to JSON array
- FILES_JSON="["
- first=true
- IFS=',' read -ra FILE_ARRAY <<< "$COPY_FILES"
- for file in "${FILE_ARRAY[@]}"; do
- file=$(echo "$file" | xargs) # trim whitespace
- if [ "$first" = true ]; then
- FILES_JSON="$FILES_JSON\"$file\""
- first=false
- else
- FILES_JSON="$FILES_JSON,\"$file\""
- fi
- done
- FILES_JSON="$FILES_JSON]"
- JSON_BODY="$JSON_BODY,\"copyFiles\":$FILES_JSON"
- echo " (copying files: $COPY_FILES)" >&2
- fi
-
- JSON_BODY="$JSON_BODY}"
- api_call POST "$API_URL/api/v1/mesh/tasks" "$JSON_BODY"
- ;;
- start)
- if [ -z "$2" ]; then
- echo "Usage: $0 start <subtask_id>" >&2
- exit 1
- fi
- echo "Starting subtask $2..." >&2
- api_call POST "$API_URL/api/v1/mesh/tasks/$2/start"
- ;;
- stop)
- if [ -z "$2" ]; then
- echo "Usage: $0 stop <subtask_id>" >&2
- exit 1
- fi
- api_call POST "$API_URL/api/v1/mesh/tasks/$2/stop"
- ;;
- status)
- if [ -z "$2" ]; then
- echo "Usage: $0 status <subtask_id>" >&2
- exit 1
- fi
- api_call GET "$API_URL/api/v1/mesh/tasks/$2"
- ;;
- output)
- if [ -z "$2" ]; then
- echo "Usage: $0 output <subtask_id>" >&2
- exit 1
- fi
- api_call GET "$API_URL/api/v1/mesh/tasks/$2/output"
- ;;
- worktree)
- if [ -z "$2" ]; then
- echo "Usage: $0 worktree <subtask_id>" >&2
- exit 1
- fi
- # Get the worktree path from the task's overlayPath field via API
- TASK_JSON=$(api_call GET "$API_URL/api/v1/mesh/tasks/$2")
- if [ $? -ne 0 ]; then
- echo "Error: Failed to get task info" >&2
- exit 1
- fi
- WORKTREE_PATH=$(echo "$TASK_JSON" | grep -o '"overlayPath":"[^"]*"' | cut -d'"' -f4)
- if [ -z "$WORKTREE_PATH" ]; then
- echo "Error: Task has no worktree path (may not have started yet)" >&2
- exit 1
- fi
- if [ -d "$WORKTREE_PATH" ]; then
- echo "$WORKTREE_PATH"
- else
- echo "Error: Worktree not found at $WORKTREE_PATH" >&2
- echo "The worktree may have been cleaned up." >&2
- exit 1
- fi
- ;;
- done)
- SUMMARY="${2:-Task completed}"
- api_call PUT "$API_URL/api/v1/mesh/tasks/$TASK_ID" "{\"status\":\"done\",\"progressSummary\":\"$SUMMARY\"}"
- ;;
- *)
- echo "Makima Orchestrator Helper"
- echo ""
- echo "Usage: $0 <command> [args...]"
- echo ""
- echo "Subtask Commands:"
- echo " list List all subtasks and their status"
- echo " create \"<name>\" \"<plan>\" Create a new subtask"
- echo " create \"...\" --continue-from ID Create subtask continuing from another task's worktree"
- echo " create \"...\" --files \"file1,file2\" Copy specific files from parent (orchestrator) worktree"
- echo " start <subtask_id> Start a subtask"
- echo " stop <subtask_id> Stop a running subtask"
- echo " status <subtask_id> Get detailed subtask status"
- echo " output <subtask_id> Get subtask output history"
- echo " worktree <subtask_id> Get path to subtask's worktree"
- echo ""
- echo "Completion:"
- echo " done [summary] Mark orchestrator as complete"
- echo ""
- echo "Examples:"
- echo " create \"Fix bug\" \"Fix the null check bug\" --files \"PLAN.md\""
- echo " create \"Step 2\" \"Continue work\" --continue-from abc123 --files \"shared.rs,types.rs\""
- ;;
-esac
-"#;
-
-/// Tracks merge state for an orchestrator task.
-#[derive(Default)]
-struct MergeTracker {
- /// Subtask branches that have been successfully merged.
- merged_subtasks: HashSet<Uuid>,
- /// Subtask branches that were explicitly skipped (with reason).
- skipped_subtasks: HashMap<Uuid, String>,
-}
-
-/// Managed task information.
-pub struct ManagedTask {
- /// Task ID.
- pub id: Uuid,
- /// Current state.
- pub state: TaskState,
- /// Worktree info if created.
- pub worktree: Option<WorktreeInfo>,
- /// Task plan.
- pub plan: String,
- /// Repository URL or path.
- pub repo_source: Option<String>,
- /// Base branch.
- pub base_branch: Option<String>,
- /// Target branch to merge into.
- pub target_branch: Option<String>,
- /// Parent task ID if this is a subtask.
- pub parent_task_id: Option<Uuid>,
- /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask).
- pub depth: i32,
- /// Whether this task runs as an orchestrator (coordinates subtasks).
- pub is_orchestrator: bool,
- /// Path to target repository for completion actions.
- pub target_repo_path: Option<String>,
- /// Completion action: "none", "branch", "merge", "pr".
- pub completion_action: Option<String>,
- /// Task ID to continue from (copy worktree from this task).
- pub continue_from_task_id: Option<Uuid>,
- /// Files to copy from parent task's worktree.
- pub copy_files: Option<Vec<String>>,
- /// Time task was created.
- pub created_at: Instant,
- /// Time task started running.
- pub started_at: Option<Instant>,
- /// Time task completed.
- pub completed_at: Option<Instant>,
- /// Error message if failed.
- pub error: Option<String>,
-}
-
-/// Configuration for task execution.
-#[derive(Clone)]
-pub struct TaskConfig {
- /// Maximum concurrent tasks.
- pub max_concurrent_tasks: u32,
- /// Base directory for worktrees.
- pub worktree_base_dir: PathBuf,
- /// Environment variables to pass to Claude.
- pub env_vars: HashMap<String, String>,
- /// Claude command path.
- pub claude_command: String,
- /// Additional arguments to pass to Claude Code.
- pub claude_args: Vec<String>,
- /// Arguments to pass before defaults.
- pub claude_pre_args: Vec<String>,
- /// Enable Claude's permission system.
- pub enable_permissions: bool,
- /// Disable verbose output.
- pub disable_verbose: bool,
-}
-
-impl Default for TaskConfig {
- fn default() -> Self {
- Self {
- max_concurrent_tasks: 4,
- worktree_base_dir: WorktreeManager::default_base_dir(),
- env_vars: HashMap::new(),
- claude_command: "claude".to_string(),
- claude_args: Vec::new(),
- claude_pre_args: Vec::new(),
- enable_permissions: false,
- disable_verbose: false,
- }
- }
-}
-
-/// Task manager for handling task lifecycle.
-pub struct TaskManager {
- /// Worktree manager.
- worktree_manager: Arc<WorktreeManager>,
- /// Process manager.
- process_manager: Arc<ProcessManager>,
- /// Temp directory manager.
- temp_manager: Arc<TempManager>,
- /// Task configuration.
- #[allow(dead_code)]
- config: TaskConfig,
- /// Active tasks.
- tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>,
- /// Channel to send messages to server.
- ws_tx: mpsc::Sender<DaemonMessage>,
- /// Semaphore for limiting concurrent tasks.
- semaphore: Arc<Semaphore>,
- /// Channels for sending input to running tasks.
- /// Each sender allows sending messages to the stdin of a running Claude process.
- task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>,
- /// Tracks merge state per orchestrator task (for completion gate).
- merge_trackers: Arc<RwLock<HashMap<Uuid, MergeTracker>>>,
-}
-
-impl TaskManager {
- /// Create a new task manager.
- pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self {
- let max_concurrent = config.max_concurrent_tasks as usize;
- let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone()));
- let process_manager = Arc::new(
- ProcessManager::with_command(config.claude_command.clone())
- .with_args(config.claude_args.clone())
- .with_pre_args(config.claude_pre_args.clone())
- .with_permissions_enabled(config.enable_permissions)
- .with_verbose_disabled(config.disable_verbose)
- .with_env(config.env_vars.clone()),
- );
- let temp_manager = Arc::new(TempManager::new());
-
- Self {
- worktree_manager,
- process_manager,
- temp_manager,
- config,
- tasks: Arc::new(RwLock::new(HashMap::new())),
- ws_tx,
- semaphore: Arc::new(Semaphore::new(max_concurrent)),
- task_inputs: Arc::new(RwLock::new(HashMap::new())),
- merge_trackers: Arc::new(RwLock::new(HashMap::new())),
- }
- }
-
- /// Handle a command from the server.
- pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> {
- tracing::info!("Received command from server: {:?}", command);
-
- match command {
- DaemonCommand::SpawnTask {
- task_id,
- task_name,
- plan,
- repo_url,
- base_branch,
- target_branch,
- parent_task_id,
- depth,
- is_orchestrator,
- target_repo_path,
- completion_action,
- continue_from_task_id,
- copy_files,
- } => {
- tracing::info!(
- task_id = %task_id,
- task_name = %task_name,
- repo_url = ?repo_url,
- base_branch = ?base_branch,
- target_branch = ?target_branch,
- parent_task_id = ?parent_task_id,
- depth = depth,
- is_orchestrator = is_orchestrator,
- target_repo_path = ?target_repo_path,
- completion_action = ?completion_action,
- continue_from_task_id = ?continue_from_task_id,
- copy_files = ?copy_files,
- plan_len = plan.len(),
- "Spawning new task"
- );
- self.spawn_task(
- task_id, task_name, plan, repo_url, base_branch, target_branch,
- parent_task_id, depth, is_orchestrator,
- target_repo_path, completion_action, continue_from_task_id,
- copy_files
- ).await?;
- }
- DaemonCommand::PauseTask { task_id } => {
- tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks");
- // Subprocesses don't support pause, just log and ignore
- }
- DaemonCommand::ResumeTask { task_id } => {
- tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks");
- // Subprocesses don't support resume, just log and ignore
- }
- DaemonCommand::InterruptTask { task_id, graceful: _ } => {
- tracing::info!(task_id = %task_id, "Interrupting task");
- self.interrupt_task(task_id).await?;
- }
- DaemonCommand::SendMessage { task_id, message } => {
- tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task");
- // Send message to the task's stdin via the input channel
- let inputs = self.task_inputs.read().await;
- if let Some(sender) = inputs.get(&task_id) {
- if let Err(e) = sender.send(message).await {
- tracing::warn!(task_id = %task_id, error = %e, "Failed to send message to task input channel");
- } else {
- tracing::info!(task_id = %task_id, "Message sent to task successfully");
- }
- } else {
- tracing::warn!(task_id = %task_id, "No input channel for task (task may not be running)");
- }
- }
- DaemonCommand::InjectSiblingContext { task_id, .. } => {
- tracing::debug!(task_id = %task_id, "Sibling context injection not supported for subprocess tasks");
- }
- DaemonCommand::Authenticated { daemon_id } => {
- tracing::debug!(daemon_id = %daemon_id, "Authenticated command (handled by WS client)");
- }
- DaemonCommand::Error { code, message } => {
- tracing::warn!(code = %code, message = %message, "Error command from server");
- }
-
- // =========================================================================
- // Merge Commands
- // =========================================================================
-
- DaemonCommand::ListBranches { task_id } => {
- tracing::info!(task_id = %task_id, "Listing task branches");
- self.handle_list_branches(task_id).await?;
- }
- DaemonCommand::MergeStart { task_id, source_branch } => {
- tracing::info!(task_id = %task_id, source_branch = %source_branch, "Starting merge");
- self.handle_merge_start(task_id, source_branch).await?;
- }
- DaemonCommand::MergeStatus { task_id } => {
- tracing::info!(task_id = %task_id, "Getting merge status");
- self.handle_merge_status(task_id).await?;
- }
- DaemonCommand::MergeResolve { task_id, file, strategy } => {
- tracing::info!(task_id = %task_id, file = %file, strategy = %strategy, "Resolving conflict");
- self.handle_merge_resolve(task_id, file, strategy).await?;
- }
- DaemonCommand::MergeCommit { task_id, message } => {
- tracing::info!(task_id = %task_id, "Committing merge");
- self.handle_merge_commit(task_id, message).await?;
- }
- DaemonCommand::MergeAbort { task_id } => {
- tracing::info!(task_id = %task_id, "Aborting merge");
- self.handle_merge_abort(task_id).await?;
- }
- DaemonCommand::MergeSkip { task_id, subtask_id, reason } => {
- tracing::info!(task_id = %task_id, subtask_id = %subtask_id, reason = %reason, "Skipping subtask merge");
- self.handle_merge_skip(task_id, subtask_id, reason).await?;
- }
- DaemonCommand::CheckMergeComplete { task_id } => {
- tracing::info!(task_id = %task_id, "Checking merge completion");
- self.handle_check_merge_complete(task_id).await?;
- }
-
- // =========================================================================
- // Completion Action Commands
- // =========================================================================
-
- DaemonCommand::RetryCompletionAction {
- task_id,
- task_name,
- action,
- target_repo_path,
- target_branch,
- } => {
- tracing::info!(
- task_id = %task_id,
- task_name = %task_name,
- action = %action,
- target_repo_path = %target_repo_path,
- target_branch = ?target_branch,
- "Retrying completion action"
- );
- self.handle_retry_completion_action(task_id, task_name, action, target_repo_path, target_branch).await?;
- }
-
- DaemonCommand::CloneWorktree { task_id, target_dir } => {
- tracing::info!(
- task_id = %task_id,
- target_dir = %target_dir,
- "Cloning worktree to target directory"
- );
- self.handle_clone_worktree(task_id, target_dir).await?;
- }
-
- DaemonCommand::CheckTargetExists { task_id, target_dir } => {
- tracing::debug!(
- task_id = %task_id,
- target_dir = %target_dir,
- "Checking if target directory exists"
- );
- self.handle_check_target_exists(task_id, target_dir).await?;
- }
- }
- Ok(())
- }
-
- /// Spawn a new task.
- #[allow(clippy::too_many_arguments)]
- pub async fn spawn_task(
- &self,
- task_id: Uuid,
- task_name: String,
- plan: String,
- repo_url: Option<String>,
- base_branch: Option<String>,
- target_branch: Option<String>,
- parent_task_id: Option<Uuid>,
- depth: i32,
- is_orchestrator: bool,
- target_repo_path: Option<String>,
- completion_action: Option<String>,
- continue_from_task_id: Option<Uuid>,
- copy_files: Option<Vec<String>>,
- ) -> TaskResult<()> {
- tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, depth = depth, "=== SPAWN_TASK START ===");
-
- // Check if task already exists - allow re-spawning if in terminal state
- {
- let mut tasks = self.tasks.write().await;
- if let Some(existing) = tasks.get(&task_id) {
- if existing.state.is_terminal() {
- // Task exists but is in terminal state (completed, failed, interrupted)
- // Remove it so we can re-spawn
- tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn");
- tasks.remove(&task_id);
- } else {
- // Task is still active, reject
- tracing::warn!(task_id = %task_id, state = ?existing.state, "Task already exists and is active, rejecting spawn");
- return Err(TaskError::AlreadyExists(task_id));
- }
- }
- }
-
- // Acquire semaphore permit
- tracing::info!(task_id = %task_id, "Acquiring concurrency permit...");
- let permit = self
- .semaphore
- .clone()
- .try_acquire_owned()
- .map_err(|_| {
- tracing::warn!(task_id = %task_id, "Concurrency limit reached, cannot spawn task");
- TaskError::ConcurrencyLimit
- })?;
- tracing::info!(task_id = %task_id, "Concurrency permit acquired");
-
- // Create task entry
- tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing");
- let task = ManagedTask {
- id: task_id,
- state: TaskState::Initializing,
- worktree: None,
- plan: plan.clone(),
- repo_source: repo_url.clone(),
- base_branch: base_branch.clone(),
- target_branch: target_branch.clone(),
- parent_task_id,
- depth,
- is_orchestrator,
- target_repo_path: target_repo_path.clone(),
- completion_action: completion_action.clone(),
- continue_from_task_id,
- copy_files: copy_files.clone(),
- created_at: Instant::now(),
- started_at: None,
- completed_at: None,
- error: None,
- };
-
- self.tasks.write().await.insert(task_id, task);
- tracing::info!(task_id = %task_id, "Task entry created and stored");
-
- // Notify server of status change
- tracing::info!(task_id = %task_id, "Notifying server: pending -> initializing");
- self.send_status_change(task_id, "pending", "initializing").await;
-
- // Spawn task in background
- tracing::info!(task_id = %task_id, "Spawning background task runner");
- let inner = self.clone_inner();
- tokio::spawn(async move {
- let _permit = permit; // Hold permit until done
- tracing::info!(task_id = %task_id, "Background task runner started");
-
- if let Err(e) = inner.run_task(
- task_id, task_name, plan, repo_url, base_branch, target_branch,
- is_orchestrator, target_repo_path, completion_action,
- continue_from_task_id, copy_files
- ).await {
- tracing::error!(task_id = %task_id, error = %e, "Task execution failed");
- inner.mark_failed(task_id, &e.to_string()).await;
- }
- tracing::info!(task_id = %task_id, "Background task runner completed");
- });
-
- tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ===");
- Ok(())
- }
-
- /// Clone inner state for spawned tasks.
- fn clone_inner(&self) -> TaskManagerInner {
- TaskManagerInner {
- worktree_manager: self.worktree_manager.clone(),
- process_manager: self.process_manager.clone(),
- temp_manager: self.temp_manager.clone(),
- tasks: self.tasks.clone(),
- ws_tx: self.ws_tx.clone(),
- task_inputs: self.task_inputs.clone(),
- }
- }
-
- /// Interrupt a task.
- pub async fn interrupt_task(&self, task_id: Uuid) -> TaskResult<()> {
- let mut tasks = self.tasks.write().await;
- let task = tasks.get_mut(&task_id).ok_or(TaskError::NotFound(task_id))?;
-
- if task.state.is_terminal() {
- return Ok(()); // Already done
- }
-
- let old_state = task.state;
- task.state = TaskState::Interrupted;
- task.completed_at = Some(Instant::now());
-
- // Notify server
- drop(tasks);
- self.send_status_change(task_id, old_state.as_str(), "interrupted").await;
-
- // Note: The process will be killed when the ClaudeProcess is dropped
- // Worktrees are kept until explicitly deleted
-
- Ok(())
- }
-
- /// Get list of active task IDs.
- pub async fn active_task_ids(&self) -> Vec<Uuid> {
- self.tasks
- .read()
- .await
- .iter()
- .filter(|(_, t)| t.state.is_active())
- .map(|(id, _)| *id)
- .collect()
- }
-
- /// Get task state.
- pub async fn get_task_state(&self, task_id: Uuid) -> Option<TaskState> {
- self.tasks.read().await.get(&task_id).map(|t| t.state)
- }
-
- /// Send status change notification to server.
- async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) {
- let msg = DaemonMessage::task_status_change(task_id, old_status, new_status);
- let _ = self.ws_tx.send(msg).await;
- }
-
- // =========================================================================
- // Merge Handler Methods
- // =========================================================================
-
- /// Get worktree path for a task, or return error if not found.
- /// First checks in-memory tasks, then scans the worktrees directory.
- async fn get_task_worktree_path(&self, task_id: Uuid) -> Result<std::path::PathBuf, DaemonError> {
- // First try to get from in-memory tasks
- {
- let tasks = self.tasks.read().await;
- if let Some(task) = tasks.get(&task_id) {
- if let Some(ref worktree) = task.worktree {
- return Ok(worktree.path.clone());
- }
- }
- }
-
- // Task not in memory - scan worktrees directory for matching task ID
- let short_id = &task_id.to_string()[..8];
- let worktrees_dir = self.worktree_manager.base_dir();
-
- if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
- while let Ok(Some(entry)) = entries.next_entry().await {
- let name = entry.file_name();
- let name_str = name.to_string_lossy();
- if name_str.starts_with(short_id) {
- let path = entry.path();
- // Verify it's a valid git directory
- if path.join(".git").exists() {
- tracing::info!(
- task_id = %task_id,
- worktree_path = %path.display(),
- "Found worktree by scanning directory"
- );
- return Ok(path);
- }
- }
- }
- }
-
- Err(DaemonError::Task(TaskError::SetupFailed(
- format!("No worktree found for task {}. The worktree may have been cleaned up.", task_id)
- )))
- }
-
- /// Handle ListBranches command.
- async fn handle_list_branches(&self, task_id: Uuid) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- match self.worktree_manager.list_task_branches(&worktree_path).await {
- Ok(branches) => {
- let branch_infos: Vec<BranchInfo> = branches
- .into_iter()
- .map(|b| BranchInfo {
- name: b.name,
- task_id: b.task_id,
- is_merged: b.is_merged,
- last_commit: b.last_commit,
- last_commit_message: b.last_commit_message,
- })
- .collect();
-
- let msg = DaemonMessage::BranchList {
- task_id,
- branches: branch_infos,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- tracing::error!(task_id = %task_id, error = %e, "Failed to list branches");
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: e.to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- }
- Ok(())
- }
-
- /// Handle MergeStart command.
- async fn handle_merge_start(&self, task_id: Uuid, source_branch: String) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- match self.worktree_manager.merge_branch(&worktree_path, &source_branch).await {
- Ok(None) => {
- // Merge succeeded without conflicts
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: true,
- message: "Merge completed without conflicts".to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Ok(Some(conflicts)) => {
- // Merge has conflicts
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: format!("Merge has {} conflicts", conflicts.len()),
- commit_sha: None,
- conflicts: Some(conflicts),
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: e.to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- }
- Ok(())
- }
-
- /// Handle MergeStatus command.
- async fn handle_merge_status(&self, task_id: Uuid) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- match self.worktree_manager.get_merge_state(&worktree_path).await {
- Ok(state) => {
- let msg = DaemonMessage::MergeStatusResponse {
- task_id,
- in_progress: state.in_progress,
- source_branch: if state.in_progress { Some(state.source_branch) } else { None },
- conflicted_files: state.conflicted_files,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- tracing::error!(task_id = %task_id, error = %e, "Failed to get merge status");
- let msg = DaemonMessage::MergeStatusResponse {
- task_id,
- in_progress: false,
- source_branch: None,
- conflicted_files: vec![],
- };
- let _ = self.ws_tx.send(msg).await;
- }
- }
- Ok(())
- }
-
- /// Handle MergeResolve command.
- async fn handle_merge_resolve(&self, task_id: Uuid, file: String, strategy: String) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- let resolution = match strategy.to_lowercase().as_str() {
- "ours" => ConflictResolution::Ours,
- "theirs" => ConflictResolution::Theirs,
- _ => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: format!("Invalid strategy '{}', must be 'ours' or 'theirs'", strategy),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- return Ok(());
- }
- };
-
- match self.worktree_manager.resolve_conflict(&worktree_path, &file, resolution).await {
- Ok(()) => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: true,
- message: format!("Resolved conflict in {}", file),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: e.to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- }
- Ok(())
- }
-
- /// Handle MergeCommit command.
- async fn handle_merge_commit(&self, task_id: Uuid, message: String) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- match self.worktree_manager.commit_merge(&worktree_path, &message).await {
- Ok(commit_sha) => {
- // Track this merge as completed (extract subtask ID from branch if possible)
- // For now, we'll track it when MergeSkip is called or based on branch names
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: true,
- message: "Merge committed successfully".to_string(),
- commit_sha: Some(commit_sha),
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: e.to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- }
- Ok(())
- }
-
- /// Handle MergeAbort command.
- async fn handle_merge_abort(&self, task_id: Uuid) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- match self.worktree_manager.abort_merge(&worktree_path).await {
- Ok(()) => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: true,
- message: "Merge aborted".to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: false,
- message: e.to_string(),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- }
- }
- Ok(())
- }
-
- /// Handle MergeSkip command.
- async fn handle_merge_skip(&self, task_id: Uuid, subtask_id: Uuid, reason: String) -> Result<(), DaemonError> {
- // Record that this subtask was skipped
- {
- let mut trackers = self.merge_trackers.write().await;
- let tracker = trackers.entry(task_id).or_insert_with(MergeTracker::default);
- tracker.skipped_subtasks.insert(subtask_id, reason.clone());
- }
-
- let msg = DaemonMessage::MergeResult {
- task_id,
- success: true,
- message: format!("Subtask {} skipped: {}", subtask_id, reason),
- commit_sha: None,
- conflicts: None,
- };
- let _ = self.ws_tx.send(msg).await;
- Ok(())
- }
-
- /// Handle CheckMergeComplete command.
- async fn handle_check_merge_complete(&self, task_id: Uuid) -> Result<(), DaemonError> {
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- // Get all task branches
- let branches = match self.worktree_manager.list_task_branches(&worktree_path).await {
- Ok(b) => b,
- Err(e) => {
- let msg = DaemonMessage::MergeCompleteCheck {
- task_id,
- can_complete: false,
- unmerged_branches: vec![format!("Error listing branches: {}", e)],
- merged_count: 0,
- skipped_count: 0,
- };
- let _ = self.ws_tx.send(msg).await;
- return Ok(());
- }
- };
-
- // Get tracker state
- let trackers = self.merge_trackers.read().await;
- let empty_merged: HashSet<Uuid> = HashSet::new();
- let empty_skipped: HashMap<Uuid, String> = HashMap::new();
- let tracker = trackers.get(&task_id);
- let merged_set = tracker.map(|t| &t.merged_subtasks).unwrap_or(&empty_merged);
- let skipped_set = tracker.map(|t| &t.skipped_subtasks).unwrap_or(&empty_skipped);
-
- let mut merged_count = 0u32;
- let mut skipped_count = 0u32;
- let mut unmerged_branches = Vec::new();
-
- for branch in &branches {
- if branch.is_merged {
- merged_count += 1;
- } else if let Some(subtask_id) = branch.task_id {
- if merged_set.contains(&subtask_id) {
- merged_count += 1;
- } else if skipped_set.contains_key(&subtask_id) {
- skipped_count += 1;
- } else {
- unmerged_branches.push(branch.name.clone());
- }
- } else {
- // Branch without task ID - check if it's merged
- unmerged_branches.push(branch.name.clone());
- }
- }
-
- let can_complete = unmerged_branches.is_empty();
-
- let msg = DaemonMessage::MergeCompleteCheck {
- task_id,
- can_complete,
- unmerged_branches,
- merged_count,
- skipped_count,
- };
- let _ = self.ws_tx.send(msg).await;
- Ok(())
- }
-
- /// Mark a subtask as merged in the tracker.
- #[allow(dead_code)]
- pub async fn mark_subtask_merged(&self, orchestrator_task_id: Uuid, subtask_id: Uuid) {
- let mut trackers = self.merge_trackers.write().await;
- let tracker = trackers.entry(orchestrator_task_id).or_insert_with(MergeTracker::default);
- tracker.merged_subtasks.insert(subtask_id);
- }
-
- // =========================================================================
- // Completion Action Handler Methods
- // =========================================================================
-
- /// Handle RetryCompletionAction command.
- async fn handle_retry_completion_action(
- &self,
- task_id: Uuid,
- task_name: String,
- action: String,
- target_repo_path: String,
- target_branch: Option<String>,
- ) -> Result<(), DaemonError> {
- // Get the task's worktree path
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- // Execute the completion action
- let inner = self.clone_inner();
- let result = inner.execute_completion_action(
- task_id,
- &task_name,
- &worktree_path,
- &action,
- Some(target_repo_path.as_str()),
- target_branch.as_deref(),
- ).await;
-
- // Send result back to server
- let msg = match result {
- Ok(pr_url) => DaemonMessage::CompletionActionResult {
- task_id,
- success: true,
- message: match action.as_str() {
- "branch" => format!("Branch pushed to {}", target_repo_path),
- "merge" => format!("Merged into {}", target_branch.as_deref().unwrap_or("main")),
- "pr" => format!("Pull request created"),
- _ => format!("Completion action '{}' executed", action),
- },
- pr_url,
- },
- Err(e) => DaemonMessage::CompletionActionResult {
- task_id,
- success: false,
- message: e,
- pr_url: None,
- },
- };
- let _ = self.ws_tx.send(msg).await;
- Ok(())
- }
-
- /// Handle CloneWorktree command.
- async fn handle_clone_worktree(
- &self,
- task_id: Uuid,
- target_dir: String,
- ) -> Result<(), DaemonError> {
- // Get the task's worktree path
- let worktree_path = self.get_task_worktree_path(task_id).await?;
-
- // Expand tilde in target path
- let target_path = crate::worktree::expand_tilde(&target_dir);
-
- // Clone the worktree to target directory
- let result = self.worktree_manager.clone_worktree_to_directory(
- &worktree_path,
- &target_path,
- ).await;
-
- // Send result back to server
- let msg = match result {
- Ok(message) => DaemonMessage::CloneWorktreeResult {
- task_id,
- success: true,
- message,
- target_dir: Some(target_path.to_string_lossy().to_string()),
- },
- Err(e) => DaemonMessage::CloneWorktreeResult {
- task_id,
- success: false,
- message: e.to_string(),
- target_dir: None,
- },
- };
- let _ = self.ws_tx.send(msg).await;
- Ok(())
- }
-
- /// Handle CheckTargetExists command.
- async fn handle_check_target_exists(
- &self,
- task_id: Uuid,
- target_dir: String,
- ) -> Result<(), DaemonError> {
- // Expand tilde in target path
- let target_path = crate::worktree::expand_tilde(&target_dir);
-
- // Check if target exists
- let exists = self.worktree_manager.target_directory_exists(&target_path).await;
-
- // Send result back to server
- let msg = DaemonMessage::CheckTargetExistsResult {
- task_id,
- exists,
- target_dir: target_path.to_string_lossy().to_string(),
- };
- let _ = self.ws_tx.send(msg).await;
- Ok(())
- }
-}
-
-/// Inner state for spawned tasks (cloneable).
-struct TaskManagerInner {
- worktree_manager: Arc<WorktreeManager>,
- process_manager: Arc<ProcessManager>,
- temp_manager: Arc<TempManager>,
- tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>,
- ws_tx: mpsc::Sender<DaemonMessage>,
- task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>,
-}
-
-impl TaskManagerInner {
- /// Run a task to completion.
- #[allow(clippy::too_many_arguments)]
- async fn run_task(
- &self,
- task_id: Uuid,
- task_name: String,
- plan: String,
- repo_source: Option<String>,
- base_branch: Option<String>,
- target_branch: Option<String>,
- is_orchestrator: bool,
- target_repo_path: Option<String>,
- completion_action: Option<String>,
- continue_from_task_id: Option<Uuid>,
- copy_files: Option<Vec<String>>,
- ) -> Result<(), DaemonError> {
- tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, "=== RUN_TASK START ===");
-
- // Determine working directory
- let working_dir = if let Some(ref source) = repo_source {
- if is_new_repo_request(source) {
- // Explicit new repo request: new:// or new://project-name
- tracing::info!(
- task_id = %task_id,
- source = %source,
- "Creating new git repository"
- );
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Initializing new git repository...\n"),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- let worktree_info = self.worktree_manager
- .init_new_repo(task_id, source)
- .await
- .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?;
-
- tracing::info!(
- task_id = %task_id,
- path = %worktree_info.path.display(),
- "New repository created"
- );
-
- // Store worktree info
- {
- let mut tasks = self.tasks.write().await;
- if let Some(task) = tasks.get_mut(&task_id) {
- task.worktree = Some(worktree_info.clone());
- }
- }
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Repository ready at {}\n", worktree_info.path.display()),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- worktree_info.path
- } else {
- // Send progress message
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Setting up worktree from {}...\n", source),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- // Ensure source repo exists (clone if URL, verify if path)
- let source_repo = self.worktree_manager.ensure_repo(source).await
- .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?;
-
- // Detect or use provided base branch
- let branch = if let Some(ref b) = base_branch {
- b.clone()
- } else {
- self.worktree_manager.detect_default_branch(&source_repo).await
- .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
- };
-
- tracing::info!(
- task_id = %task_id,
- source = %source,
- branch = %branch,
- continue_from_task_id = ?continue_from_task_id,
- "Setting up worktree"
- );
-
- // Create worktree - either from scratch or copying from another task
- let task_name = format!("task-{}", &task_id.to_string()[..8]);
- let worktree_info = if let Some(from_task_id) = continue_from_task_id {
- // Find the source task's worktree path
- let source_worktree = self.find_worktree_for_task(from_task_id).await
- .map_err(|e| DaemonError::Task(TaskError::SetupFailed(
- format!("Cannot continue from task {}: {}", from_task_id, e)
- )))?;
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Continuing from task {} worktree...\n", &from_task_id.to_string()[..8]),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- // Create worktree by copying from source task
- self.worktree_manager
- .create_worktree_from_task(&source_worktree, task_id, &task_name)
- .await
- .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
- } else {
- // Create fresh worktree from repo
- self.worktree_manager
- .create_worktree(&source_repo, task_id, &task_name, &branch)
- .await
- .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
- };
-
- tracing::info!(
- task_id = %task_id,
- worktree_path = %worktree_info.path.display(),
- branch = %worktree_info.branch,
- continued_from = ?continue_from_task_id,
- "Worktree created"
- );
-
- // Store worktree info
- {
- let mut tasks = self.tasks.write().await;
- if let Some(task) = tasks.get_mut(&task_id) {
- task.worktree = Some(worktree_info.clone());
- }
- }
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Worktree ready at {}\n", worktree_info.path.display()),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- worktree_info.path
- }
- } else {
- // No repo specified - use managed temp directory in ~/.makima/temp/
- tracing::info!(task_id = %task_id, "Creating managed temp directory (no repo)");
-
- let msg = DaemonMessage::task_output(
- task_id,
- "Creating temporary working directory...\n".to_string(),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- let temp_dir = self.temp_manager.create_task_dir(task_id).await?;
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Working directory ready at {}\n", temp_dir.display()),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- temp_dir
- };
-
- // Copy files from parent task's worktree if specified
- if let Some(ref files) = copy_files {
- if !files.is_empty() {
- // Get the parent task ID to find its worktree
- let parent_task_id = {
- let tasks = self.tasks.read().await;
- tasks.get(&task_id).and_then(|t| t.parent_task_id)
- };
-
- if let Some(parent_id) = parent_task_id {
- match self.find_worktree_for_task(parent_id).await {
- Ok(parent_worktree) => {
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Copying {} files from orchestrator...\n", files.len()),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- for file_path in files {
- let source = parent_worktree.join(file_path);
- let dest = working_dir.join(file_path);
-
- // Create parent directories if needed
- if let Some(parent) = dest.parent() {
- if let Err(e) = tokio::fs::create_dir_all(parent).await {
- tracing::warn!(
- task_id = %task_id,
- file = %file_path,
- error = %e,
- "Failed to create parent directory for file"
- );
- continue;
- }
- }
-
- // Copy the file
- match tokio::fs::copy(&source, &dest).await {
- Ok(_) => {
- tracing::info!(
- task_id = %task_id,
- source = %source.display(),
- dest = %dest.display(),
- "Copied file from orchestrator"
- );
- }
- Err(e) => {
- tracing::warn!(
- task_id = %task_id,
- source = %source.display(),
- dest = %dest.display(),
- error = %e,
- "Failed to copy file from orchestrator"
- );
- // Notify but don't fail - the file might be optional
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Warning: Could not copy {}: {}\n", file_path, e),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- }
- }
- }
-
- let msg = DaemonMessage::task_output(
- task_id,
- "Files copied from orchestrator.\n".to_string(),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- tracing::warn!(
- task_id = %task_id,
- parent_id = %parent_id,
- error = %e,
- "Could not find parent task worktree for file copying"
- );
- }
- }
- } else {
- tracing::warn!(
- task_id = %task_id,
- "copy_files specified but no parent_task_id"
- );
- }
- }
- }
-
- // Update state to Starting
- tracing::info!(task_id = %task_id, "Updating state: Initializing -> Starting");
- self.update_state(task_id, TaskState::Starting).await;
- self.send_status_change(task_id, "initializing", "starting").await;
-
- // Check Claude is available
- match self.process_manager.check_claude_available().await {
- Ok(version) => {
- tracing::info!(task_id = %task_id, version = %version, "Claude Code available");
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Claude Code {} ready\n", version),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- }
- Err(e) => {
- let err_msg = format!("Claude Code not available: {}", e);
- tracing::error!(task_id = %task_id, error = %err_msg);
- return Err(DaemonError::Task(TaskError::SetupFailed(err_msg)));
- }
- }
-
- // Set up orchestrator mode if needed
- let (extra_env, full_plan) = if is_orchestrator {
- tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up orchestrator mode");
-
- let msg = DaemonMessage::task_output(
- task_id,
- "Setting up orchestrator environment...\n".to_string(),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- // Generate tool key for API access
- let tool_key = generate_tool_key();
- tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for orchestrator");
-
- // Register tool key with server
- let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone());
- if self.ws_tx.send(register_msg).await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to register tool key");
- } else {
- tracing::info!(task_id = %task_id, "Tool key registration message sent to server");
- }
-
- // Create .makima directory and helper script
- let makima_dir = working_dir.join(".makima");
- if let Err(e) = tokio::fs::create_dir_all(&makima_dir).await {
- tracing::warn!(task_id = %task_id, makima_dir = %makima_dir.display(), "Failed to create .makima directory: {}", e);
- } else {
- tracing::info!(task_id = %task_id, makima_dir = %makima_dir.display(), "Created .makima directory");
- }
-
- let script_path = makima_dir.join("orchestrate.sh");
- if let Err(e) = tokio::fs::write(&script_path, ORCHESTRATE_SCRIPT).await {
- tracing::warn!(task_id = %task_id, script_path = %script_path.display(), "Failed to write orchestrate.sh: {}", e);
- } else {
- tracing::info!(task_id = %task_id, script_path = %script_path.display(), script_size = ORCHESTRATE_SCRIPT.len(), "Wrote orchestrate.sh");
- // Make script executable
- #[cfg(unix)]
- {
- use std::os::unix::fs::PermissionsExt;
- if let Err(e) = std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)) {
- tracing::warn!(task_id = %task_id, "Failed to set script permissions: {}", e);
- } else {
- tracing::info!(task_id = %task_id, "Set orchestrate.sh executable (0o755)");
- }
- }
- }
-
- // Set up environment variables
- let mut env = HashMap::new();
- // TODO: Make API URL configurable
- env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string());
- env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone());
- env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string());
-
- tracing::info!(
- task_id = %task_id,
- api_url = "http://localhost:8080",
- tool_key_preview = &tool_key[..8.min(tool_key.len())],
- "Set orchestrator environment variables"
- );
-
- // Prepend orchestrator instructions to the plan
- let orchestrator_plan = format!(
- "{}\n\n---\n\nYour task:\n{}",
- ORCHESTRATOR_SYSTEM_PROMPT,
- plan
- );
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Orchestrator environment ready (script at {})\n", script_path.display()),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- (Some(env), orchestrator_plan)
- } else {
- tracing::info!(task_id = %task_id, "Running as regular subtask (not orchestrator)");
- // Prepend subtask instructions to ensure worktree isolation
- let subtask_plan = format!(
- "{}\nYour task:\n{}",
- SUBTASK_SYSTEM_PROMPT,
- plan
- );
- (None, subtask_plan)
- };
-
- // Spawn Claude process
- let plan_bytes = full_plan.len();
- let plan_chars = full_plan.chars().count();
- // Rough token estimate: ~4 chars per token for English
- let estimated_tokens = plan_chars / 4;
-
- tracing::info!(
- task_id = %task_id,
- working_dir = %working_dir.display(),
- is_orchestrator = is_orchestrator,
- plan_bytes = plan_bytes,
- plan_chars = plan_chars,
- estimated_tokens = estimated_tokens,
- "Spawning Claude process"
- );
-
- // Warn if plan is very large (Claude's context is typically 100k-200k tokens)
- if estimated_tokens > 50_000 {
- tracing::warn!(task_id = %task_id, estimated_tokens = estimated_tokens, "Plan is very large - may hit context limits!");
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Warning: Plan is very large (~{} tokens). This may cause issues.\n", estimated_tokens),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- }
-
- let msg = DaemonMessage::task_output(
- task_id,
- if is_orchestrator {
- format!("Starting Claude Code (orchestrator mode, ~{} tokens)...\n", estimated_tokens)
- } else {
- format!("Starting Claude Code (~{} tokens)...\n", estimated_tokens)
- },
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- tracing::debug!(task_id = %task_id, "Calling process_manager.spawn()...");
- let mut process = self.process_manager
- .spawn(&working_dir, &full_plan, extra_env)
- .await
- .map_err(|e| {
- tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process");
- DaemonError::Task(TaskError::SetupFailed(e.to_string()))
- })?;
- tracing::info!(task_id = %task_id, "Claude process spawned successfully");
-
- // Set up input channel for this task so we can send messages to its stdin
- tracing::debug!(task_id = %task_id, "Setting up input channel...");
- let (input_tx, mut input_rx) = mpsc::channel::<String>(100);
- tracing::debug!(task_id = %task_id, "Acquiring task_inputs write lock...");
- self.task_inputs.write().await.insert(task_id, input_tx);
- tracing::debug!(task_id = %task_id, "Input channel registered");
-
- // Get stdin handle for input forwarding and completion signaling
- let stdin_handle = process.stdin_handle();
- let stdin_handle_for_completion = stdin_handle.clone();
-
- tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)");
- tokio::spawn(async move {
- tracing::info!(task_id = %task_id, "Stdin forwarder task started, waiting for messages...");
- while let Some(msg) = input_rx.recv().await {
- tracing::info!(task_id = %task_id, msg_len = msg.len(), msg_preview = %if msg.len() > 50 { &msg[..50] } else { &msg }, "Received message from input channel");
-
- // Format as JSON user message for stream-json input protocol
- let json_msg = ClaudeInputMessage::user(&msg);
- let json_line = match json_msg.to_json_line() {
- Ok(line) => line,
- Err(e) => {
- tracing::error!(task_id = %task_id, error = %e, "Failed to serialize input message");
- continue;
- }
- };
-
- tracing::debug!(task_id = %task_id, json_line = %json_line.trim(), "Formatted JSON line for stdin");
-
- let mut stdin_guard = stdin_handle.lock().await;
- if let Some(ref mut stdin) = *stdin_guard {
- tracing::debug!(task_id = %task_id, "Acquired stdin lock, writing...");
- if stdin.write_all(json_line.as_bytes()).await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to write to stdin, breaking");
- break;
- }
- if stdin.flush().await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to flush stdin, breaking");
- break;
- }
- tracing::info!(task_id = %task_id, json_len = json_line.len(), "Successfully wrote user message to Claude stdin");
- } else {
- tracing::warn!(task_id = %task_id, "Stdin is None (already closed), cannot send message");
- break;
- }
- }
- tracing::info!(task_id = %task_id, "Stdin forwarder task ended (channel closed or stdin unavailable)");
- });
-
- // Update state to Running
- {
- tracing::debug!(task_id = %task_id, "Acquiring tasks write lock for Running state update");
- let mut tasks = self.tasks.write().await;
- if let Some(task) = tasks.get_mut(&task_id) {
- task.state = TaskState::Running;
- task.started_at = Some(Instant::now());
- }
- tracing::debug!(task_id = %task_id, "Released tasks write lock");
- }
- tracing::info!(task_id = %task_id, "Updating state: Starting -> Running");
- self.send_status_change(task_id, "starting", "running").await;
- tracing::debug!(task_id = %task_id, "Sent status change notification");
-
- // Stream output with startup timeout check
- tracing::info!(task_id = %task_id, "Starting output stream - waiting for Claude output...");
- tracing::debug!(task_id = %task_id, "Output will be forwarded via WebSocket to server");
- let ws_tx = self.ws_tx.clone();
-
- let mut output_count = 0u64;
- let mut output_bytes = 0usize;
- let startup_timeout = tokio::time::Duration::from_secs(30);
- let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5));
- startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
- let startup_deadline = tokio::time::Instant::now() + startup_timeout;
-
- loop {
- tokio::select! {
- maybe_line = process.next_output() => {
- match maybe_line {
- Some(line) => {
- output_count += 1;
- output_bytes += line.content.len();
-
- if output_count == 1 {
- tracing::info!(task_id = %task_id, "Received first output line from Claude");
- }
- if output_count % 100 == 0 {
- tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress");
- }
-
- // Log output details for debugging
- tracing::trace!(
- task_id = %task_id,
- line_num = output_count,
- content_len = line.content.len(),
- is_stdout = line.is_stdout,
- json_type = ?line.json_type,
- "Forwarding output to WebSocket"
- );
-
- // Check if this is a "result" message indicating task completion
- // With --input-format=stream-json, Claude waits for more input after completion
- // We close stdin to signal EOF and let the process exit
- if line.json_type.as_deref() == Some("result") {
- tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion");
- let mut stdin_guard = stdin_handle_for_completion.lock().await;
- if let Some(mut stdin) = stdin_guard.take() {
- let _ = stdin.shutdown().await;
- }
- }
-
- let msg = DaemonMessage::task_output(task_id, line.content, false);
- if ws_tx.send(msg).await.is_err() {
- tracing::warn!(task_id = %task_id, "Failed to send output, channel closed");
- break;
- }
- }
- None => {
- tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended");
- break;
- }
- }
- }
- _ = startup_check.tick(), if output_count == 0 => {
- // Check if process is still alive
- match process.try_wait() {
- Ok(Some(exit_code)) => {
- tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!");
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Error: Claude process exited unexpectedly with code {}\n", exit_code),
- false,
- );
- let _ = ws_tx.send(msg).await;
- break;
- }
- Ok(None) => {
- // Still running but no output
- if tokio::time::Instant::now() > startup_deadline {
- tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck");
- let msg = DaemonMessage::task_output(
- task_id,
- "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(),
- false,
- );
- let _ = ws_tx.send(msg).await;
- } else {
- tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output...");
- }
- }
- Err(e) => {
- tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status");
- }
- }
- }
- }
- }
-
- // Wait for process to exit
- let exit_code = process.wait().await.unwrap_or(-1);
-
- // Clean up input channel for this task
- self.task_inputs.write().await.remove(&task_id);
- tracing::debug!(task_id = %task_id, "Removed task input channel");
-
- // Update state based on exit code
- let success = exit_code == 0;
- let new_state = if success {
- TaskState::Completed
- } else {
- TaskState::Failed
- };
-
- tracing::info!(
- task_id = %task_id,
- exit_code = exit_code,
- success = success,
- new_state = ?new_state,
- "Claude process exited, updating task state"
- );
-
- {
- let mut tasks = self.tasks.write().await;
- if let Some(task) = tasks.get_mut(&task_id) {
- task.state = new_state;
- task.completed_at = Some(Instant::now());
- if !success {
- task.error = Some(format!("Process exited with code {}", exit_code));
- }
- }
- }
-
- // Execute completion action if task succeeded
- let completion_result = if success {
- if let Some(ref action) = completion_action {
- if action != "none" {
- self.execute_completion_action(
- task_id,
- &task_name,
- &working_dir,
- action,
- target_repo_path.as_deref(),
- target_branch.as_deref(),
- ).await
- } else {
- Ok(None)
- }
- } else {
- Ok(None)
- }
- } else {
- Ok(None)
- };
-
- // Log completion action result
- match &completion_result {
- Ok(Some(pr_url)) => {
- tracing::info!(task_id = %task_id, pr_url = %pr_url, "Completion action created PR");
- }
- Ok(None) => {}
- Err(e) => {
- tracing::warn!(task_id = %task_id, error = %e, "Completion action failed (task still marked as done)");
- }
- }
-
- // Notify server
- let error = if success {
- None
- } else {
- Some(format!("Exit code: {}", exit_code))
- };
- tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion");
- let msg = DaemonMessage::task_complete(task_id, success, error);
- let _ = self.ws_tx.send(msg).await;
-
- // Note: Worktrees are kept until explicitly deleted (per user preference)
- // This allows inspection, PR creation, etc.
-
- tracing::info!(task_id = %task_id, "=== RUN_TASK END ===");
- Ok(())
- }
-
- /// Execute the completion action for a task.
- async fn execute_completion_action(
- &self,
- task_id: Uuid,
- task_name: &str,
- worktree_path: &std::path::Path,
- action: &str,
- target_repo_path: Option<&str>,
- target_branch: Option<&str>,
- ) -> Result<Option<String>, String> {
- let target_repo = match target_repo_path {
- Some(path) => crate::worktree::expand_tilde(path),
- None => {
- tracing::warn!(task_id = %task_id, "No target_repo_path configured, skipping completion action");
- return Ok(None);
- }
- };
-
- if !target_repo.exists() {
- return Err(format!("Target repo not found: {} (expanded from {:?})", target_repo.display(), target_repo_path));
- }
-
- // Get the branch name: makima/{task-name-with-dashes}-{short-id}
- let branch_name = format!(
- "makima/{}-{}",
- crate::worktree::sanitize_name(task_name),
- crate::worktree::short_uuid(task_id)
- );
-
- // Determine target branch - use provided value or detect default branch of target repo
- let target_branch = match target_branch {
- Some(branch) => branch.to_string(),
- None => {
- // Detect default branch (main, master, develop, etc.)
- self.worktree_manager
- .detect_default_branch(&target_repo)
- .await
- .unwrap_or_else(|_| "master".to_string())
- }
- };
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Executing completion action: {}...\n", action),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
-
- match action {
- "branch" => {
- // Just push the branch to target repo
- self.worktree_manager
- .push_to_target_repo(worktree_path, &target_repo, &branch_name, task_name)
- .await
- .map_err(|e| e.to_string())?;
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Branch '{}' pushed to {}\n", branch_name, target_repo.display()),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- Ok(None)
- }
- "merge" => {
- // Push and merge into target branch
- let commit_sha = self.worktree_manager
- .merge_to_target(worktree_path, &target_repo, &branch_name, &target_branch, task_name)
- .await
- .map_err(|e| e.to_string())?;
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Branch merged into {} (commit: {})\n", target_branch, commit_sha),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- Ok(None)
- }
- "pr" => {
- // Push and create PR
- let title = task_name.to_string();
- let body = format!(
- "Automated PR from makima task.\n\nTask ID: `{}`",
- task_id
- );
- let pr_url = self.worktree_manager
- .create_pull_request(
- worktree_path,
- &target_repo,
- &branch_name,
- &target_branch,
- &title,
- &body,
- )
- .await
- .map_err(|e| e.to_string())?;
-
- let msg = DaemonMessage::task_output(
- task_id,
- format!("Pull request created: {}\n", pr_url),
- false,
- );
- let _ = self.ws_tx.send(msg).await;
- Ok(Some(pr_url))
- }
- _ => {
- tracing::warn!(task_id = %task_id, action = %action, "Unknown completion action");
- Ok(None)
- }
- }
- }
-
- /// Find worktree path for a task ID.
- /// First checks in-memory tasks, then scans the worktrees directory.
- async fn find_worktree_for_task(&self, task_id: Uuid) -> Result<PathBuf, String> {
- // First try to get from in-memory tasks
- {
- let tasks = self.tasks.read().await;
- if let Some(task) = tasks.get(&task_id) {
- if let Some(ref worktree) = task.worktree {
- return Ok(worktree.path.clone());
- }
- }
- }
-
- // Task not in memory - scan worktrees directory for matching task ID
- let short_id = &task_id.to_string()[..8];
- let worktrees_dir = self.worktree_manager.base_dir();
-
- if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
- while let Ok(Some(entry)) = entries.next_entry().await {
- let name = entry.file_name();
- let name_str = name.to_string_lossy();
- if name_str.starts_with(short_id) {
- let path = entry.path();
- // Verify it's a valid git directory
- if path.join(".git").exists() {
- tracing::info!(
- task_id = %task_id,
- worktree_path = %path.display(),
- "Found worktree by scanning directory"
- );
- return Ok(path);
- }
- }
- }
- }
-
- Err(format!(
- "No worktree found for task {}. The worktree may have been cleaned up.",
- task_id
- ))
- }
-
- async fn update_state(&self, task_id: Uuid, state: TaskState) {
- let mut tasks = self.tasks.write().await;
- if let Some(task) = tasks.get_mut(&task_id) {
- task.state = state;
- }
- }
-
- async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) {
- let msg = DaemonMessage::task_status_change(task_id, old_status, new_status);
- let _ = self.ws_tx.send(msg).await;
- }
-
- /// Mark task as failed.
- async fn mark_failed(&self, task_id: Uuid, error: &str) {
- {
- let mut tasks = self.tasks.write().await;
- if let Some(task) = tasks.get_mut(&task_id) {
- task.state = TaskState::Failed;
- task.error = Some(error.to_string());
- task.completed_at = Some(Instant::now());
- }
- }
-
- // Notify server
- let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string()));
- let _ = self.ws_tx.send(msg).await;
- }
-}
-
-impl Clone for TaskManagerInner {
- fn clone(&self) -> Self {
- Self {
- worktree_manager: self.worktree_manager.clone(),
- process_manager: self.process_manager.clone(),
- temp_manager: self.temp_manager.clone(),
- tasks: self.tasks.clone(),
- ws_tx: self.ws_tx.clone(),
- task_inputs: self.task_inputs.clone(),
- }
- }
-}
diff --git a/makima/daemon/src/task/mod.rs b/makima/daemon/src/task/mod.rs
deleted file mode 100644
index 29c261e..0000000
--- a/makima/daemon/src/task/mod.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-//! Task management and execution.
-
-pub mod manager;
-pub mod state;
-
-pub use manager::{ManagedTask, TaskConfig, TaskManager};
-pub use state::TaskState;
diff --git a/makima/daemon/src/task/state.rs b/makima/daemon/src/task/state.rs
deleted file mode 100644
index fe73de1..0000000
--- a/makima/daemon/src/task/state.rs
+++ /dev/null
@@ -1,161 +0,0 @@
-//! Task state machine.
-
-use std::fmt;
-
-/// Task execution state.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub enum TaskState {
- /// Task received, preparing overlay.
- Initializing,
- /// Overlay ready, starting container.
- Starting,
- /// Container running.
- Running,
- /// Container paused.
- Paused,
- /// Waiting for sibling or resource.
- Blocked,
- /// Task completed successfully.
- Completed,
- /// Task failed with error.
- Failed,
- /// Task interrupted by user.
- Interrupted,
-}
-
-impl TaskState {
- /// Check if a state transition is valid.
- pub fn can_transition_to(&self, target: TaskState) -> bool {
- use TaskState::*;
-
- matches!(
- (self, target),
- // From Initializing
- (Initializing, Starting)
- | (Initializing, Failed)
- | (Initializing, Interrupted)
- // From Starting
- | (Starting, Running)
- | (Starting, Failed)
- | (Starting, Interrupted)
- // From Running
- | (Running, Paused)
- | (Running, Blocked)
- | (Running, Completed)
- | (Running, Failed)
- | (Running, Interrupted)
- // From Paused
- | (Paused, Running)
- | (Paused, Interrupted)
- | (Paused, Failed)
- // From Blocked
- | (Blocked, Running)
- | (Blocked, Failed)
- | (Blocked, Interrupted)
- )
- }
-
- /// Check if this state is terminal (no more transitions possible).
- pub fn is_terminal(&self) -> bool {
- matches!(
- self,
- TaskState::Completed | TaskState::Failed | TaskState::Interrupted
- )
- }
-
- /// Check if the task is currently active (running or paused).
- pub fn is_active(&self) -> bool {
- matches!(
- self,
- TaskState::Initializing
- | TaskState::Starting
- | TaskState::Running
- | TaskState::Paused
- | TaskState::Blocked
- )
- }
-
- /// Check if the task is running.
- pub fn is_running(&self) -> bool {
- matches!(self, TaskState::Running)
- }
-
- /// Convert to string for protocol messages.
- pub fn as_str(&self) -> &'static str {
- match self {
- TaskState::Initializing => "initializing",
- TaskState::Starting => "starting",
- TaskState::Running => "running",
- TaskState::Paused => "paused",
- TaskState::Blocked => "blocked",
- TaskState::Completed => "done",
- TaskState::Failed => "failed",
- TaskState::Interrupted => "interrupted",
- }
- }
-
- /// Parse from string.
- pub fn from_str(s: &str) -> Option<Self> {
- match s.to_lowercase().as_str() {
- "initializing" => Some(TaskState::Initializing),
- "starting" => Some(TaskState::Starting),
- "running" => Some(TaskState::Running),
- "paused" => Some(TaskState::Paused),
- "blocked" => Some(TaskState::Blocked),
- "done" | "completed" => Some(TaskState::Completed),
- "failed" => Some(TaskState::Failed),
- "interrupted" => Some(TaskState::Interrupted),
- _ => None,
- }
- }
-}
-
-impl fmt::Display for TaskState {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "{}", self.as_str())
- }
-}
-
-impl Default for TaskState {
- fn default() -> Self {
- TaskState::Initializing
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_valid_transitions() {
- use TaskState::*;
-
- // Valid transitions
- assert!(Initializing.can_transition_to(Starting));
- assert!(Starting.can_transition_to(Running));
- assert!(Running.can_transition_to(Completed));
- assert!(Running.can_transition_to(Paused));
- assert!(Paused.can_transition_to(Running));
-
- // Invalid transitions
- assert!(!Completed.can_transition_to(Running));
- assert!(!Failed.can_transition_to(Running));
- assert!(!Running.can_transition_to(Initializing));
- }
-
- #[test]
- fn test_terminal_states() {
- assert!(TaskState::Completed.is_terminal());
- assert!(TaskState::Failed.is_terminal());
- assert!(TaskState::Interrupted.is_terminal());
- assert!(!TaskState::Running.is_terminal());
- assert!(!TaskState::Paused.is_terminal());
- }
-
- #[test]
- fn test_parse() {
- assert_eq!(TaskState::from_str("running"), Some(TaskState::Running));
- assert_eq!(TaskState::from_str("done"), Some(TaskState::Completed));
- assert_eq!(TaskState::from_str("invalid"), None);
- }
-}
diff --git a/makima/daemon/src/temp.rs b/makima/daemon/src/temp.rs
deleted file mode 100644
index 015b21b..0000000
--- a/makima/daemon/src/temp.rs
+++ /dev/null
@@ -1,224 +0,0 @@
-//! Managed temporary directory for tasks without repositories.
-//!
-//! Tasks that don't have a repository URL and aren't subtasks (which inherit
-//! from parent) use a managed temp directory in ~/.makima/temp/. The directory
-//! is automatically cleaned up when it exceeds a size limit.
-
-use std::path::PathBuf;
-
-use tokio::fs;
-use uuid::Uuid;
-
-/// Maximum size of the temp directory before cleanup (5GB).
-const MAX_TEMP_SIZE_BYTES: u64 = 5 * 1024 * 1024 * 1024;
-
-/// Manages temporary directories for tasks without repositories.
-pub struct TempManager {
- /// Base directory for temp task directories (~/.makima/temp/).
- temp_dir: PathBuf,
-}
-
-impl TempManager {
- /// Create a new TempManager.
- pub fn new() -> Self {
- let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
- Self {
- temp_dir: home.join(".makima").join("temp"),
- }
- }
-
- /// Create a new TempManager with a custom base directory.
- #[allow(dead_code)]
- pub fn with_base_dir(base_dir: PathBuf) -> Self {
- Self { temp_dir: base_dir }
- }
-
- /// Get the base temp directory path.
- pub fn temp_dir(&self) -> &PathBuf {
- &self.temp_dir
- }
-
- /// Create a temp directory for a task.
- ///
- /// This creates a directory at ~/.makima/temp/task-{id}/ and triggers
- /// cleanup if the total size exceeds the limit.
- pub async fn create_task_dir(&self, task_id: Uuid) -> Result<PathBuf, std::io::Error> {
- // Ensure base directory exists
- fs::create_dir_all(&self.temp_dir).await?;
-
- // Check size and cleanup if needed
- if let Err(e) = self.cleanup_if_needed().await {
- tracing::warn!("Temp directory cleanup failed: {}", e);
- // Continue anyway, cleanup is best-effort
- }
-
- // Create task-specific directory
- let task_dir = self.temp_dir.join(format!("task-{}", task_id));
- fs::create_dir_all(&task_dir).await?;
-
- tracing::info!(
- task_id = %task_id,
- path = %task_dir.display(),
- "Created temp directory for task"
- );
-
- Ok(task_dir)
- }
-
- /// Calculate total size of temp directory recursively.
- async fn get_total_size(&self) -> Result<u64, std::io::Error> {
- if !self.temp_dir.exists() {
- return Ok(0);
- }
-
- let mut total = 0u64;
- let mut stack = vec![self.temp_dir.clone()];
-
- while let Some(dir) = stack.pop() {
- let mut entries = match fs::read_dir(&dir).await {
- Ok(e) => e,
- Err(_) => continue,
- };
-
- while let Ok(Some(entry)) = entries.next_entry().await {
- let metadata = match entry.metadata().await {
- Ok(m) => m,
- Err(_) => continue,
- };
-
- if metadata.is_dir() {
- stack.push(entry.path());
- } else {
- total += metadata.len();
- }
- }
- }
-
- Ok(total)
- }
-
- /// Remove oldest directories if total size exceeds limit.
- async fn cleanup_if_needed(&self) -> Result<(), std::io::Error> {
- let size = self.get_total_size().await?;
- if size <= MAX_TEMP_SIZE_BYTES {
- return Ok(());
- }
-
- tracing::info!(
- current_size_mb = size / 1024 / 1024,
- limit_mb = MAX_TEMP_SIZE_BYTES / 1024 / 1024,
- "Temp directory exceeds size limit, starting cleanup"
- );
-
- // Get all task dirs with modification times
- let mut dirs: Vec<(PathBuf, std::time::SystemTime, u64)> = vec![];
- let mut entries = fs::read_dir(&self.temp_dir).await?;
-
- while let Ok(Some(entry)) = entries.next_entry().await {
- let path = entry.path();
- if !path.is_dir() {
- continue;
- }
-
- let metadata = match entry.metadata().await {
- Ok(m) => m,
- Err(_) => continue,
- };
-
- let modified = metadata.modified().unwrap_or(std::time::UNIX_EPOCH);
- let dir_size = self.get_dir_size(&path).await.unwrap_or(0);
- dirs.push((path, modified, dir_size));
- }
-
- // Sort by oldest first
- dirs.sort_by(|a, b| a.1.cmp(&b.1));
-
- // Remove oldest until under limit
- let mut current_size = size;
- for (path, _, dir_size) in dirs {
- if current_size <= MAX_TEMP_SIZE_BYTES {
- break;
- }
-
- tracing::info!(
- path = %path.display(),
- size_mb = dir_size / 1024 / 1024,
- "Removing old temp directory"
- );
-
- if let Err(e) = fs::remove_dir_all(&path).await {
- tracing::warn!(path = %path.display(), error = %e, "Failed to remove temp directory");
- continue;
- }
-
- current_size = current_size.saturating_sub(dir_size);
- }
-
- tracing::info!(
- new_size_mb = current_size / 1024 / 1024,
- "Temp directory cleanup complete"
- );
-
- Ok(())
- }
-
- /// Calculate size of a directory recursively.
- async fn get_dir_size(&self, path: &PathBuf) -> Result<u64, std::io::Error> {
- let mut total = 0u64;
- let mut stack = vec![path.clone()];
-
- while let Some(dir) = stack.pop() {
- let mut entries = match fs::read_dir(&dir).await {
- Ok(e) => e,
- Err(_) => continue,
- };
-
- while let Ok(Some(entry)) = entries.next_entry().await {
- let metadata = match entry.metadata().await {
- Ok(m) => m,
- Err(_) => continue,
- };
-
- if metadata.is_dir() {
- stack.push(entry.path());
- } else {
- total += metadata.len();
- }
- }
- }
-
- Ok(total)
- }
-
- /// Remove a specific task's temp directory.
- #[allow(dead_code)]
- pub async fn remove_task_dir(&self, task_id: Uuid) -> Result<(), std::io::Error> {
- let task_dir = self.temp_dir.join(format!("task-{}", task_id));
- if task_dir.exists() {
- fs::remove_dir_all(&task_dir).await?;
- tracing::info!(
- task_id = %task_id,
- path = %task_dir.display(),
- "Removed temp directory for task"
- );
- }
- Ok(())
- }
-}
-
-impl Default for TempManager {
- fn default() -> Self {
- Self::new()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_temp_manager_default_dir() {
- let manager = TempManager::new();
- assert!(manager.temp_dir().ends_with(".makima/temp"));
- }
-}
diff --git a/makima/daemon/src/worktree/manager.rs b/makima/daemon/src/worktree/manager.rs
deleted file mode 100644
index 266b970..0000000
--- a/makima/daemon/src/worktree/manager.rs
+++ /dev/null
@@ -1,1623 +0,0 @@
-//! Worktree manager implementation.
-
-use std::collections::HashMap;
-use std::path::{Path, PathBuf};
-use std::sync::LazyLock;
-
-use tokio::process::Command;
-use tokio::sync::Mutex;
-use uuid::Uuid;
-
-/// Errors that can occur during worktree operations.
-#[derive(Debug, thiserror::Error)]
-pub enum WorktreeError {
- #[error("Git command failed: {0}")]
- GitCommand(String),
-
- #[error("Repository not found: {0}")]
- RepoNotFound(String),
-
- #[error("Failed to create directory: {0}")]
- CreateDir(#[from] std::io::Error),
-
- #[error("Invalid repository path: {0}")]
- InvalidPath(String),
-
- #[error("Worktree already exists: {0}")]
- AlreadyExists(String),
-
- #[error("Clone failed: {0}")]
- CloneFailed(String),
-
- #[error("Merge in progress")]
- MergeInProgress,
-
- #[error("No merge in progress")]
- NoMergeInProgress,
-
- #[error("Merge has conflicts: {0}")]
- MergeConflicts(String),
-}
-
-/// Strategy for resolving a merge conflict.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum ConflictResolution {
- /// Use our version (the branch being merged into).
- Ours,
- /// Use their version (the branch being merged).
- Theirs,
-}
-
-/// State of an in-progress merge.
-#[derive(Debug, Clone)]
-pub struct MergeState {
- /// The branch being merged.
- pub source_branch: String,
- /// Files with unresolved conflicts.
- pub conflicted_files: Vec<String>,
- /// Whether a merge is currently in progress.
- pub in_progress: bool,
-}
-
-/// Information about a task branch.
-#[derive(Debug, Clone)]
-pub struct TaskBranchInfo {
- /// Full branch name.
- pub name: String,
- /// Task ID extracted from branch name (if parseable).
- pub task_id: Option<Uuid>,
- /// Whether this branch has been merged into the current branch.
- pub is_merged: bool,
- /// Short SHA of the last commit.
- pub last_commit: String,
- /// Subject line of the last commit.
- pub last_commit_message: String,
-}
-
-/// Information about a created worktree.
-#[derive(Debug, Clone)]
-pub struct WorktreeInfo {
- /// Path to the worktree directory.
- pub path: PathBuf,
- /// Git branch name for this worktree.
- pub branch: String,
- /// Source repository path.
- pub source_repo: PathBuf,
-}
-
-/// Manages git worktrees for task isolation.
-pub struct WorktreeManager {
- /// Base directory for all worktrees (~/.makima/worktrees).
- base_dir: PathBuf,
- /// Base directory for cloned repos (~/.makima/repos).
- repos_dir: PathBuf,
- /// Branch prefix for task branches.
- branch_prefix: String,
-}
-
-/// Per-worktree locks to prevent concurrent creation issues.
-static WORKTREE_LOCKS: LazyLock<Mutex<HashMap<String, std::sync::Arc<tokio::sync::Mutex<()>>>>> =
- LazyLock::new(|| Mutex::new(HashMap::new()));
-
-impl WorktreeManager {
- /// Create a new WorktreeManager with the given base directory.
- pub fn new(base_dir: PathBuf) -> Self {
- let repos_dir = base_dir.parent()
- .map(|p| p.join("repos"))
- .unwrap_or_else(|| base_dir.join("repos"));
-
- Self {
- base_dir,
- repos_dir,
- branch_prefix: "makima/task-".to_string(),
- }
- }
-
- /// Get the default worktree base directory (~/.makima/worktrees).
- pub fn default_base_dir() -> PathBuf {
- dirs::home_dir()
- .unwrap_or_else(|| PathBuf::from("."))
- .join(".makima")
- .join("worktrees")
- }
-
- /// Get the base directory for worktrees.
- pub fn base_dir(&self) -> &Path {
- &self.base_dir
- }
-
- /// Detect the default branch of a repository.
- /// Tries to find HEAD's target, falling back to common branch names.
- pub async fn detect_default_branch(&self, repo_path: &Path) -> Result<String, WorktreeError> {
- // Try to get the branch that HEAD points to
- let output = Command::new("git")
- .args(["symbolic-ref", "refs/remotes/origin/HEAD", "--short"])
- .current_dir(repo_path)
- .output()
- .await?;
-
- if output.status.success() {
- let branch = String::from_utf8_lossy(&output.stdout).trim().to_string();
- // Remove "origin/" prefix if present
- let branch = branch.strip_prefix("origin/").unwrap_or(&branch).to_string();
- if !branch.is_empty() {
- return Ok(branch);
- }
- }
-
- // Try common branch names
- for branch in ["main", "master", "develop", "trunk"] {
- let output = Command::new("git")
- .args(["rev-parse", "--verify", &format!("refs/heads/{}", branch)])
- .current_dir(repo_path)
- .output()
- .await?;
-
- if output.status.success() {
- return Ok(branch.to_string());
- }
- }
-
- // Fall back to getting the current branch
- let output = Command::new("git")
- .args(["rev-parse", "--abbrev-ref", "HEAD"])
- .current_dir(repo_path)
- .output()
- .await?;
-
- if output.status.success() {
- let branch = String::from_utf8_lossy(&output.stdout).trim().to_string();
- if !branch.is_empty() && branch != "HEAD" {
- return Ok(branch);
- }
- }
-
- Err(WorktreeError::GitCommand(
- "Could not detect default branch".to_string(),
- ))
- }
-
- /// Ensure the source repository exists locally and is up-to-date.
- /// If repo_source is a URL, clone it. If it's a path, verify it exists.
- /// For both cases, fetch latest changes from remote if available.
- pub async fn ensure_repo(&self, repo_source: &str) -> Result<PathBuf, WorktreeError> {
- // Check if it's a URL (simple heuristic)
- if repo_source.starts_with("http://")
- || repo_source.starts_with("https://")
- || repo_source.starts_with("git@")
- || repo_source.starts_with("ssh://")
- {
- self.clone_or_fetch_repo(repo_source).await
- } else {
- // Treat as local path - expand tilde if present
- let path = expand_tilde(repo_source);
- if !path.exists() {
- return Err(WorktreeError::RepoNotFound(repo_source.to_string()));
- }
- // Verify it's a git repo
- let git_dir = path.join(".git");
- if !git_dir.exists() {
- return Err(WorktreeError::InvalidPath(format!(
- "{} is not a git repository",
- repo_source
- )));
- }
-
- // Fetch latest changes from remote if configured
- tracing::info!("Fetching latest changes for local repo: {}", repo_source);
- let output = Command::new("git")
- .args(["fetch", "--all", "--prune"])
- .current_dir(&path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- // Don't fail - repo might not have a remote configured
- tracing::debug!("Git fetch for local repo (may not have remote): {}", stderr);
- } else {
- tracing::info!("Fetched latest changes for {}", repo_source);
- }
-
- Ok(path)
- }
- }
-
- /// Clone a repository or fetch if already cloned.
- async fn clone_or_fetch_repo(&self, url: &str) -> Result<PathBuf, WorktreeError> {
- // Extract repo name from URL
- let repo_name = extract_repo_name(url);
- let repo_path = self.repos_dir.join(&repo_name);
-
- // Create repos directory if needed
- tokio::fs::create_dir_all(&self.repos_dir).await?;
-
- if repo_path.exists() {
- // Fetch latest changes
- tracing::info!("Fetching updates for existing repo: {}", repo_name);
- let output = Command::new("git")
- .args(["fetch", "--all", "--prune"])
- .current_dir(&repo_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- tracing::warn!("Git fetch warning: {}", stderr);
- // Don't fail on fetch errors, repo might still be usable
- }
- } else {
- // Clone the repository
- tracing::info!("Cloning repository: {} -> {}", url, repo_path.display());
- let output = Command::new("git")
- .args(["clone", "--bare", url])
- .arg(&repo_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::CloneFailed(stderr.to_string()));
- }
- }
-
- Ok(repo_path)
- }
-
- /// Create a worktree for a task.
- ///
- /// This creates a unique directory with a git worktree checked out to a new branch.
- pub async fn create_worktree(
- &self,
- source_repo: &Path,
- task_id: Uuid,
- task_name: &str,
- base_branch: &str,
- ) -> Result<WorktreeInfo, WorktreeError> {
- // Generate unique directory name and branch
- let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name));
- let worktree_path = self.base_dir.join(&dir_name);
- // Branch name: makima/{task-name-with-dashes}-{short-id}
- let branch_name = format!("{}{}-{}", self.branch_prefix, sanitize_name(task_name), short_uuid(task_id));
-
- // Acquire lock for this worktree path
- let lock = {
- let mut locks = WORKTREE_LOCKS.lock().await;
- locks
- .entry(worktree_path.to_string_lossy().to_string())
- .or_insert_with(|| std::sync::Arc::new(tokio::sync::Mutex::new(())))
- .clone()
- };
- let _guard = lock.lock().await;
-
- // Check if worktree already exists - reuse it if so
- if worktree_path.exists() {
- tracing::info!(
- task_id = %task_id,
- worktree_path = %worktree_path.display(),
- "Worktree already exists, reusing"
- );
-
- // Verify it's a valid git directory
- let git_dir = worktree_path.join(".git");
- if git_dir.exists() {
- // Get the current branch name
- let output = Command::new("git")
- .args(["rev-parse", "--abbrev-ref", "HEAD"])
- .current_dir(&worktree_path)
- .output()
- .await?;
-
- let current_branch = if output.status.success() {
- String::from_utf8_lossy(&output.stdout).trim().to_string()
- } else {
- branch_name.clone()
- };
-
- return Ok(WorktreeInfo {
- path: worktree_path,
- branch: current_branch,
- source_repo: source_repo.to_path_buf(),
- });
- } else {
- // Directory exists but isn't a git worktree - remove and recreate
- tracing::warn!(
- task_id = %task_id,
- worktree_path = %worktree_path.display(),
- "Directory exists but is not a git worktree, removing"
- );
- tokio::fs::remove_dir_all(&worktree_path).await?;
- }
- }
-
- // Create base directory
- tokio::fs::create_dir_all(&self.base_dir).await?;
-
- tracing::info!(
- task_id = %task_id,
- worktree_path = %worktree_path.display(),
- branch = %branch_name,
- base_branch = %base_branch,
- "Creating worktree from local branch"
- );
-
- // Create the worktree with a new branch based on the local base_branch
- let output = Command::new("git")
- .args([
- "worktree",
- "add",
- "-b",
- &branch_name,
- ])
- .arg(&worktree_path)
- .arg(base_branch)
- .current_dir(source_repo)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to create worktree: {}",
- stderr
- )));
- }
-
- tracing::info!(
- task_id = %task_id,
- worktree_path = %worktree_path.display(),
- "Worktree created successfully"
- );
-
- Ok(WorktreeInfo {
- path: worktree_path,
- branch: branch_name,
- source_repo: source_repo.to_path_buf(),
- })
- }
-
- /// Create a worktree for a task by copying from another task's worktree.
- ///
- /// This allows sequential subtasks where one continues from another's work,
- /// including uncommitted changes.
- pub async fn create_worktree_from_task(
- &self,
- source_worktree: &Path,
- task_id: Uuid,
- task_name: &str,
- ) -> Result<WorktreeInfo, WorktreeError> {
- // Verify source worktree exists
- if !source_worktree.exists() {
- return Err(WorktreeError::RepoNotFound(format!(
- "Source worktree not found: {}",
- source_worktree.display()
- )));
- }
-
- // Get the source repo from the source worktree
- let source_repo = self.get_worktree_source(source_worktree).await?;
-
- // Get the base branch from source worktree's current HEAD
- let output = Command::new("git")
- .args(["rev-parse", "HEAD"])
- .current_dir(source_worktree)
- .output()
- .await?;
-
- if !output.status.success() {
- return Err(WorktreeError::GitCommand(
- "Failed to get source worktree HEAD".to_string(),
- ));
- }
- let source_commit = String::from_utf8_lossy(&output.stdout).trim().to_string();
-
- // Generate unique directory name and branch for new worktree
- let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name));
- let worktree_path = self.base_dir.join(&dir_name);
- let branch_name = format!("{}{}", self.branch_prefix, task_id);
-
- // Acquire lock for this worktree path
- let lock = {
- let mut locks = WORKTREE_LOCKS.lock().await;
- locks
- .entry(worktree_path.to_string_lossy().to_string())
- .or_insert_with(|| std::sync::Arc::new(tokio::sync::Mutex::new(())))
- .clone()
- };
- let _guard = lock.lock().await;
-
- // Remove existing worktree if present
- if worktree_path.exists() {
- tracing::info!(
- task_id = %task_id,
- worktree_path = %worktree_path.display(),
- "Removing existing worktree before creating from source"
- );
- tokio::fs::remove_dir_all(&worktree_path).await?;
- }
-
- // Create base directory
- tokio::fs::create_dir_all(&self.base_dir).await?;
-
- tracing::info!(
- task_id = %task_id,
- source_worktree = %source_worktree.display(),
- worktree_path = %worktree_path.display(),
- branch = %branch_name,
- source_commit = %source_commit,
- "Creating worktree from source task"
- );
-
- // Create a new worktree based on the source commit
- let output = Command::new("git")
- .args([
- "worktree",
- "add",
- "-b",
- &branch_name,
- ])
- .arg(&worktree_path)
- .arg(&source_commit)
- .current_dir(&source_repo)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to create worktree: {}",
- stderr
- )));
- }
-
- // Now copy uncommitted changes from source worktree
- // Use rsync to copy all files except .git
- let output = Command::new("rsync")
- .args([
- "-a",
- "--exclude", ".git",
- "--exclude", ".makima",
- &format!("{}/", source_worktree.display()),
- &format!("{}/", worktree_path.display()),
- ])
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- tracing::warn!(
- task_id = %task_id,
- "rsync warning (continuing anyway): {}",
- stderr
- );
- }
-
- tracing::info!(
- task_id = %task_id,
- worktree_path = %worktree_path.display(),
- "Worktree created from source task successfully"
- );
-
- Ok(WorktreeInfo {
- path: worktree_path,
- branch: branch_name,
- source_repo: source_repo.to_path_buf(),
- })
- }
-
- /// Remove a worktree and optionally its branch.
- pub async fn remove_worktree(
- &self,
- worktree_path: &Path,
- delete_branch: bool,
- ) -> Result<(), WorktreeError> {
- if !worktree_path.exists() {
- return Ok(()); // Already gone
- }
-
- // Get the branch name before removing
- let branch_name = if delete_branch {
- self.get_worktree_branch(worktree_path).await.ok()
- } else {
- None
- };
-
- // Find the source repo from worktree
- let source_repo = self.get_worktree_source(worktree_path).await?;
-
- tracing::info!(
- worktree_path = %worktree_path.display(),
- delete_branch = delete_branch,
- "Removing worktree"
- );
-
- // Remove the worktree
- let output = Command::new("git")
- .args(["worktree", "remove", "--force"])
- .arg(worktree_path)
- .current_dir(&source_repo)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- // Try force removal of directory if git worktree remove fails
- if worktree_path.exists() {
- tokio::fs::remove_dir_all(worktree_path).await?;
- }
- tracing::warn!("Git worktree remove warning: {}", stderr);
- }
-
- // Prune worktree references
- let _ = Command::new("git")
- .args(["worktree", "prune"])
- .current_dir(&source_repo)
- .output()
- .await;
-
- // Delete the branch if requested
- if let Some(branch) = branch_name {
- let output = Command::new("git")
- .args(["branch", "-D", &branch])
- .current_dir(&source_repo)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- tracing::warn!("Failed to delete branch {}: {}", branch, stderr);
- }
- }
-
- Ok(())
- }
-
- /// Get the branch name of a worktree.
- async fn get_worktree_branch(&self, worktree_path: &Path) -> Result<String, WorktreeError> {
- let output = Command::new("git")
- .args(["rev-parse", "--abbrev-ref", "HEAD"])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to get branch: {}",
- stderr
- )));
- }
-
- Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
- }
-
- /// Get the source repository path for a worktree.
- async fn get_worktree_source(&self, worktree_path: &Path) -> Result<PathBuf, WorktreeError> {
- // Read the .git file in the worktree which contains the path to the main repo
- let git_file = worktree_path.join(".git");
-
- if git_file.is_file() {
- let content = tokio::fs::read_to_string(&git_file).await?;
- // Format: "gitdir: /path/to/repo/.git/worktrees/name"
- if let Some(gitdir) = content.strip_prefix("gitdir: ") {
- let gitdir = gitdir.trim();
- // Navigate from worktrees/name back to the main repo
- let path = PathBuf::from(gitdir);
- if let Some(worktrees_dir) = path.parent() {
- if let Some(git_dir) = worktrees_dir.parent() {
- if let Some(repo_dir) = git_dir.parent() {
- return Ok(repo_dir.to_path_buf());
- }
- }
- }
- }
- }
-
- // Fallback: try to find it in our repos directory
- Err(WorktreeError::InvalidPath(format!(
- "Could not determine source repo for worktree: {}",
- worktree_path.display()
- )))
- }
-
- /// List all worktrees in the base directory.
- pub async fn list_worktrees(&self) -> Result<Vec<PathBuf>, WorktreeError> {
- let mut worktrees = Vec::new();
-
- if !self.base_dir.exists() {
- return Ok(worktrees);
- }
-
- let mut entries = tokio::fs::read_dir(&self.base_dir).await?;
- while let Some(entry) = entries.next_entry().await? {
- let path = entry.path();
- if path.is_dir() && path.join(".git").exists() {
- worktrees.push(path);
- }
- }
-
- Ok(worktrees)
- }
-
- /// Initialize a new git repository for a task.
- ///
- /// This creates a fresh git repo (not a worktree) for tasks that don't need
- /// an existing codebase. Use this when `repository_url` is `new://` or `new://project-name`.
- pub async fn init_new_repo(
- &self,
- task_id: Uuid,
- repo_source: &str,
- ) -> Result<WorktreeInfo, WorktreeError> {
- let project_name = extract_new_repo_name(repo_source);
- let dir_name = match project_name {
- Some(name) => format!("{}-{}", short_uuid(task_id), sanitize_name(name)),
- None => format!("{}-new", short_uuid(task_id)),
- };
- let repo_path = self.repos_dir.join(&dir_name);
-
- tracing::info!(
- task_id = %task_id,
- path = %repo_path.display(),
- project_name = ?project_name,
- "Initializing new git repository"
- );
-
- // Create directory
- tokio::fs::create_dir_all(&repo_path).await?;
-
- // git init
- let output = Command::new("git")
- .args(["init"])
- .current_dir(&repo_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to init repository: {}",
- stderr
- )));
- }
-
- // Configure git user (needed for commits)
- let _ = Command::new("git")
- .args(["config", "user.email", "makima@localhost"])
- .current_dir(&repo_path)
- .output()
- .await;
- let _ = Command::new("git")
- .args(["config", "user.name", "Makima"])
- .current_dir(&repo_path)
- .output()
- .await;
-
- // Initial commit (required for worktrees to work later if needed)
- let output = Command::new("git")
- .args(["commit", "--allow-empty", "-m", "Initial commit"])
- .current_dir(&repo_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to create initial commit: {}",
- stderr
- )));
- }
-
- tracing::info!(
- task_id = %task_id,
- path = %repo_path.display(),
- "New git repository initialized"
- );
-
- Ok(WorktreeInfo {
- path: repo_path.clone(),
- branch: "main".to_string(),
- source_repo: repo_path,
- })
- }
-
- // ========== Merge Operations ==========
-
- /// List all task branches in a repository.
- ///
- /// Returns branches matching the pattern `makima/task-*`.
- pub async fn list_task_branches(
- &self,
- repo_path: &Path,
- ) -> Result<Vec<TaskBranchInfo>, WorktreeError> {
- // Get all branches matching our prefix
- let output = Command::new("git")
- .args([
- "branch",
- "--list",
- &format!("{}*", self.branch_prefix),
- "--format=%(refname:short)|%(objectname:short)|%(subject)",
- ])
- .current_dir(repo_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to list branches: {}",
- stderr
- )));
- }
-
- // Get list of merged branches
- let merged_output = Command::new("git")
- .args(["branch", "--merged", "HEAD", "--format=%(refname:short)"])
- .current_dir(repo_path)
- .output()
- .await?;
-
- let merged_branches: std::collections::HashSet<String> = if merged_output.status.success() {
- String::from_utf8_lossy(&merged_output.stdout)
- .lines()
- .map(|s| s.trim().to_string())
- .collect()
- } else {
- std::collections::HashSet::new()
- };
-
- let stdout = String::from_utf8_lossy(&output.stdout);
- let mut branches = Vec::new();
-
- for line in stdout.lines() {
- let parts: Vec<&str> = line.split('|').collect();
- if parts.len() >= 3 {
- let name = parts[0].trim().to_string();
- let last_commit = parts[1].trim().to_string();
- let last_commit_message = parts[2].trim().to_string();
-
- // Try to extract task ID from branch name
- let task_id = name
- .strip_prefix(&self.branch_prefix)
- .and_then(|s| Uuid::parse_str(s).ok());
-
- let is_merged = merged_branches.contains(&name);
-
- branches.push(TaskBranchInfo {
- name,
- task_id,
- is_merged,
- last_commit,
- last_commit_message,
- });
- }
- }
-
- Ok(branches)
- }
-
- /// Start a merge of a branch into the current worktree.
- ///
- /// Uses `--no-commit` to allow conflict resolution before committing.
- /// Returns Ok(None) if merge succeeds without conflicts, or Ok(Some(files))
- /// with the list of conflicted files.
- pub async fn merge_branch(
- &self,
- worktree_path: &Path,
- source_branch: &str,
- ) -> Result<Option<Vec<String>>, WorktreeError> {
- // Check if there's already a merge in progress
- if self.is_merge_in_progress(worktree_path).await? {
- return Err(WorktreeError::MergeInProgress);
- }
-
- tracing::info!(
- worktree = %worktree_path.display(),
- source_branch = %source_branch,
- "Starting merge"
- );
-
- // Attempt the merge with --no-commit --no-ff
- let output = Command::new("git")
- .args(["merge", "--no-commit", "--no-ff", source_branch])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if output.status.success() {
- tracing::info!("Merge completed without conflicts");
- return Ok(None);
- }
-
- // Check if there are conflicts
- let conflicts = self.get_conflicted_files(worktree_path).await?;
- if !conflicts.is_empty() {
- tracing::info!(
- conflicts = ?conflicts,
- "Merge has conflicts"
- );
- return Ok(Some(conflicts));
- }
-
- // Other error
- let stderr = String::from_utf8_lossy(&output.stderr);
- Err(WorktreeError::GitCommand(format!(
- "Merge failed: {}",
- stderr
- )))
- }
-
- /// Check if a merge is currently in progress.
- pub async fn is_merge_in_progress(&self, worktree_path: &Path) -> Result<bool, WorktreeError> {
- // Check for MERGE_HEAD file
- let merge_head = worktree_path.join(".git").join("MERGE_HEAD");
- if merge_head.exists() {
- return Ok(true);
- }
-
- // Also check in .git file (for worktrees)
- let git_file = worktree_path.join(".git");
- if git_file.is_file() {
- if let Ok(content) = tokio::fs::read_to_string(&git_file).await {
- if let Some(gitdir) = content.strip_prefix("gitdir: ") {
- let gitdir = PathBuf::from(gitdir.trim());
- let merge_head = gitdir.join("MERGE_HEAD");
- if merge_head.exists() {
- return Ok(true);
- }
- }
- }
- }
-
- Ok(false)
- }
-
- /// Get the list of files with unresolved conflicts.
- pub async fn get_conflicted_files(
- &self,
- worktree_path: &Path,
- ) -> Result<Vec<String>, WorktreeError> {
- let output = Command::new("git")
- .args(["diff", "--name-only", "--diff-filter=U"])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- // No conflicts or not in merge state
- return Ok(Vec::new());
- }
-
- let stdout = String::from_utf8_lossy(&output.stdout);
- let files: Vec<String> = stdout
- .lines()
- .map(|s| s.trim().to_string())
- .filter(|s| !s.is_empty())
- .collect();
-
- Ok(files)
- }
-
- /// Get the current merge state.
- pub async fn get_merge_state(
- &self,
- worktree_path: &Path,
- ) -> Result<MergeState, WorktreeError> {
- let in_progress = self.is_merge_in_progress(worktree_path).await?;
-
- if !in_progress {
- return Ok(MergeState {
- source_branch: String::new(),
- conflicted_files: Vec::new(),
- in_progress: false,
- });
- }
-
- // Get the branch being merged from MERGE_HEAD
- let source_branch = self.get_merge_source_branch(worktree_path).await?;
- let conflicted_files = self.get_conflicted_files(worktree_path).await?;
-
- Ok(MergeState {
- source_branch,
- conflicted_files,
- in_progress: true,
- })
- }
-
- /// Get the branch name being merged (from MERGE_HEAD).
- async fn get_merge_source_branch(&self, worktree_path: &Path) -> Result<String, WorktreeError> {
- // Get MERGE_HEAD commit
- let output = Command::new("git")
- .args(["rev-parse", "MERGE_HEAD"])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- return Ok("unknown".to_string());
- }
-
- let commit = String::from_utf8_lossy(&output.stdout).trim().to_string();
-
- // Try to find branch name for this commit
- let output = Command::new("git")
- .args(["name-rev", "--name-only", &commit])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if output.status.success() {
- let name = String::from_utf8_lossy(&output.stdout).trim().to_string();
- // Clean up the name (remove ~N suffixes, etc.)
- let name = name.split('~').next().unwrap_or(&name);
- let name = name.split('^').next().unwrap_or(name);
- return Ok(name.to_string());
- }
-
- Ok(commit[..8.min(commit.len())].to_string())
- }
-
- /// Resolve a conflict in a specific file.
- pub async fn resolve_conflict(
- &self,
- worktree_path: &Path,
- file_path: &str,
- resolution: ConflictResolution,
- ) -> Result<(), WorktreeError> {
- if !self.is_merge_in_progress(worktree_path).await? {
- return Err(WorktreeError::NoMergeInProgress);
- }
-
- let strategy = match resolution {
- ConflictResolution::Ours => "--ours",
- ConflictResolution::Theirs => "--theirs",
- };
-
- tracing::info!(
- worktree = %worktree_path.display(),
- file = %file_path,
- strategy = %strategy,
- "Resolving conflict"
- );
-
- // Checkout the chosen version
- let output = Command::new("git")
- .args(["checkout", strategy, "--", file_path])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to resolve conflict: {}",
- stderr
- )));
- }
-
- // Stage the resolved file
- let output = Command::new("git")
- .args(["add", file_path])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to stage resolved file: {}",
- stderr
- )));
- }
-
- Ok(())
- }
-
- /// Abort the current merge.
- pub async fn abort_merge(&self, worktree_path: &Path) -> Result<(), WorktreeError> {
- if !self.is_merge_in_progress(worktree_path).await? {
- return Err(WorktreeError::NoMergeInProgress);
- }
-
- tracing::info!(
- worktree = %worktree_path.display(),
- "Aborting merge"
- );
-
- let output = Command::new("git")
- .args(["merge", "--abort"])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to abort merge: {}",
- stderr
- )));
- }
-
- Ok(())
- }
-
- /// Commit the current merge.
- pub async fn commit_merge(
- &self,
- worktree_path: &Path,
- message: &str,
- ) -> Result<String, WorktreeError> {
- // Check for remaining conflicts
- let conflicts = self.get_conflicted_files(worktree_path).await?;
- if !conflicts.is_empty() {
- return Err(WorktreeError::MergeConflicts(conflicts.join(", ")));
- }
-
- tracing::info!(
- worktree = %worktree_path.display(),
- message = %message,
- "Committing merge"
- );
-
- let output = Command::new("git")
- .args(["commit", "-m", message])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to commit merge: {}",
- stderr
- )));
- }
-
- // Get the new commit SHA
- let output = Command::new("git")
- .args(["rev-parse", "HEAD"])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if output.status.success() {
- let sha = String::from_utf8_lossy(&output.stdout).trim().to_string();
- return Ok(sha);
- }
-
- Ok("unknown".to_string())
- }
-
- // ========== Completion Action Operations ==========
-
- /// Push task branch from worktree to an external target repository.
- ///
- /// This stages and commits any uncommitted changes, then pushes to the target repo.
- pub async fn push_to_target_repo(
- &self,
- worktree_path: &Path,
- target_repo: &Path,
- branch_name: &str,
- task_name: &str,
- ) -> Result<(), WorktreeError> {
- tracing::info!(
- worktree = %worktree_path.display(),
- target_repo = %target_repo.display(),
- branch = %branch_name,
- "Pushing branch to target repository"
- );
-
- // First, stage all changes (including new files)
- let output = Command::new("git")
- .args(["add", "-A"])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to stage changes: {}",
- stderr
- )));
- }
-
- // Check if there are staged changes to commit
- let output = Command::new("git")
- .args(["diff", "--cached", "--quiet"])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- // Exit code 1 means there are staged changes
- if !output.status.success() {
- tracing::info!("Committing staged changes before push");
-
- let commit_message = format!("feat: {}", task_name);
- let output = Command::new("git")
- .args(["commit", "-m", &commit_message])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to commit changes: {}",
- stderr
- )));
- }
- }
-
- // Ensure there are commits to push
- let output = Command::new("git")
- .args(["log", "--oneline", "-1"])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- return Err(WorktreeError::GitCommand(
- "No commits in worktree".to_string(),
- ));
- }
-
- // Add target repo as a remote in the worktree (if not already)
- let remote_name = "target";
- let target_path_str = target_repo.to_string_lossy();
-
- // Remove existing remote if any (ignore errors)
- let _ = Command::new("git")
- .args(["remote", "remove", remote_name])
- .current_dir(worktree_path)
- .output()
- .await;
-
- // Add the target as a remote
- let output = Command::new("git")
- .args(["remote", "add", remote_name, &target_path_str])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to add remote: {}",
- stderr
- )));
- }
-
- // Push the branch to the target
- let output = Command::new("git")
- .args(["push", "-u", remote_name, &format!("HEAD:{}", branch_name)])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to push to target: {}",
- stderr
- )));
- }
-
- tracing::info!(
- branch = %branch_name,
- target_repo = %target_repo.display(),
- "Branch pushed successfully"
- );
-
- // Detach HEAD in the worktree to release the branch
- // This allows the branch to be checked out in the target repo
- let output = Command::new("git")
- .args(["checkout", "--detach", "HEAD"])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- // Non-fatal: log but don't fail the push
- let stderr = String::from_utf8_lossy(&output.stderr);
- tracing::warn!(
- "Failed to detach HEAD in worktree (branch may not be checkable in target): {}",
- stderr
- );
- } else {
- tracing::info!("Detached HEAD in worktree to release branch");
- }
-
- Ok(())
- }
-
- /// Merge a branch into the target branch in the target repository.
- ///
- /// This pushes the branch first (if needed), then performs a merge in the target repo.
- pub async fn merge_to_target(
- &self,
- worktree_path: &Path,
- target_repo: &Path,
- source_branch: &str,
- target_branch: &str,
- task_name: &str,
- ) -> Result<String, WorktreeError> {
- tracing::info!(
- worktree = %worktree_path.display(),
- target_repo = %target_repo.display(),
- source_branch = %source_branch,
- target_branch = %target_branch,
- "Merging branch to target"
- );
-
- // First, push the branch to target repo
- self.push_to_target_repo(worktree_path, target_repo, source_branch, task_name)
- .await?;
-
- // In target repo, checkout the target branch
- let output = Command::new("git")
- .args(["checkout", target_branch])
- .current_dir(target_repo)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to checkout target branch: {}",
- stderr
- )));
- }
-
- // Pull latest changes first
- let _ = Command::new("git")
- .args(["pull", "--ff-only"])
- .current_dir(target_repo)
- .output()
- .await;
-
- // Merge the source branch
- let merge_message = format!("feat: {}", task_name);
- let output = Command::new("git")
- .args(["merge", "--no-ff", source_branch, "-m", &merge_message])
- .current_dir(target_repo)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
-
- // Check if it's a conflict
- let conflicts = self.get_conflicted_files(target_repo).await?;
- if !conflicts.is_empty() {
- // Abort the merge
- let _ = Command::new("git")
- .args(["merge", "--abort"])
- .current_dir(target_repo)
- .output()
- .await;
-
- return Err(WorktreeError::MergeConflicts(format!(
- "Merge conflicts in: {}. Consider creating a PR instead.",
- conflicts.join(", ")
- )));
- }
-
- return Err(WorktreeError::GitCommand(format!(
- "Failed to merge: {}",
- stderr
- )));
- }
-
- // Get the merge commit SHA
- let output = Command::new("git")
- .args(["rev-parse", "HEAD"])
- .current_dir(target_repo)
- .output()
- .await?;
-
- let commit_sha = if output.status.success() {
- String::from_utf8_lossy(&output.stdout).trim().to_string()
- } else {
- "unknown".to_string()
- };
-
- tracing::info!(
- commit_sha = %commit_sha,
- "Merge completed successfully"
- );
-
- Ok(commit_sha)
- }
-
- /// Create a GitHub pull request using the gh CLI.
- ///
- /// This pushes the branch first, then creates a PR.
- pub async fn create_pull_request(
- &self,
- worktree_path: &Path,
- target_repo: &Path,
- source_branch: &str,
- target_branch: &str,
- title: &str,
- body: &str,
- ) -> Result<String, WorktreeError> {
- tracing::info!(
- worktree = %worktree_path.display(),
- target_repo = %target_repo.display(),
- source_branch = %source_branch,
- target_branch = %target_branch,
- title = %title,
- "Creating pull request"
- );
-
- // First, push the branch to the target repo's remote
- // For PRs, we need to push to origin (the GitHub remote)
-
- // Get the worktree's current branch
- let output = Command::new("git")
- .args(["rev-parse", "--abbrev-ref", "HEAD"])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- let current_branch = if output.status.success() {
- String::from_utf8_lossy(&output.stdout).trim().to_string()
- } else {
- source_branch.to_string()
- };
-
- // Push to the target repo's origin
- // First, check if target_repo has an origin remote
- let output = Command::new("git")
- .args(["remote", "get-url", "origin"])
- .current_dir(target_repo)
- .output()
- .await?;
-
- if !output.status.success() {
- return Err(WorktreeError::GitCommand(
- "Target repository has no origin remote configured".to_string(),
- ));
- }
-
- let origin_url = String::from_utf8_lossy(&output.stdout).trim().to_string();
-
- // Push the branch from worktree to the remote
- // First add the remote to worktree
- let _ = Command::new("git")
- .args(["remote", "remove", "pr-origin"])
- .current_dir(worktree_path)
- .output()
- .await;
-
- let output = Command::new("git")
- .args(["remote", "add", "pr-origin", &origin_url])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to add remote: {}",
- stderr
- )));
- }
-
- // Push to the remote
- let output = Command::new("git")
- .args(["push", "-u", "pr-origin", &format!("{}:{}", current_branch, source_branch)])
- .current_dir(worktree_path)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to push branch: {}",
- stderr
- )));
- }
-
- // Create PR using gh CLI in the target repo
- let output = Command::new("gh")
- .args([
- "pr",
- "create",
- "--title", title,
- "--body", body,
- "--head", source_branch,
- "--base", target_branch,
- ])
- .current_dir(target_repo)
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::GitCommand(format!(
- "Failed to create PR: {}",
- stderr
- )));
- }
-
- // The gh CLI outputs the PR URL
- let pr_url = String::from_utf8_lossy(&output.stdout).trim().to_string();
-
- tracing::info!(
- pr_url = %pr_url,
- "Pull request created successfully"
- );
-
- Ok(pr_url)
- }
-
- /// Clone/copy the worktree contents to a target directory.
- ///
- /// This creates a new git repository at the target path with the same contents
- /// as the worktree. Returns (success, message).
- pub async fn clone_worktree_to_directory(
- &self,
- worktree_path: &Path,
- target_dir: &Path,
- ) -> Result<String, WorktreeError> {
- tracing::info!(
- worktree = %worktree_path.display(),
- target = %target_dir.display(),
- "Cloning worktree to target directory"
- );
-
- // Check if target directory already exists
- if target_dir.exists() {
- return Err(WorktreeError::AlreadyExists(format!(
- "Target directory already exists: {}",
- target_dir.display()
- )));
- }
-
- // Get parent directory to ensure it exists
- if let Some(parent) = target_dir.parent() {
- if !parent.exists() {
- tokio::fs::create_dir_all(parent).await?;
- }
- }
-
- // Use git clone --local to efficiently copy the repository
- // This is more efficient than cp -r for git repos
- let output = Command::new("git")
- .args([
- "clone",
- "--local",
- "--no-hardlinks",
- &worktree_path.to_string_lossy(),
- &target_dir.to_string_lossy(),
- ])
- .output()
- .await?;
-
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::CloneFailed(format!(
- "Failed to clone worktree: {}",
- stderr
- )));
- }
-
- // Remove the 'origin' remote that points back to the worktree
- let _ = Command::new("git")
- .args(["remote", "remove", "origin"])
- .current_dir(target_dir)
- .output()
- .await;
-
- tracing::info!(
- target = %target_dir.display(),
- "Worktree cloned successfully"
- );
-
- Ok(format!("Cloned to {}", target_dir.display()))
- }
-
- /// Check if a target directory exists.
- pub async fn target_directory_exists(&self, target_dir: &Path) -> bool {
- target_dir.exists()
- }
-}
-
-/// Check if repo_source is a "new repo" request.
-///
-/// Accepts `new://` or `new://project-name` to create a fresh git repository.
-pub fn is_new_repo_request(source: &str) -> bool {
- source == "new" || source == "new://" || source.starts_with("new://")
-}
-
-/// Extract optional project name from new:// URL.
-fn extract_new_repo_name(source: &str) -> Option<&str> {
- source.strip_prefix("new://").filter(|s| !s.is_empty())
-}
-
-/// Extract repository name from URL.
-fn extract_repo_name(url: &str) -> String {
- // Handle various URL formats:
- // https://github.com/user/repo.git -> repo
- // git@github.com:user/repo.git -> repo
- // https://github.com/user/repo -> repo
-
- let url = url.trim_end_matches('/');
- let url = url.trim_end_matches(".git");
-
- url.rsplit('/')
- .next()
- .or_else(|| url.rsplit(':').next())
- .unwrap_or("repo")
- .to_string()
-}
-
-/// Create a short UUID string for directory naming.
-pub fn short_uuid(id: Uuid) -> String {
- id.to_string()[..8].to_string()
-}
-
-/// Expand tilde (~) in path to home directory.
-pub fn expand_tilde(path: &str) -> PathBuf {
- if let Some(rest) = path.strip_prefix("~/") {
- if let Some(home) = dirs::home_dir() {
- return home.join(rest);
- }
- } else if path == "~" {
- if let Some(home) = dirs::home_dir() {
- return home;
- }
- }
- PathBuf::from(path)
-}
-
-/// Sanitize a name for use in directory/branch names.
-pub fn sanitize_name(name: &str) -> String {
- name.chars()
- .map(|c| {
- if c.is_alphanumeric() || c == '-' || c == '_' {
- c.to_ascii_lowercase()
- } else {
- '-'
- }
- })
- .collect::<String>()
- .chars()
- .take(50) // Limit length
- .collect()
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_extract_repo_name() {
- assert_eq!(
- extract_repo_name("https://github.com/user/repo.git"),
- "repo"
- );
- assert_eq!(
- extract_repo_name("https://github.com/user/repo"),
- "repo"
- );
- assert_eq!(
- extract_repo_name("git@github.com:user/repo.git"),
- "repo"
- );
- }
-
- #[test]
- fn test_sanitize_name() {
- assert_eq!(sanitize_name("Hello World!"), "hello-world-");
- assert_eq!(sanitize_name("test_name-123"), "test_name-123");
- assert_eq!(sanitize_name("A".repeat(100).as_str()).len(), 50);
- }
-
- #[test]
- fn test_short_uuid() {
- let id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap();
- assert_eq!(short_uuid(id), "550e8400");
- }
-}
diff --git a/makima/daemon/src/worktree/mod.rs b/makima/daemon/src/worktree/mod.rs
deleted file mode 100644
index eb9f031..0000000
--- a/makima/daemon/src/worktree/mod.rs
+++ /dev/null
@@ -1,11 +0,0 @@
-//! Git worktree management for task isolation.
-//!
-//! Each task gets a unique git worktree with its own branch,
-//! providing isolation without the overhead of Docker containers.
-
-mod manager;
-
-pub use manager::{
- expand_tilde, is_new_repo_request, sanitize_name, short_uuid, ConflictResolution, MergeState,
- TaskBranchInfo, WorktreeError, WorktreeInfo, WorktreeManager,
-};
diff --git a/makima/daemon/src/ws/client.rs b/makima/daemon/src/ws/client.rs
deleted file mode 100644
index ba1263f..0000000
--- a/makima/daemon/src/ws/client.rs
+++ /dev/null
@@ -1,290 +0,0 @@
-//! WebSocket client for connecting to the makima server.
-
-use std::sync::Arc;
-use std::time::Duration;
-
-use backoff::backoff::Backoff;
-use backoff::ExponentialBackoff;
-use futures::{SinkExt, StreamExt};
-use tokio::sync::{mpsc, RwLock};
-use tokio_tungstenite::{connect_async, tungstenite::{client::IntoClientRequest, Message}};
-use uuid::Uuid;
-
-use super::protocol::{DaemonCommand, DaemonMessage};
-use crate::config::ServerConfig;
-use crate::error::{DaemonError, Result};
-
-/// WebSocket client state.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum ConnectionState {
- /// Not connected to server.
- Disconnected,
- /// Currently connecting.
- Connecting,
- /// Connected and authenticated.
- Connected,
- /// Connection failed, will retry.
- Reconnecting,
- /// Permanently failed (e.g., auth failure).
- Failed,
-}
-
-/// WebSocket client for daemon-server communication.
-pub struct WsClient {
- config: ServerConfig,
- machine_id: String,
- hostname: String,
- max_concurrent_tasks: i32,
- state: Arc<RwLock<ConnectionState>>,
- daemon_id: Arc<RwLock<Option<Uuid>>>,
- /// Channel to receive messages to send to server.
- outgoing_rx: mpsc::Receiver<DaemonMessage>,
- /// Sender for outgoing messages (clone this to send messages).
- outgoing_tx: mpsc::Sender<DaemonMessage>,
- /// Channel to send received commands to the task manager.
- incoming_tx: mpsc::Sender<DaemonCommand>,
-}
-
-impl WsClient {
- /// Create a new WebSocket client.
- pub fn new(
- config: ServerConfig,
- machine_id: String,
- hostname: String,
- max_concurrent_tasks: i32,
- incoming_tx: mpsc::Sender<DaemonCommand>,
- ) -> Self {
- let (outgoing_tx, outgoing_rx) = mpsc::channel(256);
-
- Self {
- config,
- machine_id,
- hostname,
- max_concurrent_tasks,
- state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
- daemon_id: Arc::new(RwLock::new(None)),
- outgoing_rx,
- outgoing_tx,
- incoming_tx,
- }
- }
-
- /// Get a sender for outgoing messages.
- pub fn sender(&self) -> mpsc::Sender<DaemonMessage> {
- self.outgoing_tx.clone()
- }
-
- /// Get current connection state.
- pub async fn state(&self) -> ConnectionState {
- *self.state.read().await
- }
-
- /// Get daemon ID if authenticated.
- pub async fn daemon_id(&self) -> Option<Uuid> {
- *self.daemon_id.read().await
- }
-
- /// Run the WebSocket client with automatic reconnection.
- pub async fn run(&mut self) -> Result<()> {
- let mut backoff = ExponentialBackoff {
- initial_interval: Duration::from_secs(self.config.reconnect_interval_secs),
- max_interval: Duration::from_secs(60),
- max_elapsed_time: if self.config.max_reconnect_attempts > 0 {
- Some(Duration::from_secs(
- self.config.reconnect_interval_secs * self.config.max_reconnect_attempts as u64 * 10,
- ))
- } else {
- None // Infinite retries
- },
- ..Default::default()
- };
-
- loop {
- *self.state.write().await = ConnectionState::Connecting;
- tracing::info!("Connecting to server: {}", self.config.url);
-
- match self.connect_and_run().await {
- Ok(()) => {
- // Clean shutdown
- tracing::info!("WebSocket connection closed cleanly");
- break;
- }
- Err(DaemonError::AuthFailed(msg)) => {
- tracing::error!("Authentication failed: {}", msg);
- *self.state.write().await = ConnectionState::Failed;
- return Err(DaemonError::AuthFailed(msg));
- }
- Err(e) => {
- tracing::warn!("Connection error: {}", e);
- *self.state.write().await = ConnectionState::Reconnecting;
-
- if let Some(delay) = backoff.next_backoff() {
- tracing::info!("Reconnecting in {:?}...", delay);
- tokio::time::sleep(delay).await;
- } else {
- tracing::error!("Max reconnection attempts reached");
- *self.state.write().await = ConnectionState::Failed;
- return Err(DaemonError::ConnectionLost);
- }
- }
- }
- }
-
- Ok(())
- }
-
- /// Connect to server and run the message loop.
- async fn connect_and_run(&mut self) -> Result<()> {
- // Build WebSocket URL
- let ws_url = format!("{}/api/v1/mesh/daemons/connect", self.config.url);
- tracing::debug!("Connecting to WebSocket: {}", ws_url);
-
- // Build request with API key header
- let mut request = ws_url.into_client_request()?;
- request.headers_mut().insert(
- "x-makima-api-key",
- self.config.api_key.parse().map_err(|_| {
- DaemonError::AuthFailed("Invalid API key format".into())
- })?,
- );
-
- // Connect with API key in headers
- let (ws_stream, _response) = connect_async(request).await?;
- let (mut write, mut read) = ws_stream.split();
-
- // Send daemon info after connection (server authenticated us via header)
- let info_msg = DaemonMessage::authenticate(
- &self.config.api_key,
- &self.machine_id,
- &self.hostname,
- self.max_concurrent_tasks,
- );
- let info_json = serde_json::to_string(&info_msg)?;
- write.send(Message::Text(info_json)).await?;
-
- // Wait for authentication response
- let auth_response = read
- .next()
- .await
- .ok_or(DaemonError::ConnectionLost)??;
-
- let auth_text = match auth_response {
- Message::Text(text) => text,
- Message::Close(_) => return Err(DaemonError::ConnectionLost),
- _ => return Err(DaemonError::AuthFailed("Unexpected response type".into())),
- };
-
- let command: DaemonCommand = serde_json::from_str(&auth_text)?;
- match command {
- DaemonCommand::Authenticated { daemon_id } => {
- tracing::info!("Authenticated with daemon ID: {}", daemon_id);
- *self.daemon_id.write().await = Some(daemon_id);
- *self.state.write().await = ConnectionState::Connected;
-
- // Send daemon directories info to server
- let working_directory = std::env::current_dir()
- .map(|p| p.to_string_lossy().to_string())
- .unwrap_or_else(|_| ".".to_string());
- let home_directory = dirs::home_dir()
- .map(|h| h.join(".makima").join("home"))
- .unwrap_or_else(|| std::path::PathBuf::from("~/.makima/home"));
- // Create home directory if it doesn't exist
- if let Err(e) = std::fs::create_dir_all(&home_directory) {
- tracing::warn!("Failed to create home directory {:?}: {}", home_directory, e);
- }
- let home_directory_str = home_directory.to_string_lossy().to_string();
- let worktrees_directory = dirs::home_dir()
- .map(|h| h.join(".makima").join("worktrees").to_string_lossy().to_string())
- .unwrap_or_else(|| "~/.makima/worktrees".to_string());
-
- let dirs_msg = DaemonMessage::DaemonDirectories {
- working_directory,
- home_directory: home_directory_str,
- worktrees_directory,
- };
- let dirs_json = serde_json::to_string(&dirs_msg)?;
- write.send(Message::Text(dirs_json)).await?;
- tracing::info!("Sent daemon directories info to server");
- }
- DaemonCommand::Error { code, message } => {
- return Err(DaemonError::AuthFailed(format!("{}: {}", code, message)));
- }
- _ => {
- return Err(DaemonError::AuthFailed(
- "Unexpected response to authentication".into(),
- ));
- }
- }
-
- // Start main message loop
- let heartbeat_interval = Duration::from_secs(self.config.heartbeat_interval_secs);
- let mut heartbeat_timer = tokio::time::interval(heartbeat_interval);
-
- loop {
- tokio::select! {
- // Handle incoming server commands
- msg = read.next() => {
- match msg {
- Some(Ok(Message::Text(text))) => {
- tracing::info!("Received WebSocket message: {} bytes", text.len());
- match serde_json::from_str::<DaemonCommand>(&text) {
- Ok(command) => {
- tracing::info!("Parsed command: {:?}", command);
- tracing::info!("Sending command to task manager channel...");
- if self.incoming_tx.send(command).await.is_err() {
- tracing::warn!("Command receiver dropped, shutting down");
- break;
- }
- tracing::info!("Command sent to task manager successfully");
- }
- Err(e) => {
- tracing::warn!("Failed to parse server message: {}", e);
- tracing::debug!("Raw message: {}", text);
- }
- }
- }
- Some(Ok(Message::Ping(data))) => {
- write.send(Message::Pong(data)).await?;
- }
- Some(Ok(Message::Close(_))) | None => {
- tracing::info!("Server closed connection");
- return Err(DaemonError::ConnectionLost);
- }
- Some(Err(e)) => {
- tracing::warn!("WebSocket error: {}", e);
- return Err(e.into());
- }
- _ => {}
- }
- }
-
- // Handle outgoing messages
- msg = self.outgoing_rx.recv() => {
- match msg {
- Some(message) => {
- let json = serde_json::to_string(&message)?;
- tracing::trace!("Sending message: {}", json);
- write.send(Message::Text(json)).await?;
- }
- None => {
- // Sender dropped, shutdown
- tracing::info!("Outgoing channel closed, shutting down");
- break;
- }
- }
- }
-
- // Send heartbeat
- _ = heartbeat_timer.tick() => {
- // Get active task IDs from task manager
- // For now, send empty list - will be connected to task manager
- let heartbeat = DaemonMessage::heartbeat(vec![]);
- let json = serde_json::to_string(&heartbeat)?;
- write.send(Message::Text(json)).await?;
- }
- }
- }
-
- Ok(())
- }
-}
diff --git a/makima/daemon/src/ws/mod.rs b/makima/daemon/src/ws/mod.rs
deleted file mode 100644
index 5a0e9d1..0000000
--- a/makima/daemon/src/ws/mod.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-//! WebSocket client and protocol types for daemon-server communication.
-
-pub mod client;
-pub mod protocol;
-
-pub use client::{ConnectionState, WsClient};
-pub use protocol::{BranchInfo, DaemonCommand, DaemonMessage};
diff --git a/makima/daemon/src/ws/protocol.rs b/makima/daemon/src/ws/protocol.rs
deleted file mode 100644
index 7c2ad6d..0000000
--- a/makima/daemon/src/ws/protocol.rs
+++ /dev/null
@@ -1,511 +0,0 @@
-//! Protocol types for daemon-server communication.
-//!
-//! These types mirror the server's protocol exactly for compatibility.
-
-use serde::{Deserialize, Serialize};
-use uuid::Uuid;
-
-/// Message from daemon to server.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(tag = "type", rename_all = "camelCase")]
-pub enum DaemonMessage {
- /// Authentication request (first message required).
- Authenticate {
- #[serde(rename = "apiKey")]
- api_key: String,
- #[serde(rename = "machineId")]
- machine_id: String,
- hostname: String,
- #[serde(rename = "maxConcurrentTasks")]
- max_concurrent_tasks: i32,
- },
-
- /// Periodic heartbeat with current status.
- Heartbeat {
- #[serde(rename = "activeTasks")]
- active_tasks: Vec<Uuid>,
- },
-
- /// Task output streaming (stdout/stderr from Claude Code).
- TaskOutput {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- output: String,
- #[serde(rename = "isPartial")]
- is_partial: bool,
- },
-
- /// Task status change notification.
- TaskStatusChange {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- #[serde(rename = "oldStatus")]
- old_status: String,
- #[serde(rename = "newStatus")]
- new_status: String,
- },
-
- /// Task progress update with summary.
- TaskProgress {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- summary: String,
- },
-
- /// Task completion notification.
- TaskComplete {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- success: bool,
- error: Option<String>,
- },
-
- /// Register a tool key for orchestrator API access.
- RegisterToolKey {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- /// The API key for this orchestrator to use when calling mesh endpoints.
- key: String,
- },
-
- /// Revoke a tool key when task completes.
- RevokeToolKey {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- },
-
- // =========================================================================
- // Merge Response Messages (sent by daemon after processing merge commands)
- // =========================================================================
-
- /// Response to ListBranches command.
- BranchList {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- branches: Vec<BranchInfo>,
- },
-
- /// Response to MergeStatus command.
- MergeStatusResponse {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- #[serde(rename = "inProgress")]
- in_progress: bool,
- #[serde(rename = "sourceBranch")]
- source_branch: Option<String>,
- #[serde(rename = "conflictedFiles")]
- conflicted_files: Vec<String>,
- },
-
- /// Response to merge operations (MergeStart, MergeResolve, MergeCommit, MergeAbort).
- MergeResult {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- success: bool,
- message: String,
- #[serde(rename = "commitSha")]
- commit_sha: Option<String>,
- /// Present only when conflicts occurred.
- conflicts: Option<Vec<String>>,
- },
-
- /// Response to CheckMergeComplete command.
- MergeCompleteCheck {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- #[serde(rename = "canComplete")]
- can_complete: bool,
- #[serde(rename = "unmergedBranches")]
- unmerged_branches: Vec<String>,
- #[serde(rename = "mergedCount")]
- merged_count: u32,
- #[serde(rename = "skippedCount")]
- skipped_count: u32,
- },
-
- // =========================================================================
- // Completion Action Response Messages
- // =========================================================================
-
- /// Response to RetryCompletionAction command.
- CompletionActionResult {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- success: bool,
- message: String,
- /// PR URL if action was "pr" and successful.
- #[serde(rename = "prUrl")]
- pr_url: Option<String>,
- },
-
- /// Report daemon's available directories for task output.
- DaemonDirectories {
- /// Current working directory of the daemon.
- #[serde(rename = "workingDirectory")]
- working_directory: String,
- /// Path to ~/.makima/home directory (for cloning completed work).
- #[serde(rename = "homeDirectory")]
- home_directory: String,
- /// Path to worktrees directory (~/.makima/worktrees).
- #[serde(rename = "worktreesDirectory")]
- worktrees_directory: String,
- },
-
- /// Response to CloneWorktree command.
- CloneWorktreeResult {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- success: bool,
- message: String,
- /// The path where the worktree was cloned.
- #[serde(rename = "targetDir")]
- target_dir: Option<String>,
- },
-
- /// Response to CheckTargetExists command.
- CheckTargetExistsResult {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- /// Whether the target directory exists.
- exists: bool,
- /// The path that was checked.
- #[serde(rename = "targetDir")]
- target_dir: String,
- },
-}
-
-/// Information about a branch (used in BranchList message).
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct BranchInfo {
- /// Full branch name.
- pub name: String,
- /// Task ID extracted from branch name (if parseable).
- #[serde(rename = "taskId")]
- pub task_id: Option<Uuid>,
- /// Whether this branch has been merged.
- #[serde(rename = "isMerged")]
- pub is_merged: bool,
- /// Short SHA of the last commit.
- #[serde(rename = "lastCommit")]
- pub last_commit: String,
- /// Subject line of the last commit.
- #[serde(rename = "lastCommitMessage")]
- pub last_commit_message: String,
-}
-
-/// Command from server to daemon.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(tag = "type", rename_all = "camelCase")]
-pub enum DaemonCommand {
- /// Confirm successful authentication.
- Authenticated {
- #[serde(rename = "daemonId")]
- daemon_id: Uuid,
- },
-
- /// Spawn a new task in a container.
- SpawnTask {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- /// Human-readable task name (used for commit messages).
- #[serde(rename = "taskName")]
- task_name: String,
- plan: String,
- #[serde(rename = "repoUrl")]
- repo_url: Option<String>,
- #[serde(rename = "baseBranch")]
- base_branch: Option<String>,
- /// Target branch to merge into (used for completion actions).
- #[serde(rename = "targetBranch")]
- target_branch: Option<String>,
- /// Parent task ID if this is a subtask.
- #[serde(rename = "parentTaskId")]
- parent_task_id: Option<Uuid>,
- /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask).
- depth: i32,
- /// Whether this task should run as an orchestrator (true if depth==0 and has subtasks).
- #[serde(rename = "isOrchestrator")]
- is_orchestrator: bool,
- /// Path to user's local repository (outside ~/.makima) for completion actions.
- #[serde(rename = "targetRepoPath")]
- target_repo_path: Option<String>,
- /// Action on completion: "none", "branch", "merge", "pr".
- #[serde(rename = "completionAction")]
- completion_action: Option<String>,
- /// Task ID to continue from (copy worktree from this task).
- #[serde(rename = "continueFromTaskId")]
- continue_from_task_id: Option<Uuid>,
- /// Files to copy from parent task's worktree.
- #[serde(rename = "copyFiles")]
- copy_files: Option<Vec<String>>,
- },
-
- /// Pause a running task.
- PauseTask {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- },
-
- /// Resume a paused task.
- ResumeTask {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- },
-
- /// Interrupt a task (gracefully or forced).
- InterruptTask {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- graceful: bool,
- },
-
- /// Send a message to a running task.
- SendMessage {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- message: String,
- },
-
- /// Inject context about sibling task progress.
- InjectSiblingContext {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- #[serde(rename = "siblingTaskId")]
- sibling_task_id: Uuid,
- #[serde(rename = "siblingName")]
- sibling_name: String,
- #[serde(rename = "siblingStatus")]
- sibling_status: String,
- #[serde(rename = "progressSummary")]
- progress_summary: Option<String>,
- #[serde(rename = "changedFiles")]
- changed_files: Vec<String>,
- },
-
- // =========================================================================
- // Merge Commands (for orchestrators to merge subtask branches)
- // =========================================================================
-
- /// List all subtask branches for a task.
- ListBranches {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- },
-
- /// Start merging a subtask branch.
- MergeStart {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- #[serde(rename = "sourceBranch")]
- source_branch: String,
- },
-
- /// Get current merge status.
- MergeStatus {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- },
-
- /// Resolve a merge conflict.
- MergeResolve {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- file: String,
- /// "ours" or "theirs"
- strategy: String,
- },
-
- /// Commit the current merge.
- MergeCommit {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- message: String,
- },
-
- /// Abort the current merge.
- MergeAbort {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- },
-
- /// Skip merging a subtask branch (mark as intentionally not merged).
- MergeSkip {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- #[serde(rename = "subtaskId")]
- subtask_id: Uuid,
- reason: String,
- },
-
- /// Check if all subtask branches have been merged or skipped (completion gate).
- CheckMergeComplete {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- },
-
- // =========================================================================
- // Completion Action Commands
- // =========================================================================
-
- /// Retry a completion action for a completed task.
- RetryCompletionAction {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- /// Human-readable task name (used for commit messages).
- #[serde(rename = "taskName")]
- task_name: String,
- /// The action to execute: "branch", "merge", or "pr".
- action: String,
- /// Path to the target repository.
- #[serde(rename = "targetRepoPath")]
- target_repo_path: String,
- /// Target branch to merge into (for merge/pr actions).
- #[serde(rename = "targetBranch")]
- target_branch: Option<String>,
- },
-
- /// Clone worktree to a target directory.
- CloneWorktree {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- /// Path to the target directory.
- #[serde(rename = "targetDir")]
- target_dir: String,
- },
-
- /// Check if a target directory exists.
- CheckTargetExists {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- /// Path to check.
- #[serde(rename = "targetDir")]
- target_dir: String,
- },
-
- /// Error response.
- Error {
- code: String,
- message: String,
- },
-}
-
-impl DaemonMessage {
- /// Create an authentication message.
- pub fn authenticate(
- api_key: &str,
- machine_id: &str,
- hostname: &str,
- max_concurrent_tasks: i32,
- ) -> Self {
- Self::Authenticate {
- api_key: api_key.to_string(),
- machine_id: machine_id.to_string(),
- hostname: hostname.to_string(),
- max_concurrent_tasks,
- }
- }
-
- /// Create a heartbeat message.
- pub fn heartbeat(active_tasks: Vec<Uuid>) -> Self {
- Self::Heartbeat { active_tasks }
- }
-
- /// Create a task output message.
- pub fn task_output(task_id: Uuid, output: String, is_partial: bool) -> Self {
- Self::TaskOutput {
- task_id,
- output,
- is_partial,
- }
- }
-
- /// Create a task status change message.
- pub fn task_status_change(task_id: Uuid, old_status: &str, new_status: &str) -> Self {
- Self::TaskStatusChange {
- task_id,
- old_status: old_status.to_string(),
- new_status: new_status.to_string(),
- }
- }
-
- /// Create a task progress message.
- pub fn task_progress(task_id: Uuid, summary: String) -> Self {
- Self::TaskProgress { task_id, summary }
- }
-
- /// Create a task complete message.
- pub fn task_complete(task_id: Uuid, success: bool, error: Option<String>) -> Self {
- Self::TaskComplete {
- task_id,
- success,
- error,
- }
- }
-
- /// Create a register tool key message.
- pub fn register_tool_key(task_id: Uuid, key: String) -> Self {
- Self::RegisterToolKey { task_id, key }
- }
-
- /// Create a revoke tool key message.
- pub fn revoke_tool_key(task_id: Uuid) -> Self {
- Self::RevokeToolKey { task_id }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_daemon_message_serialization() {
- let msg = DaemonMessage::authenticate("key123", "machine-abc", "worker-1", 4);
- let json = serde_json::to_string(&msg).unwrap();
- assert!(json.contains("\"type\":\"authenticate\""));
- assert!(json.contains("\"apiKey\":\"key123\""));
- assert!(json.contains("\"machineId\":\"machine-abc\""));
- }
-
- #[test]
- fn test_daemon_command_deserialization() {
- let json = r#"{"type":"spawnTask","taskId":"550e8400-e29b-41d4-a716-446655440000","plan":"Build the feature","repoUrl":"https://github.com/test/repo","baseBranch":"main","parentTaskId":null,"depth":0,"isOrchestrator":false}"#;
- let cmd: DaemonCommand = serde_json::from_str(json).unwrap();
- match cmd {
- DaemonCommand::SpawnTask {
- plan,
- repo_url,
- base_branch,
- parent_task_id,
- depth,
- is_orchestrator,
- ..
- } => {
- assert_eq!(plan, "Build the feature");
- assert_eq!(repo_url, Some("https://github.com/test/repo".to_string()));
- assert_eq!(base_branch, Some("main".to_string()));
- assert_eq!(parent_task_id, None);
- assert_eq!(depth, 0);
- assert!(!is_orchestrator);
- }
- _ => panic!("Expected SpawnTask"),
- }
- }
-
- #[test]
- fn test_orchestrator_spawn_deserialization() {
- let json = r#"{"type":"spawnTask","taskId":"550e8400-e29b-41d4-a716-446655440000","plan":"Coordinate subtasks","repoUrl":"https://github.com/test/repo","baseBranch":"main","parentTaskId":null,"depth":0,"isOrchestrator":true}"#;
- let cmd: DaemonCommand = serde_json::from_str(json).unwrap();
- match cmd {
- DaemonCommand::SpawnTask {
- is_orchestrator,
- depth,
- ..
- } => {
- assert!(is_orchestrator);
- assert_eq!(depth, 0);
- }
- _ => panic!("Expected SpawnTask"),
- }
- }
-}