//! Claude Code process management. use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::process::Stdio; use std::sync::Arc; use futures::Stream; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, ChildStdin, Command}; use tokio::sync::{mpsc, Mutex}; use super::claude_protocol::ClaudeInputMessage; use crate::daemon::config::BubblewrapConfig; /// Errors that can occur during Claude process management. #[derive(Debug, thiserror::Error)] pub enum ClaudeProcessError { #[error("Failed to spawn Claude process: {0}")] SpawnFailed(#[from] std::io::Error), #[error("Claude command not found: {0}")] CommandNotFound(String), #[error("Process already exited")] AlreadyExited, #[error("Failed to read output: {0}")] OutputRead(String), #[error("Bubblewrap (bwrap) not found. Install bubblewrap or disable the --bubblewrap flag.")] BubblewrapNotFound, } /// A line of output from Claude Code. #[derive(Debug, Clone)] pub struct OutputLine { /// The raw content of the line. pub content: String, /// Whether this is from stdout (true) or stderr (false). pub is_stdout: bool, /// Parsed JSON type if available (e.g., "system", "assistant", "result"). pub json_type: Option, } impl OutputLine { /// Create a new stdout output line. pub fn stdout(content: String) -> Self { let json_type = extract_json_type(&content); Self { content, is_stdout: true, json_type, } } /// Create a new stderr output line. pub fn stderr(content: String) -> Self { Self { content, is_stdout: false, json_type: None, } } } /// Extract the "type" field from a JSON line if present. fn extract_json_type(line: &str) -> Option { // Quick check for JSON if !line.starts_with('{') { return None; } // Try to parse and extract type if let Ok(json) = serde_json::from_str::(line) { json.get("type") .and_then(|v| v.as_str()) .map(|s| s.to_string()) } else { None } } /// Handle to a running Claude Code process. pub struct ClaudeProcess { /// The child process. child: Child, /// Receiver for output lines. output_rx: mpsc::Receiver, /// Stdin handle for sending input to the process (thread-safe). stdin: Arc>>, } impl ClaudeProcess { /// Wait for the process to exit and return the exit code. pub async fn wait(&mut self) -> Result { let status = self.child.wait().await?; Ok(status.code().unwrap_or(-1) as i64) } /// Check if the process has exited. pub fn try_wait(&mut self) -> Result, ClaudeProcessError> { match self.child.try_wait()? { Some(status) => Ok(Some(status.code().unwrap_or(-1) as i64)), None => Ok(None), } } /// Kill the process (SIGKILL). pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> { self.child.kill().await?; Ok(()) } /// Get the process ID, if the process is still running. pub fn id(&self) -> Option { self.child.id() } /// Send SIGTERM to gracefully terminate the process and all its children (Unix only). /// Returns Ok(true) if signal was sent, Ok(false) if process already exited. /// Sends signal to the entire process group (negative PID) to kill all children. #[cfg(unix)] pub fn terminate(&self) -> Result { use nix::sys::signal::{killpg, Signal}; use nix::unistd::Pid; if let Some(pid) = self.child.id() { // Kill the entire process group (the process is its own group leader) match killpg(Pid::from_raw(pid as i32), Signal::SIGTERM) { Ok(()) => Ok(true), Err(nix::errno::Errno::ESRCH) => Ok(false), // Process group doesn't exist Err(e) => Err(ClaudeProcessError::OutputRead(format!( "Failed to send SIGTERM to process group: {}", e ))), } } else { Ok(false) // Process already exited } } /// Send SIGTERM to gracefully terminate the process (no-op on non-Unix). #[cfg(not(unix))] pub fn terminate(&self) -> Result { // On non-Unix platforms, fall back to kill Ok(false) } /// Get the next output line, if available. pub async fn next_output(&mut self) -> Option { self.output_rx.recv().await } /// Send a raw message to the process via stdin. /// /// This can be used to provide input when Claude Code is waiting for user input. pub async fn send_input(&self, message: &str) -> Result<(), ClaudeProcessError> { let mut stdin_guard = self.stdin.lock().await; if let Some(ref mut stdin) = *stdin_guard { stdin .write_all(message.as_bytes()) .await .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?; stdin .write_all(b"\n") .await .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write newline: {}", e)))?; stdin .flush() .await .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?; Ok(()) } else { Err(ClaudeProcessError::OutputRead("Stdin not available".to_string())) } } /// Send a user message to the process via stdin using JSON protocol. /// /// This is the preferred method when using `--input-format=stream-json`. /// The message is serialized as JSON and sent as a single line. pub async fn send_user_message(&self, content: &str) -> Result<(), ClaudeProcessError> { let message = ClaudeInputMessage::user(content); let json_line = message.to_json_line().map_err(|e| { ClaudeProcessError::OutputRead(format!("Failed to serialize message: {}", e)) })?; tracing::debug!(content_len = content.len(), "Sending user message to Claude process"); let mut stdin_guard = self.stdin.lock().await; if let Some(ref mut stdin) = *stdin_guard { stdin .write_all(json_line.as_bytes()) .await .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?; stdin .flush() .await .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?; Ok(()) } else { Err(ClaudeProcessError::OutputRead("Stdin not available".to_string())) } } /// Get a clone of the stdin handle for external use. pub fn stdin_handle(&self) -> Arc>> { Arc::clone(&self.stdin) } /// Close stdin, signaling EOF to the process. pub async fn close_stdin(&self) -> Result<(), ClaudeProcessError> { let mut stdin_guard = self.stdin.lock().await; if let Some(mut stdin) = stdin_guard.take() { let _ = stdin.shutdown().await; } Ok(()) } /// Convert to a stream of output lines. pub fn into_stream(self) -> impl Stream { futures::stream::unfold(self.output_rx, |mut rx| async move { rx.recv().await.map(|line| (line, rx)) }) } } /// Manages Claude Code process spawning. pub struct ProcessManager { /// Path to the claude command. claude_command: String, /// Additional arguments to pass to Claude Code (after defaults). claude_args: Vec, /// Arguments to pass before defaults. claude_pre_args: Vec, /// Whether to enable Claude's permission system (skip --dangerously-skip-permissions). enable_permissions: bool, /// Whether to disable verbose output. disable_verbose: bool, /// Default environment variables to pass. default_env: HashMap, /// Bubblewrap sandbox configuration. bubblewrap: Option, } impl Default for ProcessManager { fn default() -> Self { Self::new() } } impl ProcessManager { /// Create a new ProcessManager with default settings. pub fn new() -> Self { Self { claude_command: "claude".to_string(), claude_args: Vec::new(), claude_pre_args: Vec::new(), enable_permissions: false, disable_verbose: false, default_env: HashMap::new(), bubblewrap: None, } } /// Create a ProcessManager with a custom claude command path. pub fn with_command(command: String) -> Self { Self { claude_command: command, claude_args: Vec::new(), claude_pre_args: Vec::new(), enable_permissions: false, disable_verbose: false, default_env: HashMap::new(), bubblewrap: None, } } /// Set additional arguments to pass after default arguments. pub fn with_args(mut self, args: Vec) -> Self { self.claude_args = args; self } /// Set arguments to pass before default arguments. pub fn with_pre_args(mut self, args: Vec) -> Self { self.claude_pre_args = args; self } /// Enable Claude's permission system (don't pass --dangerously-skip-permissions). pub fn with_permissions_enabled(mut self, enabled: bool) -> Self { self.enable_permissions = enabled; self } /// Disable verbose output. pub fn with_verbose_disabled(mut self, disabled: bool) -> Self { self.disable_verbose = disabled; self } /// Add default environment variables. pub fn with_env(mut self, env: HashMap) -> Self { self.default_env = env; self } /// Configure bubblewrap sandboxing. /// /// When enabled, Claude processes will be spawned inside a bubblewrap sandbox /// with filesystem isolation. pub fn with_bubblewrap(mut self, config: Option) -> Self { self.bubblewrap = config; self } /// Get the claude command path. pub fn claude_command(&self) -> &str { &self.claude_command } /// Check if bubblewrap (bwrap) is available on the system. /// /// Returns the bwrap version string if available. pub async fn check_bwrap_available(&self) -> Result { let bwrap_cmd = self .bubblewrap .as_ref() .map(|c| c.bwrap_command.as_str()) .unwrap_or("bwrap"); let output = Command::new(bwrap_cmd) .arg("--version") .output() .await .map_err(|e| { if e.kind() == std::io::ErrorKind::NotFound { ClaudeProcessError::BubblewrapNotFound } else { ClaudeProcessError::SpawnFailed(e) } })?; if output.status.success() { Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) } else { Err(ClaudeProcessError::BubblewrapNotFound) } } /// Build bwrap command arguments for sandboxing. /// /// Returns a tuple of (bwrap_command, bwrap_args) where bwrap_args includes /// all the sandbox flags followed by "--" and the actual command to run. fn build_bwrap_args( &self, working_dir: &Path, claude_command: &str, claude_args: &[String], config: &BubblewrapConfig, ) -> (String, Vec) { let mut args = Vec::new(); // Unshare all namespaces except user (for unprivileged use) args.push("--unshare-all".to_string()); // Share network if enabled (needed for API calls) if config.network { args.push("--share-net".to_string()); } // Safety flags args.push("--die-with-parent".to_string()); args.push("--new-session".to_string()); // Bind root filesystem read-only args.push("--ro-bind".to_string()); args.push("/".to_string()); args.push("/".to_string()); // Mount fresh /dev args.push("--dev".to_string()); args.push("/dev".to_string()); // Mount fresh /proc args.push("--proc".to_string()); args.push("/proc".to_string()); // Fresh /tmp args.push("--tmpfs".to_string()); args.push("/tmp".to_string()); // Bind working directory (worktree) read-write let working_dir_str = working_dir.to_string_lossy().to_string(); args.push("--bind".to_string()); args.push(working_dir_str.clone()); args.push(working_dir_str); // Bind ~/.claude read-write if it exists (for Claude config) if let Ok(home) = std::env::var("HOME") { let claude_config_dir = PathBuf::from(&home).join(".claude"); if claude_config_dir.exists() { let claude_config_str = claude_config_dir.to_string_lossy().to_string(); args.push("--bind".to_string()); args.push(claude_config_str.clone()); args.push(claude_config_str); } // Also bind ~/.config/claude if it exists (alternative config location) let claude_config_alt = PathBuf::from(&home).join(".config").join("claude"); if claude_config_alt.exists() { let config_str = claude_config_alt.to_string_lossy().to_string(); args.push("--bind".to_string()); args.push(config_str.clone()); args.push(config_str); } } // Additional read-only binds from config for path in &config.ro_bind { if path.exists() { let path_str = path.to_string_lossy().to_string(); args.push("--ro-bind".to_string()); args.push(path_str.clone()); args.push(path_str); } } // Additional read-write binds from config for path in &config.rw_bind { if path.exists() { let path_str = path.to_string_lossy().to_string(); args.push("--bind".to_string()); args.push(path_str.clone()); args.push(path_str); } } // Separator before the actual command args.push("--".to_string()); // Add the claude command and its arguments args.push(claude_command.to_string()); args.extend(claude_args.iter().cloned()); (config.bwrap_command.clone(), args) } /// Spawn a Claude Code process to execute a plan. /// /// The process runs in the specified working directory with stream-json output format. /// If `system_prompt` is provided, it will be passed via --system-prompt flag. pub async fn spawn( &self, working_dir: &Path, plan: &str, extra_env: Option>, ) -> Result { self.spawn_with_system_prompt(working_dir, plan, extra_env, None).await } /// Spawn a Claude Code process with an optional system prompt. /// /// The process runs in the specified working directory with stream-json output format. /// If `system_prompt` is provided, it will be passed via --system-prompt flag as /// behavioral constraints that Claude will treat as system-level instructions. pub async fn spawn_with_system_prompt( &self, working_dir: &Path, plan: &str, extra_env: Option>, system_prompt: Option<&str>, ) -> Result { // Check if bubblewrap is enabled and available let use_bubblewrap = if let Some(ref bwrap_config) = self.bubblewrap { if bwrap_config.enabled { // Verify bwrap is available before proceeding self.check_bwrap_available().await?; true } else { false } } else { false }; tracing::info!( working_dir = %working_dir.display(), plan_len = plan.len(), plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan }, has_system_prompt = system_prompt.is_some(), bubblewrap_enabled = use_bubblewrap, "Spawning Claude Code process" ); // Verify working directory exists if !working_dir.exists() { tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!"); return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new( std::io::ErrorKind::NotFound, format!("Working directory does not exist: {}", working_dir.display()), ))); } // Build environment let mut env = self.default_env.clone(); if let Some(extra) = extra_env { env.extend(extra); } // Load OAuth token from disk and set as env var if not already provided. // This allows processes to authenticate using tokens saved by the reauth flow. // The token is loaded fresh each time (not cached) so newly saved tokens are picked up. if !env.contains_key("CLAUDE_CODE_OAUTH_TOKEN") { if let Some(token) = crate::daemon::task::manager::load_oauth_token() { tracing::debug!("Setting CLAUDE_CODE_OAUTH_TOKEN from saved token file"); env.insert("CLAUDE_CODE_OAUTH_TOKEN".to_string(), token); } } // Build Claude arguments list let mut claude_args = Vec::new(); // Pre-args (before defaults) claude_args.extend(self.claude_pre_args.clone()); // Required arguments for stream-json protocol claude_args.push("--output-format=stream-json".to_string()); claude_args.push("--input-format=stream-json".to_string()); // Optional default arguments if !self.disable_verbose { claude_args.push("--verbose".to_string()); } if !self.enable_permissions { claude_args.push("--dangerously-skip-permissions".to_string()); } // System prompt - passed via --system-prompt flag for system-level constraints if let Some(prompt) = system_prompt { claude_args.push("--system-prompt".to_string()); claude_args.push(prompt.to_string()); } // Additional user-configured arguments claude_args.extend(self.claude_args.clone()); // Determine the actual command and arguments to spawn let (command, args) = if use_bubblewrap { let bwrap_config = self.bubblewrap.as_ref().unwrap(); let (bwrap_cmd, bwrap_args) = self.build_bwrap_args(working_dir, &self.claude_command, &claude_args, bwrap_config); tracing::info!( bwrap_command = %bwrap_cmd, bwrap_args_count = bwrap_args.len(), "Running Claude in bubblewrap sandbox" ); tracing::debug!(bwrap_args = ?bwrap_args, "Bubblewrap arguments"); (bwrap_cmd, bwrap_args) } else { tracing::debug!(args = ?claude_args, "Claude command arguments"); (self.claude_command.clone(), claude_args) }; // Spawn the process in its own process group so we can kill all children let mut cmd = Command::new(&command); cmd.args(&args) .current_dir(working_dir) .envs(env) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .kill_on_drop(true); // On Unix, create a new process group so we can kill all child processes #[cfg(unix)] { #[allow(unused_imports)] use std::os::unix::process::CommandExt; cmd.process_group(0); } let mut child = cmd.spawn().map_err(|e| { if e.kind() == std::io::ErrorKind::NotFound { if use_bubblewrap { ClaudeProcessError::BubblewrapNotFound } else { ClaudeProcessError::CommandNotFound(self.claude_command.clone()) } } else { ClaudeProcessError::SpawnFailed(e) } })?; // Create output channel let (tx, rx) = mpsc::channel(1000); // Take stdout, stderr, and stdin // With --input-format=stream-json, we keep stdin open for sending messages let stdin = child.stdin.take(); let stdin = Arc::new(Mutex::new(stdin)); let stdout = child.stdout.take().expect("stdout should be piped"); let stderr = child.stderr.take().expect("stderr should be piped"); // Spawn task to read stdout let tx_stdout = tx.clone(); tokio::spawn(async move { use tokio::io::AsyncReadExt; let mut reader = BufReader::new(stdout); let mut buffer = vec![0u8; 4096]; let mut line_buffer = String::new(); loop { // Try to read with a timeout to detect if we're stuck match tokio::time::timeout( tokio::time::Duration::from_secs(5), reader.read(&mut buffer) ).await { Ok(Ok(0)) => { // EOF tracing::debug!("Claude stdout EOF"); // Send any remaining content if !line_buffer.is_empty() { let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await; } break; } Ok(Ok(n)) => { let chunk = String::from_utf8_lossy(&buffer[..n]); tracing::debug!(bytes = n, chunk_preview = %if chunk.len() > 100 { &chunk[..100] } else { &chunk }, "Got stdout chunk from Claude"); // Accumulate into line buffer and emit complete lines line_buffer.push_str(&chunk); while let Some(newline_pos) = line_buffer.find('\n') { let line = line_buffer[..newline_pos].to_string(); line_buffer = line_buffer[newline_pos + 1..].to_string(); if tx_stdout.send(OutputLine::stdout(line)).await.is_err() { return; } } } Ok(Err(e)) => { tracing::error!(error = %e, "Error reading Claude stdout"); break; } Err(_) => { // Timeout - no data for 5 seconds tracing::warn!("No stdout data from Claude for 5 seconds"); } } } tracing::debug!("Claude stdout reader task ended"); }); // Spawn task to read stderr let tx_stderr = tx; tokio::spawn(async move { let reader = BufReader::new(stderr); let mut lines = reader.lines(); while let Ok(Some(line)) = lines.next_line().await { tracing::debug!(line = %line, "Claude stderr"); if tx_stderr.send(OutputLine::stderr(line)).await.is_err() { break; } } tracing::debug!("Claude stderr reader task ended"); }); tracing::info!("Claude Code process spawned successfully"); let process = ClaudeProcess { child, output_rx: rx, stdin, }; // Send the initial plan as the first user message tracing::info!(plan_len = plan.len(), "Sending initial plan to Claude via stdin"); process.send_user_message(plan).await?; Ok(process) } /// Spawn a Claude Code process in continuation mode. /// /// This is used for the autonomous loop feature where we need to continue /// a previous conversation. The --continue flag tells Claude to resume /// from the previous session state. pub async fn spawn_continue( &self, working_dir: &Path, continuation_prompt: &str, extra_env: Option>, system_prompt: Option<&str>, ) -> Result { tracing::info!( working_dir = %working_dir.display(), prompt_len = continuation_prompt.len(), has_system_prompt = system_prompt.is_some(), "Spawning Claude Code process in continuation mode" ); // Verify working directory exists if !working_dir.exists() { tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!"); return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new( std::io::ErrorKind::NotFound, format!("Working directory does not exist: {}", working_dir.display()), ))); } // Build environment let mut env = self.default_env.clone(); if let Some(extra) = extra_env { env.extend(extra); } // Build arguments list let mut args = Vec::new(); // Pre-args (before defaults) args.extend(self.claude_pre_args.clone()); // Required arguments for stream-json protocol args.push("--output-format=stream-json".to_string()); args.push("--input-format=stream-json".to_string()); // The key flag for continuation mode args.push("--continue".to_string()); // Optional default arguments if !self.disable_verbose { args.push("--verbose".to_string()); } if !self.enable_permissions { args.push("--dangerously-skip-permissions".to_string()); } // System prompt - passed via --system-prompt flag for system-level constraints if let Some(prompt) = system_prompt { args.push("--system-prompt".to_string()); args.push(prompt.to_string()); } // Additional user-configured arguments args.extend(self.claude_args.clone()); tracing::debug!(args = ?args, "Claude continue command arguments"); // Spawn the process in its own process group so we can kill all children let mut cmd = Command::new(&self.claude_command); cmd.args(&args) .current_dir(working_dir) .envs(env) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .kill_on_drop(true); // On Unix, create a new process group so we can kill all child processes #[cfg(unix)] { #[allow(unused_imports)] use std::os::unix::process::CommandExt; cmd.process_group(0); } let mut child = cmd.spawn().map_err(|e| { if e.kind() == std::io::ErrorKind::NotFound { ClaudeProcessError::CommandNotFound(self.claude_command.clone()) } else { ClaudeProcessError::SpawnFailed(e) } })?; // Create output channel let (tx, rx) = mpsc::channel(1000); // Take stdout, stderr, and stdin let stdin = child.stdin.take(); let stdin = Arc::new(Mutex::new(stdin)); let stdout = child.stdout.take().expect("stdout should be piped"); let stderr = child.stderr.take().expect("stderr should be piped"); // Spawn task to read stdout let tx_stdout = tx.clone(); tokio::spawn(async move { use tokio::io::AsyncReadExt; let mut reader = BufReader::new(stdout); let mut buffer = vec![0u8; 4096]; let mut line_buffer = String::new(); loop { match tokio::time::timeout( tokio::time::Duration::from_secs(5), reader.read(&mut buffer) ).await { Ok(Ok(0)) => { tracing::debug!("Claude stdout EOF (continue mode)"); if !line_buffer.is_empty() { let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await; } break; } Ok(Ok(n)) => { let chunk = String::from_utf8_lossy(&buffer[..n]); line_buffer.push_str(&chunk); while let Some(newline_pos) = line_buffer.find('\n') { let line = line_buffer[..newline_pos].to_string(); line_buffer = line_buffer[newline_pos + 1..].to_string(); if tx_stdout.send(OutputLine::stdout(line)).await.is_err() { return; } } } Ok(Err(e)) => { tracing::error!(error = %e, "Error reading Claude stdout (continue mode)"); break; } Err(_) => { tracing::warn!("No stdout data from Claude for 5 seconds (continue mode)"); } } } tracing::debug!("Claude stdout reader task ended (continue mode)"); }); // Spawn task to read stderr let tx_stderr = tx; tokio::spawn(async move { let reader = BufReader::new(stderr); let mut lines = reader.lines(); while let Ok(Some(line)) = lines.next_line().await { tracing::debug!(line = %line, "Claude stderr (continue mode)"); if tx_stderr.send(OutputLine::stderr(line)).await.is_err() { break; } } tracing::debug!("Claude stderr reader task ended (continue mode)"); }); tracing::info!("Claude Code process spawned successfully in continue mode"); let process = ClaudeProcess { child, output_rx: rx, stdin, }; // Send the continuation prompt as a user message tracing::info!(prompt_len = continuation_prompt.len(), "Sending continuation prompt to Claude via stdin"); process.send_user_message(continuation_prompt).await?; Ok(process) } /// Check if the claude command is available. pub async fn check_claude_available(&self) -> Result { let output = Command::new(&self.claude_command) .arg("--version") .output() .await .map_err(|e| { if e.kind() == std::io::ErrorKind::NotFound { ClaudeProcessError::CommandNotFound(self.claude_command.clone()) } else { ClaudeProcessError::SpawnFailed(e) } })?; if output.status.success() { Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) } else { Err(ClaudeProcessError::CommandNotFound( self.claude_command.clone(), )) } } } #[cfg(test)] mod tests { use super::*; #[test] fn test_extract_json_type() { assert_eq!( extract_json_type(r#"{"type":"system","subtype":"init"}"#), Some("system".to_string()) ); assert_eq!( extract_json_type(r#"{"type":"assistant","message":{}}"#), Some("assistant".to_string()) ); assert_eq!(extract_json_type("not json"), None); assert_eq!(extract_json_type(r#"{"no_type": true}"#), None); } #[test] fn test_output_line_creation() { let line = OutputLine::stdout(r#"{"type":"result"}"#.to_string()); assert!(line.is_stdout); assert_eq!(line.json_type, Some("result".to_string())); let line = OutputLine::stderr("error message".to_string()); assert!(!line.is_stdout); assert_eq!(line.json_type, None); } }