summaryrefslogtreecommitdiff
path: root/makima/src/daemon/process
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-11 05:52:14 +0000
committersoryu <soryu@soryu.co>2026-01-15 00:21:16 +0000
commit87044a747b47bd83249d61a45842c7f7b2eae56d (patch)
treeef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/src/daemon/process
parent077820c4167c168072d217a1b01df840463a12a8 (diff)
downloadsoryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz
soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip
Contract system
Diffstat (limited to 'makima/src/daemon/process')
-rw-r--r--makima/src/daemon/process/claude.rs509
-rw-r--r--makima/src/daemon/process/claude_protocol.rs59
-rw-r--r--makima/src/daemon/process/mod.rs10
3 files changed, 578 insertions, 0 deletions
diff --git a/makima/src/daemon/process/claude.rs b/makima/src/daemon/process/claude.rs
new file mode 100644
index 0000000..93b097c
--- /dev/null
+++ b/makima/src/daemon/process/claude.rs
@@ -0,0 +1,509 @@
+//! Claude Code process management.
+
+use std::collections::HashMap;
+use std::path::Path;
+use std::process::Stdio;
+use std::sync::Arc;
+
+use futures::Stream;
+use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
+use tokio::process::{Child, ChildStdin, Command};
+use tokio::sync::{mpsc, Mutex};
+
+use super::claude_protocol::ClaudeInputMessage;
+
+/// Errors that can occur during Claude process management.
+#[derive(Debug, thiserror::Error)]
+pub enum ClaudeProcessError {
+ #[error("Failed to spawn Claude process: {0}")]
+ SpawnFailed(#[from] std::io::Error),
+
+ #[error("Claude command not found: {0}")]
+ CommandNotFound(String),
+
+ #[error("Process already exited")]
+ AlreadyExited,
+
+ #[error("Failed to read output: {0}")]
+ OutputRead(String),
+}
+
+/// A line of output from Claude Code.
+#[derive(Debug, Clone)]
+pub struct OutputLine {
+ /// The raw content of the line.
+ pub content: String,
+ /// Whether this is from stdout (true) or stderr (false).
+ pub is_stdout: bool,
+ /// Parsed JSON type if available (e.g., "system", "assistant", "result").
+ pub json_type: Option<String>,
+}
+
+impl OutputLine {
+ /// Create a new stdout output line.
+ pub fn stdout(content: String) -> Self {
+ let json_type = extract_json_type(&content);
+ Self {
+ content,
+ is_stdout: true,
+ json_type,
+ }
+ }
+
+ /// Create a new stderr output line.
+ pub fn stderr(content: String) -> Self {
+ Self {
+ content,
+ is_stdout: false,
+ json_type: None,
+ }
+ }
+}
+
+/// Extract the "type" field from a JSON line if present.
+fn extract_json_type(line: &str) -> Option<String> {
+ // Quick check for JSON
+ if !line.starts_with('{') {
+ return None;
+ }
+
+ // Try to parse and extract type
+ if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) {
+ json.get("type")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string())
+ } else {
+ None
+ }
+}
+
+/// Handle to a running Claude Code process.
+pub struct ClaudeProcess {
+ /// The child process.
+ child: Child,
+ /// Receiver for output lines.
+ output_rx: mpsc::Receiver<OutputLine>,
+ /// Stdin handle for sending input to the process (thread-safe).
+ stdin: Arc<Mutex<Option<ChildStdin>>>,
+}
+
+impl ClaudeProcess {
+ /// Wait for the process to exit and return the exit code.
+ pub async fn wait(&mut self) -> Result<i64, ClaudeProcessError> {
+ let status = self.child.wait().await?;
+ Ok(status.code().unwrap_or(-1) as i64)
+ }
+
+ /// Check if the process has exited.
+ pub fn try_wait(&mut self) -> Result<Option<i64>, ClaudeProcessError> {
+ match self.child.try_wait()? {
+ Some(status) => Ok(Some(status.code().unwrap_or(-1) as i64)),
+ None => Ok(None),
+ }
+ }
+
+ /// Kill the process.
+ pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> {
+ self.child.kill().await?;
+ Ok(())
+ }
+
+ /// Get the next output line, if available.
+ pub async fn next_output(&mut self) -> Option<OutputLine> {
+ self.output_rx.recv().await
+ }
+
+ /// Send a raw message to the process via stdin.
+ ///
+ /// This can be used to provide input when Claude Code is waiting for user input.
+ pub async fn send_input(&self, message: &str) -> Result<(), ClaudeProcessError> {
+ let mut stdin_guard = self.stdin.lock().await;
+ if let Some(ref mut stdin) = *stdin_guard {
+ stdin
+ .write_all(message.as_bytes())
+ .await
+ .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?;
+ stdin
+ .write_all(b"\n")
+ .await
+ .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write newline: {}", e)))?;
+ stdin
+ .flush()
+ .await
+ .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?;
+ Ok(())
+ } else {
+ Err(ClaudeProcessError::OutputRead("Stdin not available".to_string()))
+ }
+ }
+
+ /// Send a user message to the process via stdin using JSON protocol.
+ ///
+ /// This is the preferred method when using `--input-format=stream-json`.
+ /// The message is serialized as JSON and sent as a single line.
+ pub async fn send_user_message(&self, content: &str) -> Result<(), ClaudeProcessError> {
+ let message = ClaudeInputMessage::user(content);
+ let json_line = message.to_json_line().map_err(|e| {
+ ClaudeProcessError::OutputRead(format!("Failed to serialize message: {}", e))
+ })?;
+
+ tracing::debug!(content_len = content.len(), "Sending user message to Claude process");
+
+ let mut stdin_guard = self.stdin.lock().await;
+ if let Some(ref mut stdin) = *stdin_guard {
+ stdin
+ .write_all(json_line.as_bytes())
+ .await
+ .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?;
+ stdin
+ .flush()
+ .await
+ .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?;
+ Ok(())
+ } else {
+ Err(ClaudeProcessError::OutputRead("Stdin not available".to_string()))
+ }
+ }
+
+ /// Get a clone of the stdin handle for external use.
+ pub fn stdin_handle(&self) -> Arc<Mutex<Option<ChildStdin>>> {
+ Arc::clone(&self.stdin)
+ }
+
+ /// Close stdin, signaling EOF to the process.
+ pub async fn close_stdin(&self) -> Result<(), ClaudeProcessError> {
+ let mut stdin_guard = self.stdin.lock().await;
+ if let Some(mut stdin) = stdin_guard.take() {
+ let _ = stdin.shutdown().await;
+ }
+ Ok(())
+ }
+
+ /// Convert to a stream of output lines.
+ pub fn into_stream(self) -> impl Stream<Item = OutputLine> {
+ futures::stream::unfold(self.output_rx, |mut rx| async move {
+ rx.recv().await.map(|line| (line, rx))
+ })
+ }
+}
+
+/// Manages Claude Code process spawning.
+pub struct ProcessManager {
+ /// Path to the claude command.
+ claude_command: String,
+ /// Additional arguments to pass to Claude Code (after defaults).
+ claude_args: Vec<String>,
+ /// Arguments to pass before defaults.
+ claude_pre_args: Vec<String>,
+ /// Whether to enable Claude's permission system (skip --dangerously-skip-permissions).
+ enable_permissions: bool,
+ /// Whether to disable verbose output.
+ disable_verbose: bool,
+ /// Default environment variables to pass.
+ default_env: HashMap<String, String>,
+}
+
+impl Default for ProcessManager {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl ProcessManager {
+ /// Create a new ProcessManager with default settings.
+ pub fn new() -> Self {
+ Self {
+ claude_command: "claude".to_string(),
+ claude_args: Vec::new(),
+ claude_pre_args: Vec::new(),
+ enable_permissions: false,
+ disable_verbose: false,
+ default_env: HashMap::new(),
+ }
+ }
+
+ /// Create a ProcessManager with a custom claude command path.
+ pub fn with_command(command: String) -> Self {
+ Self {
+ claude_command: command,
+ claude_args: Vec::new(),
+ claude_pre_args: Vec::new(),
+ enable_permissions: false,
+ disable_verbose: false,
+ default_env: HashMap::new(),
+ }
+ }
+
+ /// Set additional arguments to pass after default arguments.
+ pub fn with_args(mut self, args: Vec<String>) -> Self {
+ self.claude_args = args;
+ self
+ }
+
+ /// Set arguments to pass before default arguments.
+ pub fn with_pre_args(mut self, args: Vec<String>) -> Self {
+ self.claude_pre_args = args;
+ self
+ }
+
+ /// Enable Claude's permission system (don't pass --dangerously-skip-permissions).
+ pub fn with_permissions_enabled(mut self, enabled: bool) -> Self {
+ self.enable_permissions = enabled;
+ self
+ }
+
+ /// Disable verbose output.
+ pub fn with_verbose_disabled(mut self, disabled: bool) -> Self {
+ self.disable_verbose = disabled;
+ self
+ }
+
+ /// Add default environment variables.
+ pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
+ self.default_env = env;
+ self
+ }
+
+ /// Get the claude command path.
+ pub fn claude_command(&self) -> &str {
+ &self.claude_command
+ }
+
+ /// Spawn a Claude Code process to execute a plan.
+ ///
+ /// The process runs in the specified working directory with stream-json output format.
+ /// If `system_prompt` is provided, it will be passed via --system-prompt flag.
+ pub async fn spawn(
+ &self,
+ working_dir: &Path,
+ plan: &str,
+ extra_env: Option<HashMap<String, String>>,
+ ) -> Result<ClaudeProcess, ClaudeProcessError> {
+ self.spawn_with_system_prompt(working_dir, plan, extra_env, None).await
+ }
+
+ /// Spawn a Claude Code process with an optional system prompt.
+ ///
+ /// The process runs in the specified working directory with stream-json output format.
+ /// If `system_prompt` is provided, it will be passed via --system-prompt flag as
+ /// behavioral constraints that Claude will treat as system-level instructions.
+ pub async fn spawn_with_system_prompt(
+ &self,
+ working_dir: &Path,
+ plan: &str,
+ extra_env: Option<HashMap<String, String>>,
+ system_prompt: Option<&str>,
+ ) -> Result<ClaudeProcess, ClaudeProcessError> {
+ tracing::info!(
+ working_dir = %working_dir.display(),
+ plan_len = plan.len(),
+ plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan },
+ has_system_prompt = system_prompt.is_some(),
+ "Spawning Claude Code process"
+ );
+
+ // Verify working directory exists
+ if !working_dir.exists() {
+ tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!");
+ return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new(
+ std::io::ErrorKind::NotFound,
+ format!("Working directory does not exist: {}", working_dir.display()),
+ )));
+ }
+
+ // Build environment
+ let mut env = self.default_env.clone();
+ if let Some(extra) = extra_env {
+ env.extend(extra);
+ }
+
+ // Build arguments list
+ let mut args = Vec::new();
+
+ // Pre-args (before defaults)
+ args.extend(self.claude_pre_args.clone());
+
+ // Required arguments for stream-json protocol
+ args.push("--output-format=stream-json".to_string());
+ args.push("--input-format=stream-json".to_string());
+
+ // Optional default arguments
+ if !self.disable_verbose {
+ args.push("--verbose".to_string());
+ }
+ if !self.enable_permissions {
+ args.push("--dangerously-skip-permissions".to_string());
+ }
+
+ // System prompt - passed via --system-prompt flag for system-level constraints
+ if let Some(prompt) = system_prompt {
+ args.push("--system-prompt".to_string());
+ args.push(prompt.to_string());
+ }
+
+ // Additional user-configured arguments
+ args.extend(self.claude_args.clone());
+
+ tracing::debug!(args = ?args, "Claude command arguments");
+
+ // Spawn the process
+ let mut child = Command::new(&self.claude_command)
+ .args(&args)
+ .current_dir(working_dir)
+ .envs(env)
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .kill_on_drop(true)
+ .spawn()
+ .map_err(|e| {
+ if e.kind() == std::io::ErrorKind::NotFound {
+ ClaudeProcessError::CommandNotFound(self.claude_command.clone())
+ } else {
+ ClaudeProcessError::SpawnFailed(e)
+ }
+ })?;
+
+ // Create output channel
+ let (tx, rx) = mpsc::channel(1000);
+
+ // Take stdout, stderr, and stdin
+ // With --input-format=stream-json, we keep stdin open for sending messages
+ let stdin = child.stdin.take();
+ let stdin = Arc::new(Mutex::new(stdin));
+
+ let stdout = child.stdout.take().expect("stdout should be piped");
+ let stderr = child.stderr.take().expect("stderr should be piped");
+
+ // Spawn task to read stdout
+ let tx_stdout = tx.clone();
+ tokio::spawn(async move {
+ use tokio::io::AsyncReadExt;
+ let mut reader = BufReader::new(stdout);
+ let mut buffer = vec![0u8; 4096];
+ let mut line_buffer = String::new();
+
+ loop {
+ // Try to read with a timeout to detect if we're stuck
+ match tokio::time::timeout(
+ tokio::time::Duration::from_secs(5),
+ reader.read(&mut buffer)
+ ).await {
+ Ok(Ok(0)) => {
+ // EOF
+ tracing::debug!("Claude stdout EOF");
+ // Send any remaining content
+ if !line_buffer.is_empty() {
+ let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await;
+ }
+ break;
+ }
+ Ok(Ok(n)) => {
+ let chunk = String::from_utf8_lossy(&buffer[..n]);
+ tracing::debug!(bytes = n, chunk_preview = %if chunk.len() > 100 { &chunk[..100] } else { &chunk }, "Got stdout chunk from Claude");
+
+ // Accumulate into line buffer and emit complete lines
+ line_buffer.push_str(&chunk);
+ while let Some(newline_pos) = line_buffer.find('\n') {
+ let line = line_buffer[..newline_pos].to_string();
+ line_buffer = line_buffer[newline_pos + 1..].to_string();
+ if tx_stdout.send(OutputLine::stdout(line)).await.is_err() {
+ return;
+ }
+ }
+ }
+ Ok(Err(e)) => {
+ tracing::error!(error = %e, "Error reading Claude stdout");
+ break;
+ }
+ Err(_) => {
+ // Timeout - no data for 5 seconds
+ tracing::warn!("No stdout data from Claude for 5 seconds");
+ }
+ }
+ }
+ tracing::debug!("Claude stdout reader task ended");
+ });
+
+ // Spawn task to read stderr
+ let tx_stderr = tx;
+ tokio::spawn(async move {
+ let reader = BufReader::new(stderr);
+ let mut lines = reader.lines();
+ while let Ok(Some(line)) = lines.next_line().await {
+ tracing::debug!(line = %line, "Claude stderr");
+ if tx_stderr.send(OutputLine::stderr(line)).await.is_err() {
+ break;
+ }
+ }
+ tracing::debug!("Claude stderr reader task ended");
+ });
+
+ tracing::info!("Claude Code process spawned successfully");
+
+ let process = ClaudeProcess {
+ child,
+ output_rx: rx,
+ stdin,
+ };
+
+ // Send the initial plan as the first user message
+ tracing::info!(plan_len = plan.len(), "Sending initial plan to Claude via stdin");
+ process.send_user_message(plan).await?;
+
+ Ok(process)
+ }
+
+ /// Check if the claude command is available.
+ pub async fn check_claude_available(&self) -> Result<String, ClaudeProcessError> {
+ let output = Command::new(&self.claude_command)
+ .arg("--version")
+ .output()
+ .await
+ .map_err(|e| {
+ if e.kind() == std::io::ErrorKind::NotFound {
+ ClaudeProcessError::CommandNotFound(self.claude_command.clone())
+ } else {
+ ClaudeProcessError::SpawnFailed(e)
+ }
+ })?;
+
+ if output.status.success() {
+ Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
+ } else {
+ Err(ClaudeProcessError::CommandNotFound(
+ self.claude_command.clone(),
+ ))
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_extract_json_type() {
+ assert_eq!(
+ extract_json_type(r#"{"type":"system","subtype":"init"}"#),
+ Some("system".to_string())
+ );
+ assert_eq!(
+ extract_json_type(r#"{"type":"assistant","message":{}}"#),
+ Some("assistant".to_string())
+ );
+ assert_eq!(extract_json_type("not json"), None);
+ assert_eq!(extract_json_type(r#"{"no_type": true}"#), None);
+ }
+
+ #[test]
+ fn test_output_line_creation() {
+ let line = OutputLine::stdout(r#"{"type":"result"}"#.to_string());
+ assert!(line.is_stdout);
+ assert_eq!(line.json_type, Some("result".to_string()));
+
+ let line = OutputLine::stderr("error message".to_string());
+ assert!(!line.is_stdout);
+ assert_eq!(line.json_type, None);
+ }
+}
diff --git a/makima/src/daemon/process/claude_protocol.rs b/makima/src/daemon/process/claude_protocol.rs
new file mode 100644
index 0000000..96e5377
--- /dev/null
+++ b/makima/src/daemon/process/claude_protocol.rs
@@ -0,0 +1,59 @@
+//! Claude Code JSON protocol types for stdin communication.
+//!
+//! When using `--input-format=stream-json`, Claude Code expects
+//! newline-delimited JSON messages on stdin.
+
+use serde::Serialize;
+
+/// Message sent to Claude Code via stdin.
+///
+/// Format based on Claude Code's stream-json input protocol.
+#[derive(Debug, Clone, Serialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum ClaudeInputMessage {
+ /// A user message to send to Claude.
+ User { message: UserMessage },
+}
+
+/// The inner user message structure.
+#[derive(Debug, Clone, Serialize)]
+pub struct UserMessage {
+ /// Always "user" for user messages.
+ pub role: String,
+ /// The message content.
+ pub content: String,
+}
+
+impl ClaudeInputMessage {
+ /// Create a new user message.
+ pub fn user(content: impl Into<String>) -> Self {
+ Self::User {
+ message: UserMessage {
+ role: "user".to_string(),
+ content: content.into(),
+ },
+ }
+ }
+
+ /// Serialize to a JSON string with trailing newline (NDJSON format).
+ pub fn to_json_line(&self) -> Result<String, serde_json::Error> {
+ let mut json = serde_json::to_string(self)?;
+ json.push('\n');
+ Ok(json)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::daemon::*;
+
+ #[test]
+ fn test_user_message_serialization() {
+ let msg = ClaudeInputMessage::user("Hello, Claude!");
+ let json = msg.to_json_line().unwrap();
+
+ // Should produce: {"type":"user","message":{"role":"user","content":"Hello, Claude!"}}\n
+ assert!(json.starts_with(r#"{"type":"user","message":{"role":"user","content":"Hello, Claude!"}}"#));
+ assert!(json.ends_with('\n'));
+ }
+}
diff --git a/makima/src/daemon/process/mod.rs b/makima/src/daemon/process/mod.rs
new file mode 100644
index 0000000..814a3c5
--- /dev/null
+++ b/makima/src/daemon/process/mod.rs
@@ -0,0 +1,10 @@
+//! Process management for Claude Code subprocess execution.
+//!
+//! Spawns and manages Claude Code processes in worktree directories,
+//! streaming JSON output back to the daemon.
+
+mod claude;
+mod claude_protocol;
+
+pub use claude::{ClaudeProcess, ClaudeProcessError, OutputLine, ProcessManager};
+pub use claude_protocol::ClaudeInputMessage;