//! Claude Code process management.
use std::collections::HashMap;
use std::path::Path;
use std::process::Stdio;
use std::sync::Arc;
use futures::Stream;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::{mpsc, Mutex};
use super::claude_protocol::ClaudeInputMessage;
/// Errors that can occur during Claude process management.
///
/// `std::io::Error` values convert into [`ClaudeProcessError::SpawnFailed`] through the
/// `#[from]` attribute, so `?` on an I/O result inside this module yields that variant.
#[derive(Debug, thiserror::Error)]
pub enum ClaudeProcessError {
/// An I/O error raised while launching, waiting on, or killing the child process.
#[error("Failed to spawn Claude process: {0}")]
SpawnFailed(#[from] std::io::Error),
/// The configured command could not be run; carries the command string that was tried.
#[error("Claude command not found: {0}")]
CommandNotFound(String),
/// The process has already exited.
// NOTE(review): nothing in the visible part of this file constructs this variant;
// confirm that it is used elsewhere.
#[error("Process already exited")]
AlreadyExited,
/// A failure described by a free-form message.
///
/// Despite the name, this file also reports stdin write failures and message
/// serialization failures through this variant.
#[error("Failed to read output: {0}")]
OutputRead(String),
}
/// A line of output from Claude Code.
///
/// Values are normally built with [`OutputLine::stdout`] or [`OutputLine::stderr`],
/// which fill in `is_stdout` and `json_type` consistently.
#[derive(Debug, Clone)]
pub struct OutputLine {
/// The raw content of the line.
pub content: String,
/// Whether this is from stdout (true) or stderr (false).
pub is_stdout: bool,
/// Parsed JSON type if available (e.g., "system", "assistant", "result").
///
/// The provided constructors only populate this for stdout lines; stderr lines get `None`.
pub json_type: Option<String>,
}
impl OutputLine {
    /// Build an [`OutputLine`] for text that was read from the child's stdout.
    ///
    /// The text is probed for a top-level JSON `"type"` field; when one is found it
    /// is recorded in `json_type`, otherwise `json_type` is `None`.
    pub fn stdout(content: String) -> Self {
        let detected_type = extract_json_type(&content);
        Self {
            json_type: detected_type,
            is_stdout: true,
            content,
        }
    }

    /// Build an [`OutputLine`] for text that was read from the child's stderr.
    ///
    /// Stderr text is never inspected for JSON, so `json_type` is always `None`.
    pub fn stderr(content: String) -> Self {
        Self {
            json_type: None,
            is_stdout: false,
            content,
        }
    }
}
/// Return the string value of the top-level `"type"` field of a JSON object line.
///
/// Yields `None` when the line does not begin with `{`, is not valid JSON, has no
/// `"type"` key, or the value stored under `"type"` is not a string.
fn extract_json_type(line: &str) -> Option<String> {
    // Cheap rejection so obviously non-JSON lines never reach the parser.
    if !line.starts_with('{') {
        return None;
    }

    let parsed: serde_json::Value = serde_json::from_str(line).ok()?;
    let type_name = parsed.get("type")?.as_str()?;
    Some(type_name.to_string())
}
/// Handle to a running Claude Code process.
///
/// Bundles the child process with the channel on which its output lines arrive
/// and a shareable handle to its stdin.
pub struct ClaudeProcess {
/// The child process.
child: Child,
/// Receiver for output lines.
output_rx: mpsc::Receiver<OutputLine>,
/// Stdin handle for sending input to the process (thread-safe).
///
/// The inner `Option` becomes `None` once `close_stdin` has taken the handle.
stdin: Arc<Mutex<Option<ChildStdin>>>,
}
impl ClaudeProcess {
/// Wait for the process to exit and return the exit code.
pub async fn wait(&mut self) -> Result<i64, ClaudeProcessError> {
let status = self.child.wait().await?;
Ok(status.code().unwrap_or(-1) as i64)
}
/// Check if the process has exited.
pub fn try_wait(&mut self) -> Result<Option<i64>, ClaudeProcessError> {
match self.child.try_wait()? {
Some(status) => Ok(Some(status.code().unwrap_or(-1) as i64)),
None => Ok(None),
}
}
/// Kill the process.
pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> {
self.child.kill().await?;
Ok(())
}
/// Get the next output line, if available.
pub async fn next_output(&mut self) -> Option<OutputLine> {
self.output_rx.recv().await
}
/// Send a raw message to the process via stdin.
///
/// This can be used to provide input when Claude Code is waiting for user input.
pub async fn send_input(&self, message: &str) -> Result<(), ClaudeProcessError> {
let mut stdin_guard = self.stdin.lock().await;
if let Some(ref mut stdin) = *stdin_guard {
stdin
.write_all(message.as_bytes())
.await
.map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?;
stdin
.write_all(b"\n")
.await
.map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write newline: {}", e)))?;
stdin
.flush()
.await
.map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?;
Ok(())
} else {
Err(ClaudeProcessError::OutputRead("Stdin not available".to_string()))
}
}
/// Send a user message to the process via stdin using JSON protocol.
///
/// This is the preferred method when using `--input-format=stream-json`.
/// The message is serialized as JSON and sent as a single line.
pub async fn send_user_message(&self, content: &str) -> Result<(), ClaudeProcessError> {
let message = ClaudeInputMessage::user(content);
let json_line = message.to_json_line().map_err(|e| {
ClaudeProcessError::OutputRead(format!("Failed to serialize message: {}", e))
})?;
tracing::debug!(content_len = content.len(), "Sending user message to Claude process");
let mut stdin_guard = self.stdin.lock().await;
if let Some(ref mut stdin) = *stdin_guard {
stdin
.write_all(json_line.as_bytes())
.await
.map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?;
stdin
.flush()
.await
.map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?;
Ok(())
} else {
Err(ClaudeProcessError::OutputRead("Stdin not available".to_string()))
}
}
/// Get a clone of the stdin handle for external use.
pub fn stdin_handle(&self) -> Arc<Mutex<Option<ChildStdin>>> {
Arc::clone(&self.stdin)
}
/// Close stdin, signaling EOF to the process.
pub async fn close_stdin(&self) -> Result<(), ClaudeProcessError> {
let mut stdin_guard = self.stdin.lock().await;
if let Some(mut stdin) = stdin_guard.take() {
let _ = stdin.shutdown().await;
}
Ok(())
}
/// Convert to a stream of output lines.
pub fn into_stream(self) -> impl Stream<Item = OutputLine> {
futures::stream::unfold(self.output_rx, |mut rx| async move {
rx.recv().await.map(|line| (line, rx))
})
}
}
/// Manages Claude Code process spawning.
///
/// Holds the command, argument lists, flags, and environment applied to every
/// process started through `spawn`.
pub struct ProcessManager {
/// Path to the claude command.
claude_command: String,
/// Additional arguments to pass to Claude Code (after defaults).
claude_args: Vec<String>,
/// Arguments to pass before defaults.
claude_pre_args: Vec<String>,
/// Whether to enable Claude's permission system (skip --dangerously-skip-permissions).
enable_permissions: bool,
/// Whether to disable verbose output.
///
/// When `false`, `--verbose` is added to the argument list.
disable_verbose: bool,
/// Default environment variables to pass.
///
/// Per-spawn extra variables are merged on top of these and win on key collisions.
default_env: HashMap<String, String>,
}
impl Default for ProcessManager {
fn default() -> Self {
Self::new()
}
}
impl ProcessManager {
/// Create a new ProcessManager with default settings.
pub fn new() -> Self {
Self {
claude_command: "claude".to_string(),
claude_args: Vec::new(),
claude_pre_args: Vec::new(),
enable_permissions: false,
disable_verbose: false,
default_env: HashMap::new(),
}
}
/// Create a ProcessManager with a custom claude command path.
pub fn with_command(command: String) -> Self {
Self {
claude_command: command,
claude_args: Vec::new(),
claude_pre_args: Vec::new(),
enable_permissions: false,
disable_verbose: false,
default_env: HashMap::new(),
}
}
/// Set additional arguments to pass after default arguments.
pub fn with_args(mut self, args: Vec<String>) -> Self {
self.claude_args = args;
self
}
/// Set arguments to pass before default arguments.
pub fn with_pre_args(mut self, args: Vec<String>) -> Self {
self.claude_pre_args = args;
self
}
/// Enable Claude's permission system (don't pass --dangerously-skip-permissions).
pub fn with_permissions_enabled(mut self, enabled: bool) -> Self {
self.enable_permissions = enabled;
self
}
/// Disable verbose output.
pub fn with_verbose_disabled(mut self, disabled: bool) -> Self {
self.disable_verbose = disabled;
self
}
/// Add default environment variables.
pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
self.default_env = env;
self
}
/// Spawn a Claude Code process to execute a plan.
///
/// The process runs in the specified working directory with stream-json output format.
pub async fn spawn(
&self,
working_dir: &Path,
plan: &str,
extra_env: Option<HashMap<String, String>>,
) -> Result<ClaudeProcess, ClaudeProcessError> {
tracing::info!(
working_dir = %working_dir.display(),
plan_len = plan.len(),
plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan },
"Spawning Claude Code process"
);
// Verify working directory exists
if !working_dir.exists() {
tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!");
return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Working directory does not exist: {}", working_dir.display()),
)));
}
// Build environment
let mut env = self.default_env.clone();
if let Some(extra) = extra_env {
env.extend(extra);
}
// Build arguments list
let mut args = Vec::new();
// Pre-args (before defaults)
args.extend(self.claude_pre_args.clone());
// Required arguments for stream-json protocol
args.push("--output-format=stream-json".to_string());
args.push("--input-format=stream-json".to_string());
// Optional default arguments
if !self.disable_verbose {
args.push("--verbose".to_string());
}
if !self.enable_permissions {
args.push("--dangerously-skip-permissions".to_string());
}
// Additional user-configured arguments
args.extend(self.claude_args.clone());
tracing::debug!(args = ?args, "Claude command arguments");
// Spawn the process
let mut child = Command::new(&self.claude_command)
.args(&args)
.current_dir(working_dir)
.envs(env)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()
.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
ClaudeProcessError::CommandNotFound(self.claude_command.clone())
} else {
ClaudeProcessError::SpawnFailed(e)
}
})?;
// Create output channel
let (tx, rx) = mpsc::channel(1000);
// Take stdout, stderr, and stdin
// With --input-format=stream-json, we keep stdin open for sending messages
let stdin = child.stdin.take();
let stdin = Arc::new(Mutex::new(stdin));
let stdout = child.stdout.take().expect("stdout should be piped");
let stderr = child.stderr.take().expect("stderr should be piped");
// Spawn task to read stdout
let tx_stdout = tx.clone();
tokio::spawn(async move {
use tokio::io::AsyncReadExt;
let mut reader = BufReader::new(stdout);
let mut buffer = vec![0u8; 4096];
let mut line_buffer = String::new();
loop {
// Try to read with a timeout to detect if we're stuck
match tokio::time::timeout(
tokio::time::Duration::from_secs(5),
reader.read(&mut buffer)
).await {
Ok(Ok(0)) => {
// EOF
tracing::debug!("Claude stdout EOF");
// Send any remaining content
if !line_buffer.is_empty() {
let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await;
}
break;
}
Ok(Ok(n)) => {
let chunk = String::from_utf8_lossy(&buffer[..n]);
tracing::debug!(bytes = n, chunk_preview = %if chunk.len() > 100 { &chunk[..100] } else { &chunk }, "Got stdout chunk from Claude");
// Accumulate into line buffer and emit complete lines
line_buffer.push_str(&chunk);
while let Some(newline_pos) = line_buffer.find('\n') {
let line = line_buffer[..newline_pos].to_string();
line_buffer = line_buffer[newline_pos + 1..].to_string();
if tx_stdout.send(OutputLine::stdout(line)).await.is_err() {
return;
}
}
}
Ok(Err(e)) => {
tracing::error!(error = %e, "Error reading Claude stdout");
break;
}
Err(_) => {
// Timeout - no data for 5 seconds
tracing::warn!("No stdout data from Claude for 5 seconds");
}
}
}
tracing::debug!("Claude stdout reader task ended");
});
// Spawn task to read stderr
let tx_stderr = tx;
tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
tracing::debug!(line = %line, "Claude stderr");
if tx_stderr.send(OutputLine::stderr(line)).await.is_err() {
break;
}
}
tracing::debug!("Claude stderr reader task ended");
});
tracing::info!("Claude Code process spawned successfully");
let process = ClaudeProcess {
child,
output_rx: rx,
stdin,
};
// Send the initial plan as the first user message
tracing::info!(plan_len = plan.len(), "Sending initial plan to Claude via stdin");
process.send_user_message(plan).await?;
Ok(process)
}
/// Check if the claude command is available.
pub async fn check_claude_available(&self) -> Result<String, ClaudeProcessError> {
let output = Command::new(&self.claude_command)
.arg("--version")
.output()
.await
.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
ClaudeProcessError::CommandNotFound(self.claude_command.clone())
} else {
ClaudeProcessError::SpawnFailed(e)
}
})?;
if output.status.success() {
Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
} else {
Err(ClaudeProcessError::CommandNotFound(
self.claude_command.clone(),
))
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `extract_json_type` returns the top-level `"type"` string for JSON objects
    /// and `None` for non-JSON input or objects without a `"type"` key.
    #[test]
    fn test_extract_json_type() {
        let cases: [(&str, Option<&str>); 4] = [
            (r#"{"type":"system","subtype":"init"}"#, Some("system")),
            (r#"{"type":"assistant","message":{}}"#, Some("assistant")),
            ("not json", None),
            (r#"{"no_type": true}"#, None),
        ];

        for &(input, expected) in cases.iter() {
            let expected_owned = expected.map(|s| s.to_string());
            assert_eq!(extract_json_type(input), expected_owned);
        }
    }

    /// The two constructors set `is_stdout` and `json_type` as documented.
    #[test]
    fn test_output_line_creation() {
        let from_stdout = OutputLine::stdout(r#"{"type":"result"}"#.to_string());
        assert!(from_stdout.is_stdout);
        assert_eq!(from_stdout.json_type, Some("result".to_string()));

        let from_stderr = OutputLine::stderr("error message".to_string());
        assert!(!from_stderr.is_stdout);
        assert_eq!(from_stderr.json_type, None);
    }
}