From 87044a747b47bd83249d61a45842c7f7b2eae56d Mon Sep 17 00:00:00 2001 From: soryu Date: Sun, 11 Jan 2026 05:52:14 +0000 Subject: Contract system --- makima/src/daemon/process/claude.rs | 509 +++++++++++++++++++++++++++ makima/src/daemon/process/claude_protocol.rs | 59 ++++ makima/src/daemon/process/mod.rs | 10 + 3 files changed, 578 insertions(+) create mode 100644 makima/src/daemon/process/claude.rs create mode 100644 makima/src/daemon/process/claude_protocol.rs create mode 100644 makima/src/daemon/process/mod.rs (limited to 'makima/src/daemon/process') diff --git a/makima/src/daemon/process/claude.rs b/makima/src/daemon/process/claude.rs new file mode 100644 index 0000000..93b097c --- /dev/null +++ b/makima/src/daemon/process/claude.rs @@ -0,0 +1,509 @@ +//! Claude Code process management. + +use std::collections::HashMap; +use std::path::Path; +use std::process::Stdio; +use std::sync::Arc; + +use futures::Stream; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{Child, ChildStdin, Command}; +use tokio::sync::{mpsc, Mutex}; + +use super::claude_protocol::ClaudeInputMessage; + +/// Errors that can occur during Claude process management. +#[derive(Debug, thiserror::Error)] +pub enum ClaudeProcessError { + #[error("Failed to spawn Claude process: {0}")] + SpawnFailed(#[from] std::io::Error), + + #[error("Claude command not found: {0}")] + CommandNotFound(String), + + #[error("Process already exited")] + AlreadyExited, + + #[error("Failed to read output: {0}")] + OutputRead(String), +} + +/// A line of output from Claude Code. +#[derive(Debug, Clone)] +pub struct OutputLine { + /// The raw content of the line. + pub content: String, + /// Whether this is from stdout (true) or stderr (false). + pub is_stdout: bool, + /// Parsed JSON type if available (e.g., "system", "assistant", "result"). + pub json_type: Option, +} + +impl OutputLine { + /// Create a new stdout output line. + pub fn stdout(content: String) -> Self { + let json_type = extract_json_type(&content); + Self { + content, + is_stdout: true, + json_type, + } + } + + /// Create a new stderr output line. + pub fn stderr(content: String) -> Self { + Self { + content, + is_stdout: false, + json_type: None, + } + } +} + +/// Extract the "type" field from a JSON line if present. +fn extract_json_type(line: &str) -> Option { + // Quick check for JSON + if !line.starts_with('{') { + return None; + } + + // Try to parse and extract type + if let Ok(json) = serde_json::from_str::(line) { + json.get("type") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + } else { + None + } +} + +/// Handle to a running Claude Code process. +pub struct ClaudeProcess { + /// The child process. + child: Child, + /// Receiver for output lines. + output_rx: mpsc::Receiver, + /// Stdin handle for sending input to the process (thread-safe). + stdin: Arc>>, +} + +impl ClaudeProcess { + /// Wait for the process to exit and return the exit code. + pub async fn wait(&mut self) -> Result { + let status = self.child.wait().await?; + Ok(status.code().unwrap_or(-1) as i64) + } + + /// Check if the process has exited. + pub fn try_wait(&mut self) -> Result, ClaudeProcessError> { + match self.child.try_wait()? { + Some(status) => Ok(Some(status.code().unwrap_or(-1) as i64)), + None => Ok(None), + } + } + + /// Kill the process. + pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> { + self.child.kill().await?; + Ok(()) + } + + /// Get the next output line, if available. + pub async fn next_output(&mut self) -> Option { + self.output_rx.recv().await + } + + /// Send a raw message to the process via stdin. + /// + /// This can be used to provide input when Claude Code is waiting for user input. + pub async fn send_input(&self, message: &str) -> Result<(), ClaudeProcessError> { + let mut stdin_guard = self.stdin.lock().await; + if let Some(ref mut stdin) = *stdin_guard { + stdin + .write_all(message.as_bytes()) + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?; + stdin + .write_all(b"\n") + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write newline: {}", e)))?; + stdin + .flush() + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?; + Ok(()) + } else { + Err(ClaudeProcessError::OutputRead("Stdin not available".to_string())) + } + } + + /// Send a user message to the process via stdin using JSON protocol. + /// + /// This is the preferred method when using `--input-format=stream-json`. + /// The message is serialized as JSON and sent as a single line. + pub async fn send_user_message(&self, content: &str) -> Result<(), ClaudeProcessError> { + let message = ClaudeInputMessage::user(content); + let json_line = message.to_json_line().map_err(|e| { + ClaudeProcessError::OutputRead(format!("Failed to serialize message: {}", e)) + })?; + + tracing::debug!(content_len = content.len(), "Sending user message to Claude process"); + + let mut stdin_guard = self.stdin.lock().await; + if let Some(ref mut stdin) = *stdin_guard { + stdin + .write_all(json_line.as_bytes()) + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?; + stdin + .flush() + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?; + Ok(()) + } else { + Err(ClaudeProcessError::OutputRead("Stdin not available".to_string())) + } + } + + /// Get a clone of the stdin handle for external use. + pub fn stdin_handle(&self) -> Arc>> { + Arc::clone(&self.stdin) + } + + /// Close stdin, signaling EOF to the process. + pub async fn close_stdin(&self) -> Result<(), ClaudeProcessError> { + let mut stdin_guard = self.stdin.lock().await; + if let Some(mut stdin) = stdin_guard.take() { + let _ = stdin.shutdown().await; + } + Ok(()) + } + + /// Convert to a stream of output lines. + pub fn into_stream(self) -> impl Stream { + futures::stream::unfold(self.output_rx, |mut rx| async move { + rx.recv().await.map(|line| (line, rx)) + }) + } +} + +/// Manages Claude Code process spawning. +pub struct ProcessManager { + /// Path to the claude command. + claude_command: String, + /// Additional arguments to pass to Claude Code (after defaults). + claude_args: Vec, + /// Arguments to pass before defaults. + claude_pre_args: Vec, + /// Whether to enable Claude's permission system (skip --dangerously-skip-permissions). + enable_permissions: bool, + /// Whether to disable verbose output. + disable_verbose: bool, + /// Default environment variables to pass. + default_env: HashMap, +} + +impl Default for ProcessManager { + fn default() -> Self { + Self::new() + } +} + +impl ProcessManager { + /// Create a new ProcessManager with default settings. + pub fn new() -> Self { + Self { + claude_command: "claude".to_string(), + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + default_env: HashMap::new(), + } + } + + /// Create a ProcessManager with a custom claude command path. + pub fn with_command(command: String) -> Self { + Self { + claude_command: command, + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + default_env: HashMap::new(), + } + } + + /// Set additional arguments to pass after default arguments. + pub fn with_args(mut self, args: Vec) -> Self { + self.claude_args = args; + self + } + + /// Set arguments to pass before default arguments. + pub fn with_pre_args(mut self, args: Vec) -> Self { + self.claude_pre_args = args; + self + } + + /// Enable Claude's permission system (don't pass --dangerously-skip-permissions). + pub fn with_permissions_enabled(mut self, enabled: bool) -> Self { + self.enable_permissions = enabled; + self + } + + /// Disable verbose output. + pub fn with_verbose_disabled(mut self, disabled: bool) -> Self { + self.disable_verbose = disabled; + self + } + + /// Add default environment variables. + pub fn with_env(mut self, env: HashMap) -> Self { + self.default_env = env; + self + } + + /// Get the claude command path. + pub fn claude_command(&self) -> &str { + &self.claude_command + } + + /// Spawn a Claude Code process to execute a plan. + /// + /// The process runs in the specified working directory with stream-json output format. + /// If `system_prompt` is provided, it will be passed via --system-prompt flag. + pub async fn spawn( + &self, + working_dir: &Path, + plan: &str, + extra_env: Option>, + ) -> Result { + self.spawn_with_system_prompt(working_dir, plan, extra_env, None).await + } + + /// Spawn a Claude Code process with an optional system prompt. + /// + /// The process runs in the specified working directory with stream-json output format. + /// If `system_prompt` is provided, it will be passed via --system-prompt flag as + /// behavioral constraints that Claude will treat as system-level instructions. + pub async fn spawn_with_system_prompt( + &self, + working_dir: &Path, + plan: &str, + extra_env: Option>, + system_prompt: Option<&str>, + ) -> Result { + tracing::info!( + working_dir = %working_dir.display(), + plan_len = plan.len(), + plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan }, + has_system_prompt = system_prompt.is_some(), + "Spawning Claude Code process" + ); + + // Verify working directory exists + if !working_dir.exists() { + tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!"); + return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("Working directory does not exist: {}", working_dir.display()), + ))); + } + + // Build environment + let mut env = self.default_env.clone(); + if let Some(extra) = extra_env { + env.extend(extra); + } + + // Build arguments list + let mut args = Vec::new(); + + // Pre-args (before defaults) + args.extend(self.claude_pre_args.clone()); + + // Required arguments for stream-json protocol + args.push("--output-format=stream-json".to_string()); + args.push("--input-format=stream-json".to_string()); + + // Optional default arguments + if !self.disable_verbose { + args.push("--verbose".to_string()); + } + if !self.enable_permissions { + args.push("--dangerously-skip-permissions".to_string()); + } + + // System prompt - passed via --system-prompt flag for system-level constraints + if let Some(prompt) = system_prompt { + args.push("--system-prompt".to_string()); + args.push(prompt.to_string()); + } + + // Additional user-configured arguments + args.extend(self.claude_args.clone()); + + tracing::debug!(args = ?args, "Claude command arguments"); + + // Spawn the process + let mut child = Command::new(&self.claude_command) + .args(&args) + .current_dir(working_dir) + .envs(env) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ClaudeProcessError::CommandNotFound(self.claude_command.clone()) + } else { + ClaudeProcessError::SpawnFailed(e) + } + })?; + + // Create output channel + let (tx, rx) = mpsc::channel(1000); + + // Take stdout, stderr, and stdin + // With --input-format=stream-json, we keep stdin open for sending messages + let stdin = child.stdin.take(); + let stdin = Arc::new(Mutex::new(stdin)); + + let stdout = child.stdout.take().expect("stdout should be piped"); + let stderr = child.stderr.take().expect("stderr should be piped"); + + // Spawn task to read stdout + let tx_stdout = tx.clone(); + tokio::spawn(async move { + use tokio::io::AsyncReadExt; + let mut reader = BufReader::new(stdout); + let mut buffer = vec![0u8; 4096]; + let mut line_buffer = String::new(); + + loop { + // Try to read with a timeout to detect if we're stuck + match tokio::time::timeout( + tokio::time::Duration::from_secs(5), + reader.read(&mut buffer) + ).await { + Ok(Ok(0)) => { + // EOF + tracing::debug!("Claude stdout EOF"); + // Send any remaining content + if !line_buffer.is_empty() { + let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await; + } + break; + } + Ok(Ok(n)) => { + let chunk = String::from_utf8_lossy(&buffer[..n]); + tracing::debug!(bytes = n, chunk_preview = %if chunk.len() > 100 { &chunk[..100] } else { &chunk }, "Got stdout chunk from Claude"); + + // Accumulate into line buffer and emit complete lines + line_buffer.push_str(&chunk); + while let Some(newline_pos) = line_buffer.find('\n') { + let line = line_buffer[..newline_pos].to_string(); + line_buffer = line_buffer[newline_pos + 1..].to_string(); + if tx_stdout.send(OutputLine::stdout(line)).await.is_err() { + return; + } + } + } + Ok(Err(e)) => { + tracing::error!(error = %e, "Error reading Claude stdout"); + break; + } + Err(_) => { + // Timeout - no data for 5 seconds + tracing::warn!("No stdout data from Claude for 5 seconds"); + } + } + } + tracing::debug!("Claude stdout reader task ended"); + }); + + // Spawn task to read stderr + let tx_stderr = tx; + tokio::spawn(async move { + let reader = BufReader::new(stderr); + let mut lines = reader.lines(); + while let Ok(Some(line)) = lines.next_line().await { + tracing::debug!(line = %line, "Claude stderr"); + if tx_stderr.send(OutputLine::stderr(line)).await.is_err() { + break; + } + } + tracing::debug!("Claude stderr reader task ended"); + }); + + tracing::info!("Claude Code process spawned successfully"); + + let process = ClaudeProcess { + child, + output_rx: rx, + stdin, + }; + + // Send the initial plan as the first user message + tracing::info!(plan_len = plan.len(), "Sending initial plan to Claude via stdin"); + process.send_user_message(plan).await?; + + Ok(process) + } + + /// Check if the claude command is available. + pub async fn check_claude_available(&self) -> Result { + let output = Command::new(&self.claude_command) + .arg("--version") + .output() + .await + .map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ClaudeProcessError::CommandNotFound(self.claude_command.clone()) + } else { + ClaudeProcessError::SpawnFailed(e) + } + })?; + + if output.status.success() { + Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) + } else { + Err(ClaudeProcessError::CommandNotFound( + self.claude_command.clone(), + )) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_json_type() { + assert_eq!( + extract_json_type(r#"{"type":"system","subtype":"init"}"#), + Some("system".to_string()) + ); + assert_eq!( + extract_json_type(r#"{"type":"assistant","message":{}}"#), + Some("assistant".to_string()) + ); + assert_eq!(extract_json_type("not json"), None); + assert_eq!(extract_json_type(r#"{"no_type": true}"#), None); + } + + #[test] + fn test_output_line_creation() { + let line = OutputLine::stdout(r#"{"type":"result"}"#.to_string()); + assert!(line.is_stdout); + assert_eq!(line.json_type, Some("result".to_string())); + + let line = OutputLine::stderr("error message".to_string()); + assert!(!line.is_stdout); + assert_eq!(line.json_type, None); + } +} diff --git a/makima/src/daemon/process/claude_protocol.rs b/makima/src/daemon/process/claude_protocol.rs new file mode 100644 index 0000000..96e5377 --- /dev/null +++ b/makima/src/daemon/process/claude_protocol.rs @@ -0,0 +1,59 @@ +//! Claude Code JSON protocol types for stdin communication. +//! +//! When using `--input-format=stream-json`, Claude Code expects +//! newline-delimited JSON messages on stdin. + +use serde::Serialize; + +/// Message sent to Claude Code via stdin. +/// +/// Format based on Claude Code's stream-json input protocol. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ClaudeInputMessage { + /// A user message to send to Claude. + User { message: UserMessage }, +} + +/// The inner user message structure. +#[derive(Debug, Clone, Serialize)] +pub struct UserMessage { + /// Always "user" for user messages. + pub role: String, + /// The message content. + pub content: String, +} + +impl ClaudeInputMessage { + /// Create a new user message. + pub fn user(content: impl Into) -> Self { + Self::User { + message: UserMessage { + role: "user".to_string(), + content: content.into(), + }, + } + } + + /// Serialize to a JSON string with trailing newline (NDJSON format). + pub fn to_json_line(&self) -> Result { + let mut json = serde_json::to_string(self)?; + json.push('\n'); + Ok(json) + } +} + +#[cfg(test)] +mod tests { + use crate::daemon::*; + + #[test] + fn test_user_message_serialization() { + let msg = ClaudeInputMessage::user("Hello, Claude!"); + let json = msg.to_json_line().unwrap(); + + // Should produce: {"type":"user","message":{"role":"user","content":"Hello, Claude!"}}\n + assert!(json.starts_with(r#"{"type":"user","message":{"role":"user","content":"Hello, Claude!"}}"#)); + assert!(json.ends_with('\n')); + } +} diff --git a/makima/src/daemon/process/mod.rs b/makima/src/daemon/process/mod.rs new file mode 100644 index 0000000..814a3c5 --- /dev/null +++ b/makima/src/daemon/process/mod.rs @@ -0,0 +1,10 @@ +//! Process management for Claude Code subprocess execution. +//! +//! Spawns and manages Claude Code processes in worktree directories, +//! streaming JSON output back to the daemon. + +mod claude; +mod claude_protocol; + +pub use claude::{ClaudeProcess, ClaudeProcessError, OutputLine, ProcessManager}; +pub use claude_protocol::ClaudeInputMessage; -- cgit v1.2.3