diff options
| author | soryu <soryu@soryu.co> | 2026-01-11 05:52:14 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-15 00:21:16 +0000 |
| commit | 87044a747b47bd83249d61a45842c7f7b2eae56d (patch) | |
| tree | ef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/daemon/src/process | |
| parent | 077820c4167c168072d217a1b01df840463a12a8 (diff) | |
| download | soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip | |
Contract system
Diffstat (limited to 'makima/daemon/src/process')
| -rw-r--r-- | makima/daemon/src/process/claude.rs | 481 | ||||
| -rw-r--r-- | makima/daemon/src/process/claude_protocol.rs | 59 | ||||
| -rw-r--r-- | makima/daemon/src/process/mod.rs | 10 |
3 files changed, 0 insertions, 550 deletions
diff --git a/makima/daemon/src/process/claude.rs b/makima/daemon/src/process/claude.rs deleted file mode 100644 index e06ee09..0000000 --- a/makima/daemon/src/process/claude.rs +++ /dev/null @@ -1,481 +0,0 @@ -//! Claude Code process management. - -use std::collections::HashMap; -use std::path::Path; -use std::process::Stdio; -use std::sync::Arc; - -use futures::Stream; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::process::{Child, ChildStdin, Command}; -use tokio::sync::{mpsc, Mutex}; - -use super::claude_protocol::ClaudeInputMessage; - -/// Errors that can occur during Claude process management. -#[derive(Debug, thiserror::Error)] -pub enum ClaudeProcessError { - #[error("Failed to spawn Claude process: {0}")] - SpawnFailed(#[from] std::io::Error), - - #[error("Claude command not found: {0}")] - CommandNotFound(String), - - #[error("Process already exited")] - AlreadyExited, - - #[error("Failed to read output: {0}")] - OutputRead(String), -} - -/// A line of output from Claude Code. -#[derive(Debug, Clone)] -pub struct OutputLine { - /// The raw content of the line. - pub content: String, - /// Whether this is from stdout (true) or stderr (false). - pub is_stdout: bool, - /// Parsed JSON type if available (e.g., "system", "assistant", "result"). - pub json_type: Option<String>, -} - -impl OutputLine { - /// Create a new stdout output line. - pub fn stdout(content: String) -> Self { - let json_type = extract_json_type(&content); - Self { - content, - is_stdout: true, - json_type, - } - } - - /// Create a new stderr output line. - pub fn stderr(content: String) -> Self { - Self { - content, - is_stdout: false, - json_type: None, - } - } -} - -/// Extract the "type" field from a JSON line if present. -fn extract_json_type(line: &str) -> Option<String> { - // Quick check for JSON - if !line.starts_with('{') { - return None; - } - - // Try to parse and extract type - if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) { - json.get("type") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()) - } else { - None - } -} - -/// Handle to a running Claude Code process. -pub struct ClaudeProcess { - /// The child process. - child: Child, - /// Receiver for output lines. - output_rx: mpsc::Receiver<OutputLine>, - /// Stdin handle for sending input to the process (thread-safe). - stdin: Arc<Mutex<Option<ChildStdin>>>, -} - -impl ClaudeProcess { - /// Wait for the process to exit and return the exit code. - pub async fn wait(&mut self) -> Result<i64, ClaudeProcessError> { - let status = self.child.wait().await?; - Ok(status.code().unwrap_or(-1) as i64) - } - - /// Check if the process has exited. - pub fn try_wait(&mut self) -> Result<Option<i64>, ClaudeProcessError> { - match self.child.try_wait()? { - Some(status) => Ok(Some(status.code().unwrap_or(-1) as i64)), - None => Ok(None), - } - } - - /// Kill the process. - pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> { - self.child.kill().await?; - Ok(()) - } - - /// Get the next output line, if available. - pub async fn next_output(&mut self) -> Option<OutputLine> { - self.output_rx.recv().await - } - - /// Send a raw message to the process via stdin. - /// - /// This can be used to provide input when Claude Code is waiting for user input. - pub async fn send_input(&self, message: &str) -> Result<(), ClaudeProcessError> { - let mut stdin_guard = self.stdin.lock().await; - if let Some(ref mut stdin) = *stdin_guard { - stdin - .write_all(message.as_bytes()) - .await - .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?; - stdin - .write_all(b"\n") - .await - .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write newline: {}", e)))?; - stdin - .flush() - .await - .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?; - Ok(()) - } else { - Err(ClaudeProcessError::OutputRead("Stdin not available".to_string())) - } - } - - /// Send a user message to the process via stdin using JSON protocol. - /// - /// This is the preferred method when using `--input-format=stream-json`. - /// The message is serialized as JSON and sent as a single line. - pub async fn send_user_message(&self, content: &str) -> Result<(), ClaudeProcessError> { - let message = ClaudeInputMessage::user(content); - let json_line = message.to_json_line().map_err(|e| { - ClaudeProcessError::OutputRead(format!("Failed to serialize message: {}", e)) - })?; - - tracing::debug!(content_len = content.len(), "Sending user message to Claude process"); - - let mut stdin_guard = self.stdin.lock().await; - if let Some(ref mut stdin) = *stdin_guard { - stdin - .write_all(json_line.as_bytes()) - .await - .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?; - stdin - .flush() - .await - .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?; - Ok(()) - } else { - Err(ClaudeProcessError::OutputRead("Stdin not available".to_string())) - } - } - - /// Get a clone of the stdin handle for external use. - pub fn stdin_handle(&self) -> Arc<Mutex<Option<ChildStdin>>> { - Arc::clone(&self.stdin) - } - - /// Close stdin, signaling EOF to the process. - pub async fn close_stdin(&self) -> Result<(), ClaudeProcessError> { - let mut stdin_guard = self.stdin.lock().await; - if let Some(mut stdin) = stdin_guard.take() { - let _ = stdin.shutdown().await; - } - Ok(()) - } - - /// Convert to a stream of output lines. - pub fn into_stream(self) -> impl Stream<Item = OutputLine> { - futures::stream::unfold(self.output_rx, |mut rx| async move { - rx.recv().await.map(|line| (line, rx)) - }) - } -} - -/// Manages Claude Code process spawning. -pub struct ProcessManager { - /// Path to the claude command. - claude_command: String, - /// Additional arguments to pass to Claude Code (after defaults). - claude_args: Vec<String>, - /// Arguments to pass before defaults. - claude_pre_args: Vec<String>, - /// Whether to enable Claude's permission system (skip --dangerously-skip-permissions). - enable_permissions: bool, - /// Whether to disable verbose output. - disable_verbose: bool, - /// Default environment variables to pass. - default_env: HashMap<String, String>, -} - -impl Default for ProcessManager { - fn default() -> Self { - Self::new() - } -} - -impl ProcessManager { - /// Create a new ProcessManager with default settings. - pub fn new() -> Self { - Self { - claude_command: "claude".to_string(), - claude_args: Vec::new(), - claude_pre_args: Vec::new(), - enable_permissions: false, - disable_verbose: false, - default_env: HashMap::new(), - } - } - - /// Create a ProcessManager with a custom claude command path. - pub fn with_command(command: String) -> Self { - Self { - claude_command: command, - claude_args: Vec::new(), - claude_pre_args: Vec::new(), - enable_permissions: false, - disable_verbose: false, - default_env: HashMap::new(), - } - } - - /// Set additional arguments to pass after default arguments. - pub fn with_args(mut self, args: Vec<String>) -> Self { - self.claude_args = args; - self - } - - /// Set arguments to pass before default arguments. - pub fn with_pre_args(mut self, args: Vec<String>) -> Self { - self.claude_pre_args = args; - self - } - - /// Enable Claude's permission system (don't pass --dangerously-skip-permissions). - pub fn with_permissions_enabled(mut self, enabled: bool) -> Self { - self.enable_permissions = enabled; - self - } - - /// Disable verbose output. - pub fn with_verbose_disabled(mut self, disabled: bool) -> Self { - self.disable_verbose = disabled; - self - } - - /// Add default environment variables. - pub fn with_env(mut self, env: HashMap<String, String>) -> Self { - self.default_env = env; - self - } - - /// Spawn a Claude Code process to execute a plan. - /// - /// The process runs in the specified working directory with stream-json output format. - pub async fn spawn( - &self, - working_dir: &Path, - plan: &str, - extra_env: Option<HashMap<String, String>>, - ) -> Result<ClaudeProcess, ClaudeProcessError> { - tracing::info!( - working_dir = %working_dir.display(), - plan_len = plan.len(), - plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan }, - "Spawning Claude Code process" - ); - - // Verify working directory exists - if !working_dir.exists() { - tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!"); - return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new( - std::io::ErrorKind::NotFound, - format!("Working directory does not exist: {}", working_dir.display()), - ))); - } - - // Build environment - let mut env = self.default_env.clone(); - if let Some(extra) = extra_env { - env.extend(extra); - } - - // Build arguments list - let mut args = Vec::new(); - - // Pre-args (before defaults) - args.extend(self.claude_pre_args.clone()); - - // Required arguments for stream-json protocol - args.push("--output-format=stream-json".to_string()); - args.push("--input-format=stream-json".to_string()); - - // Optional default arguments - if !self.disable_verbose { - args.push("--verbose".to_string()); - } - if !self.enable_permissions { - args.push("--dangerously-skip-permissions".to_string()); - } - - // Additional user-configured arguments - args.extend(self.claude_args.clone()); - - tracing::debug!(args = ?args, "Claude command arguments"); - - // Spawn the process - let mut child = Command::new(&self.claude_command) - .args(&args) - .current_dir(working_dir) - .envs(env) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .kill_on_drop(true) - .spawn() - .map_err(|e| { - if e.kind() == std::io::ErrorKind::NotFound { - ClaudeProcessError::CommandNotFound(self.claude_command.clone()) - } else { - ClaudeProcessError::SpawnFailed(e) - } - })?; - - // Create output channel - let (tx, rx) = mpsc::channel(1000); - - // Take stdout, stderr, and stdin - // With --input-format=stream-json, we keep stdin open for sending messages - let stdin = child.stdin.take(); - let stdin = Arc::new(Mutex::new(stdin)); - - let stdout = child.stdout.take().expect("stdout should be piped"); - let stderr = child.stderr.take().expect("stderr should be piped"); - - // Spawn task to read stdout - let tx_stdout = tx.clone(); - tokio::spawn(async move { - use tokio::io::AsyncReadExt; - let mut reader = BufReader::new(stdout); - let mut buffer = vec![0u8; 4096]; - let mut line_buffer = String::new(); - - loop { - // Try to read with a timeout to detect if we're stuck - match tokio::time::timeout( - tokio::time::Duration::from_secs(5), - reader.read(&mut buffer) - ).await { - Ok(Ok(0)) => { - // EOF - tracing::debug!("Claude stdout EOF"); - // Send any remaining content - if !line_buffer.is_empty() { - let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await; - } - break; - } - Ok(Ok(n)) => { - let chunk = String::from_utf8_lossy(&buffer[..n]); - tracing::debug!(bytes = n, chunk_preview = %if chunk.len() > 100 { &chunk[..100] } else { &chunk }, "Got stdout chunk from Claude"); - - // Accumulate into line buffer and emit complete lines - line_buffer.push_str(&chunk); - while let Some(newline_pos) = line_buffer.find('\n') { - let line = line_buffer[..newline_pos].to_string(); - line_buffer = line_buffer[newline_pos + 1..].to_string(); - if tx_stdout.send(OutputLine::stdout(line)).await.is_err() { - return; - } - } - } - Ok(Err(e)) => { - tracing::error!(error = %e, "Error reading Claude stdout"); - break; - } - Err(_) => { - // Timeout - no data for 5 seconds - tracing::warn!("No stdout data from Claude for 5 seconds"); - } - } - } - tracing::debug!("Claude stdout reader task ended"); - }); - - // Spawn task to read stderr - let tx_stderr = tx; - tokio::spawn(async move { - let reader = BufReader::new(stderr); - let mut lines = reader.lines(); - while let Ok(Some(line)) = lines.next_line().await { - tracing::debug!(line = %line, "Claude stderr"); - if tx_stderr.send(OutputLine::stderr(line)).await.is_err() { - break; - } - } - tracing::debug!("Claude stderr reader task ended"); - }); - - tracing::info!("Claude Code process spawned successfully"); - - let process = ClaudeProcess { - child, - output_rx: rx, - stdin, - }; - - // Send the initial plan as the first user message - tracing::info!(plan_len = plan.len(), "Sending initial plan to Claude via stdin"); - process.send_user_message(plan).await?; - - Ok(process) - } - - /// Check if the claude command is available. - pub async fn check_claude_available(&self) -> Result<String, ClaudeProcessError> { - let output = Command::new(&self.claude_command) - .arg("--version") - .output() - .await - .map_err(|e| { - if e.kind() == std::io::ErrorKind::NotFound { - ClaudeProcessError::CommandNotFound(self.claude_command.clone()) - } else { - ClaudeProcessError::SpawnFailed(e) - } - })?; - - if output.status.success() { - Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) - } else { - Err(ClaudeProcessError::CommandNotFound( - self.claude_command.clone(), - )) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_extract_json_type() { - assert_eq!( - extract_json_type(r#"{"type":"system","subtype":"init"}"#), - Some("system".to_string()) - ); - assert_eq!( - extract_json_type(r#"{"type":"assistant","message":{}}"#), - Some("assistant".to_string()) - ); - assert_eq!(extract_json_type("not json"), None); - assert_eq!(extract_json_type(r#"{"no_type": true}"#), None); - } - - #[test] - fn test_output_line_creation() { - let line = OutputLine::stdout(r#"{"type":"result"}"#.to_string()); - assert!(line.is_stdout); - assert_eq!(line.json_type, Some("result".to_string())); - - let line = OutputLine::stderr("error message".to_string()); - assert!(!line.is_stdout); - assert_eq!(line.json_type, None); - } -} diff --git a/makima/daemon/src/process/claude_protocol.rs b/makima/daemon/src/process/claude_protocol.rs deleted file mode 100644 index 930152b..0000000 --- a/makima/daemon/src/process/claude_protocol.rs +++ /dev/null @@ -1,59 +0,0 @@ -//! Claude Code JSON protocol types for stdin communication. -//! -//! When using `--input-format=stream-json`, Claude Code expects -//! newline-delimited JSON messages on stdin. - -use serde::Serialize; - -/// Message sent to Claude Code via stdin. -/// -/// Format based on Claude Code's stream-json input protocol. -#[derive(Debug, Clone, Serialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum ClaudeInputMessage { - /// A user message to send to Claude. - User { message: UserMessage }, -} - -/// The inner user message structure. -#[derive(Debug, Clone, Serialize)] -pub struct UserMessage { - /// Always "user" for user messages. - pub role: String, - /// The message content. - pub content: String, -} - -impl ClaudeInputMessage { - /// Create a new user message. - pub fn user(content: impl Into<String>) -> Self { - Self::User { - message: UserMessage { - role: "user".to_string(), - content: content.into(), - }, - } - } - - /// Serialize to a JSON string with trailing newline (NDJSON format). - pub fn to_json_line(&self) -> Result<String, serde_json::Error> { - let mut json = serde_json::to_string(self)?; - json.push('\n'); - Ok(json) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_user_message_serialization() { - let msg = ClaudeInputMessage::user("Hello, Claude!"); - let json = msg.to_json_line().unwrap(); - - // Should produce: {"type":"user","message":{"role":"user","content":"Hello, Claude!"}}\n - assert!(json.starts_with(r#"{"type":"user","message":{"role":"user","content":"Hello, Claude!"}}"#)); - assert!(json.ends_with('\n')); - } -} diff --git a/makima/daemon/src/process/mod.rs b/makima/daemon/src/process/mod.rs deleted file mode 100644 index 814a3c5..0000000 --- a/makima/daemon/src/process/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -//! Process management for Claude Code subprocess execution. -//! -//! Spawns and manages Claude Code processes in worktree directories, -//! streaming JSON output back to the daemon. - -mod claude; -mod claude_protocol; - -pub use claude::{ClaudeProcess, ClaudeProcessError, OutputLine, ProcessManager}; -pub use claude_protocol::ClaudeInputMessage; |
