summaryrefslogtreecommitdiff
path: root/makima/daemon/src/process
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-11 05:52:14 +0000
committersoryu <soryu@soryu.co>2026-01-15 00:21:16 +0000
commit87044a747b47bd83249d61a45842c7f7b2eae56d (patch)
treeef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/daemon/src/process
parent077820c4167c168072d217a1b01df840463a12a8 (diff)
downloadsoryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz
soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip
Contract system
Diffstat (limited to 'makima/daemon/src/process')
-rw-r--r--makima/daemon/src/process/claude.rs481
-rw-r--r--makima/daemon/src/process/claude_protocol.rs59
-rw-r--r--makima/daemon/src/process/mod.rs10
3 files changed, 0 insertions, 550 deletions
diff --git a/makima/daemon/src/process/claude.rs b/makima/daemon/src/process/claude.rs
deleted file mode 100644
index e06ee09..0000000
--- a/makima/daemon/src/process/claude.rs
+++ /dev/null
@@ -1,481 +0,0 @@
-//! Claude Code process management.
-
-use std::collections::HashMap;
-use std::path::Path;
-use std::process::Stdio;
-use std::sync::Arc;
-
-use futures::Stream;
-use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
-use tokio::process::{Child, ChildStdin, Command};
-use tokio::sync::{mpsc, Mutex};
-
-use super::claude_protocol::ClaudeInputMessage;
-
-/// Errors that can occur during Claude process management.
-#[derive(Debug, thiserror::Error)]
-pub enum ClaudeProcessError {
- #[error("Failed to spawn Claude process: {0}")]
- SpawnFailed(#[from] std::io::Error),
-
- #[error("Claude command not found: {0}")]
- CommandNotFound(String),
-
- #[error("Process already exited")]
- AlreadyExited,
-
- #[error("Failed to read output: {0}")]
- OutputRead(String),
-}
-
-/// A line of output from Claude Code.
-#[derive(Debug, Clone)]
-pub struct OutputLine {
- /// The raw content of the line.
- pub content: String,
- /// Whether this is from stdout (true) or stderr (false).
- pub is_stdout: bool,
- /// Parsed JSON type if available (e.g., "system", "assistant", "result").
- pub json_type: Option<String>,
-}
-
-impl OutputLine {
- /// Create a new stdout output line.
- pub fn stdout(content: String) -> Self {
- let json_type = extract_json_type(&content);
- Self {
- content,
- is_stdout: true,
- json_type,
- }
- }
-
- /// Create a new stderr output line.
- pub fn stderr(content: String) -> Self {
- Self {
- content,
- is_stdout: false,
- json_type: None,
- }
- }
-}
-
-/// Extract the "type" field from a JSON line if present.
-fn extract_json_type(line: &str) -> Option<String> {
- // Quick check for JSON
- if !line.starts_with('{') {
- return None;
- }
-
- // Try to parse and extract type
- if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) {
- json.get("type")
- .and_then(|v| v.as_str())
- .map(|s| s.to_string())
- } else {
- None
- }
-}
-
-/// Handle to a running Claude Code process.
-pub struct ClaudeProcess {
- /// The child process.
- child: Child,
- /// Receiver for output lines.
- output_rx: mpsc::Receiver<OutputLine>,
- /// Stdin handle for sending input to the process (thread-safe).
- stdin: Arc<Mutex<Option<ChildStdin>>>,
-}
-
-impl ClaudeProcess {
- /// Wait for the process to exit and return the exit code.
- pub async fn wait(&mut self) -> Result<i64, ClaudeProcessError> {
- let status = self.child.wait().await?;
- Ok(status.code().unwrap_or(-1) as i64)
- }
-
- /// Check if the process has exited.
- pub fn try_wait(&mut self) -> Result<Option<i64>, ClaudeProcessError> {
- match self.child.try_wait()? {
- Some(status) => Ok(Some(status.code().unwrap_or(-1) as i64)),
- None => Ok(None),
- }
- }
-
- /// Kill the process.
- pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> {
- self.child.kill().await?;
- Ok(())
- }
-
- /// Get the next output line, if available.
- pub async fn next_output(&mut self) -> Option<OutputLine> {
- self.output_rx.recv().await
- }
-
- /// Send a raw message to the process via stdin.
- ///
- /// This can be used to provide input when Claude Code is waiting for user input.
- pub async fn send_input(&self, message: &str) -> Result<(), ClaudeProcessError> {
- let mut stdin_guard = self.stdin.lock().await;
- if let Some(ref mut stdin) = *stdin_guard {
- stdin
- .write_all(message.as_bytes())
- .await
- .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?;
- stdin
- .write_all(b"\n")
- .await
- .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write newline: {}", e)))?;
- stdin
- .flush()
- .await
- .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?;
- Ok(())
- } else {
- Err(ClaudeProcessError::OutputRead("Stdin not available".to_string()))
- }
- }
-
- /// Send a user message to the process via stdin using JSON protocol.
- ///
- /// This is the preferred method when using `--input-format=stream-json`.
- /// The message is serialized as JSON and sent as a single line.
- pub async fn send_user_message(&self, content: &str) -> Result<(), ClaudeProcessError> {
- let message = ClaudeInputMessage::user(content);
- let json_line = message.to_json_line().map_err(|e| {
- ClaudeProcessError::OutputRead(format!("Failed to serialize message: {}", e))
- })?;
-
- tracing::debug!(content_len = content.len(), "Sending user message to Claude process");
-
- let mut stdin_guard = self.stdin.lock().await;
- if let Some(ref mut stdin) = *stdin_guard {
- stdin
- .write_all(json_line.as_bytes())
- .await
- .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?;
- stdin
- .flush()
- .await
- .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?;
- Ok(())
- } else {
- Err(ClaudeProcessError::OutputRead("Stdin not available".to_string()))
- }
- }
-
- /// Get a clone of the stdin handle for external use.
- pub fn stdin_handle(&self) -> Arc<Mutex<Option<ChildStdin>>> {
- Arc::clone(&self.stdin)
- }
-
- /// Close stdin, signaling EOF to the process.
- pub async fn close_stdin(&self) -> Result<(), ClaudeProcessError> {
- let mut stdin_guard = self.stdin.lock().await;
- if let Some(mut stdin) = stdin_guard.take() {
- let _ = stdin.shutdown().await;
- }
- Ok(())
- }
-
- /// Convert to a stream of output lines.
- pub fn into_stream(self) -> impl Stream<Item = OutputLine> {
- futures::stream::unfold(self.output_rx, |mut rx| async move {
- rx.recv().await.map(|line| (line, rx))
- })
- }
-}
-
-/// Manages Claude Code process spawning.
-pub struct ProcessManager {
- /// Path to the claude command.
- claude_command: String,
- /// Additional arguments to pass to Claude Code (after defaults).
- claude_args: Vec<String>,
- /// Arguments to pass before defaults.
- claude_pre_args: Vec<String>,
- /// Whether to enable Claude's permission system (skip --dangerously-skip-permissions).
- enable_permissions: bool,
- /// Whether to disable verbose output.
- disable_verbose: bool,
- /// Default environment variables to pass.
- default_env: HashMap<String, String>,
-}
-
-impl Default for ProcessManager {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl ProcessManager {
- /// Create a new ProcessManager with default settings.
- pub fn new() -> Self {
- Self {
- claude_command: "claude".to_string(),
- claude_args: Vec::new(),
- claude_pre_args: Vec::new(),
- enable_permissions: false,
- disable_verbose: false,
- default_env: HashMap::new(),
- }
- }
-
- /// Create a ProcessManager with a custom claude command path.
- pub fn with_command(command: String) -> Self {
- Self {
- claude_command: command,
- claude_args: Vec::new(),
- claude_pre_args: Vec::new(),
- enable_permissions: false,
- disable_verbose: false,
- default_env: HashMap::new(),
- }
- }
-
- /// Set additional arguments to pass after default arguments.
- pub fn with_args(mut self, args: Vec<String>) -> Self {
- self.claude_args = args;
- self
- }
-
- /// Set arguments to pass before default arguments.
- pub fn with_pre_args(mut self, args: Vec<String>) -> Self {
- self.claude_pre_args = args;
- self
- }
-
- /// Enable Claude's permission system (don't pass --dangerously-skip-permissions).
- pub fn with_permissions_enabled(mut self, enabled: bool) -> Self {
- self.enable_permissions = enabled;
- self
- }
-
- /// Disable verbose output.
- pub fn with_verbose_disabled(mut self, disabled: bool) -> Self {
- self.disable_verbose = disabled;
- self
- }
-
- /// Add default environment variables.
- pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
- self.default_env = env;
- self
- }
-
- /// Spawn a Claude Code process to execute a plan.
- ///
- /// The process runs in the specified working directory with stream-json output format.
- pub async fn spawn(
- &self,
- working_dir: &Path,
- plan: &str,
- extra_env: Option<HashMap<String, String>>,
- ) -> Result<ClaudeProcess, ClaudeProcessError> {
- tracing::info!(
- working_dir = %working_dir.display(),
- plan_len = plan.len(),
- plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan },
- "Spawning Claude Code process"
- );
-
- // Verify working directory exists
- if !working_dir.exists() {
- tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!");
- return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new(
- std::io::ErrorKind::NotFound,
- format!("Working directory does not exist: {}", working_dir.display()),
- )));
- }
-
- // Build environment
- let mut env = self.default_env.clone();
- if let Some(extra) = extra_env {
- env.extend(extra);
- }
-
- // Build arguments list
- let mut args = Vec::new();
-
- // Pre-args (before defaults)
- args.extend(self.claude_pre_args.clone());
-
- // Required arguments for stream-json protocol
- args.push("--output-format=stream-json".to_string());
- args.push("--input-format=stream-json".to_string());
-
- // Optional default arguments
- if !self.disable_verbose {
- args.push("--verbose".to_string());
- }
- if !self.enable_permissions {
- args.push("--dangerously-skip-permissions".to_string());
- }
-
- // Additional user-configured arguments
- args.extend(self.claude_args.clone());
-
- tracing::debug!(args = ?args, "Claude command arguments");
-
- // Spawn the process
- let mut child = Command::new(&self.claude_command)
- .args(&args)
- .current_dir(working_dir)
- .envs(env)
- .stdin(Stdio::piped())
- .stdout(Stdio::piped())
- .stderr(Stdio::piped())
- .kill_on_drop(true)
- .spawn()
- .map_err(|e| {
- if e.kind() == std::io::ErrorKind::NotFound {
- ClaudeProcessError::CommandNotFound(self.claude_command.clone())
- } else {
- ClaudeProcessError::SpawnFailed(e)
- }
- })?;
-
- // Create output channel
- let (tx, rx) = mpsc::channel(1000);
-
- // Take stdout, stderr, and stdin
- // With --input-format=stream-json, we keep stdin open for sending messages
- let stdin = child.stdin.take();
- let stdin = Arc::new(Mutex::new(stdin));
-
- let stdout = child.stdout.take().expect("stdout should be piped");
- let stderr = child.stderr.take().expect("stderr should be piped");
-
- // Spawn task to read stdout
- let tx_stdout = tx.clone();
- tokio::spawn(async move {
- use tokio::io::AsyncReadExt;
- let mut reader = BufReader::new(stdout);
- let mut buffer = vec![0u8; 4096];
- let mut line_buffer = String::new();
-
- loop {
- // Try to read with a timeout to detect if we're stuck
- match tokio::time::timeout(
- tokio::time::Duration::from_secs(5),
- reader.read(&mut buffer)
- ).await {
- Ok(Ok(0)) => {
- // EOF
- tracing::debug!("Claude stdout EOF");
- // Send any remaining content
- if !line_buffer.is_empty() {
- let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await;
- }
- break;
- }
- Ok(Ok(n)) => {
- let chunk = String::from_utf8_lossy(&buffer[..n]);
- tracing::debug!(bytes = n, chunk_preview = %if chunk.len() > 100 { &chunk[..100] } else { &chunk }, "Got stdout chunk from Claude");
-
- // Accumulate into line buffer and emit complete lines
- line_buffer.push_str(&chunk);
- while let Some(newline_pos) = line_buffer.find('\n') {
- let line = line_buffer[..newline_pos].to_string();
- line_buffer = line_buffer[newline_pos + 1..].to_string();
- if tx_stdout.send(OutputLine::stdout(line)).await.is_err() {
- return;
- }
- }
- }
- Ok(Err(e)) => {
- tracing::error!(error = %e, "Error reading Claude stdout");
- break;
- }
- Err(_) => {
- // Timeout - no data for 5 seconds
- tracing::warn!("No stdout data from Claude for 5 seconds");
- }
- }
- }
- tracing::debug!("Claude stdout reader task ended");
- });
-
- // Spawn task to read stderr
- let tx_stderr = tx;
- tokio::spawn(async move {
- let reader = BufReader::new(stderr);
- let mut lines = reader.lines();
- while let Ok(Some(line)) = lines.next_line().await {
- tracing::debug!(line = %line, "Claude stderr");
- if tx_stderr.send(OutputLine::stderr(line)).await.is_err() {
- break;
- }
- }
- tracing::debug!("Claude stderr reader task ended");
- });
-
- tracing::info!("Claude Code process spawned successfully");
-
- let process = ClaudeProcess {
- child,
- output_rx: rx,
- stdin,
- };
-
- // Send the initial plan as the first user message
- tracing::info!(plan_len = plan.len(), "Sending initial plan to Claude via stdin");
- process.send_user_message(plan).await?;
-
- Ok(process)
- }
-
- /// Check if the claude command is available.
- pub async fn check_claude_available(&self) -> Result<String, ClaudeProcessError> {
- let output = Command::new(&self.claude_command)
- .arg("--version")
- .output()
- .await
- .map_err(|e| {
- if e.kind() == std::io::ErrorKind::NotFound {
- ClaudeProcessError::CommandNotFound(self.claude_command.clone())
- } else {
- ClaudeProcessError::SpawnFailed(e)
- }
- })?;
-
- if output.status.success() {
- Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
- } else {
- Err(ClaudeProcessError::CommandNotFound(
- self.claude_command.clone(),
- ))
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_extract_json_type() {
- assert_eq!(
- extract_json_type(r#"{"type":"system","subtype":"init"}"#),
- Some("system".to_string())
- );
- assert_eq!(
- extract_json_type(r#"{"type":"assistant","message":{}}"#),
- Some("assistant".to_string())
- );
- assert_eq!(extract_json_type("not json"), None);
- assert_eq!(extract_json_type(r#"{"no_type": true}"#), None);
- }
-
- #[test]
- fn test_output_line_creation() {
- let line = OutputLine::stdout(r#"{"type":"result"}"#.to_string());
- assert!(line.is_stdout);
- assert_eq!(line.json_type, Some("result".to_string()));
-
- let line = OutputLine::stderr("error message".to_string());
- assert!(!line.is_stdout);
- assert_eq!(line.json_type, None);
- }
-}
diff --git a/makima/daemon/src/process/claude_protocol.rs b/makima/daemon/src/process/claude_protocol.rs
deleted file mode 100644
index 930152b..0000000
--- a/makima/daemon/src/process/claude_protocol.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-//! Claude Code JSON protocol types for stdin communication.
-//!
-//! When using `--input-format=stream-json`, Claude Code expects
-//! newline-delimited JSON messages on stdin.
-
-use serde::Serialize;
-
-/// Message sent to Claude Code via stdin.
-///
-/// Format based on Claude Code's stream-json input protocol.
-#[derive(Debug, Clone, Serialize)]
-#[serde(tag = "type", rename_all = "snake_case")]
-pub enum ClaudeInputMessage {
- /// A user message to send to Claude.
- User { message: UserMessage },
-}
-
-/// The inner user message structure.
-#[derive(Debug, Clone, Serialize)]
-pub struct UserMessage {
- /// Always "user" for user messages.
- pub role: String,
- /// The message content.
- pub content: String,
-}
-
-impl ClaudeInputMessage {
- /// Create a new user message.
- pub fn user(content: impl Into<String>) -> Self {
- Self::User {
- message: UserMessage {
- role: "user".to_string(),
- content: content.into(),
- },
- }
- }
-
- /// Serialize to a JSON string with trailing newline (NDJSON format).
- pub fn to_json_line(&self) -> Result<String, serde_json::Error> {
- let mut json = serde_json::to_string(self)?;
- json.push('\n');
- Ok(json)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_user_message_serialization() {
- let msg = ClaudeInputMessage::user("Hello, Claude!");
- let json = msg.to_json_line().unwrap();
-
- // Should produce: {"type":"user","message":{"role":"user","content":"Hello, Claude!"}}\n
- assert!(json.starts_with(r#"{"type":"user","message":{"role":"user","content":"Hello, Claude!"}}"#));
- assert!(json.ends_with('\n'));
- }
-}
diff --git a/makima/daemon/src/process/mod.rs b/makima/daemon/src/process/mod.rs
deleted file mode 100644
index 814a3c5..0000000
--- a/makima/daemon/src/process/mod.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-//! Process management for Claude Code subprocess execution.
-//!
-//! Spawns and manages Claude Code processes in worktree directories,
-//! streaming JSON output back to the daemon.
-
-mod claude;
-mod claude_protocol;
-
-pub use claude::{ClaudeProcess, ClaudeProcessError, OutputLine, ProcessManager};
-pub use claude_protocol::ClaudeInputMessage;