summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-15 03:39:16 +0000
committersoryu <soryu@soryu.co>2026-01-15 03:39:28 +0000
commit3fe9a6c6b113d64c7a8409b5463026700be4c28c (patch)
tree5f17c31fe3e0036b75b6ad40a2748751fd678b5c
parent764bd28d08ceaef03cd4050f9568a62d77bbcfca (diff)
downloadsoryu-3fe9a6c6b113d64c7a8409b5463026700be4c28c.tar.gz
soryu-3fe9a6c6b113d64c7a8409b5463026700be4c28c.zip
Add cleanup to daemon
Also fixup for container image
-rw-r--r--Cargo.lock21
-rw-r--r--Dockerfile3
-rw-r--r--makima/Cargo.toml3
-rw-r--r--makima/src/bin/makima.rs6
-rw-r--r--makima/src/daemon/process/claude.rs35
-rw-r--r--makima/src/daemon/task/manager.rs103
6 files changed, 167 insertions, 4 deletions
diff --git a/Cargo.lock b/Cargo.lock
index c59f241..f501dcb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -323,6 +323,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
[[package]]
+name = "cfg_aliases"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
+
+[[package]]
name = "chrono"
version = "0.4.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1877,6 +1883,7 @@ dependencies = [
"jaq-std",
"jsonwebtoken",
"ndarray",
+ "nix 0.29.0",
"once_cell",
"ort",
"parakeet-rs",
@@ -2082,6 +2089,18 @@ dependencies = [
]
[[package]]
+name = "nix"
+version = "0.29.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
+dependencies = [
+ "bitflags 2.10.0",
+ "cfg-if",
+ "cfg_aliases",
+ "libc",
+]
+
+[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2497,7 +2516,7 @@ dependencies = [
"lazy_static",
"libc",
"log",
- "nix",
+ "nix 0.25.1",
"serial",
"shared_library",
"shell-words",
diff --git a/Dockerfile b/Dockerfile
index 5c22598..fbbfda8 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -28,7 +28,8 @@ COPY vendor ./vendor
COPY tools/stt-client ./tools/stt-client
# Build release binary
-RUN cargo build --release --package makima --bin makima-server
+RUN cargo build --release --package makima --bin makima
+RUN mv /app/target/release/makima /app/target/release/makima-server
RUN cp /app/target/release/makima-server /app/makima-server
# Clean up build artifacts to reduce image size
diff --git a/makima/Cargo.toml b/makima/Cargo.toml
index a850d4a..a77d9ea 100644
--- a/makima/Cargo.toml
+++ b/makima/Cargo.toml
@@ -62,6 +62,9 @@ reqwest = { version = "0.12", features = ["json"] }
# Lazy statics
once_cell = "1.19"
+# Unix signal handling
+nix = { version = "0.29", features = ["signal", "process"] }
+
# Regex for text parsing
regex = "1.10"
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs
index 35783dc..5c71885 100644
--- a/makima/src/bin/makima.rs
+++ b/makima/src/bin/makima.rs
@@ -219,6 +219,12 @@ async fn run_daemon(
}
}
+ // Gracefully shutdown all running Claude processes
+ eprintln!("Terminating Claude processes...");
+ task_manager
+ .shutdown_all_processes(std::time::Duration::from_secs(5))
+ .await;
+
// Cleanup
tracing::info!("Daemon stopped");
diff --git a/makima/src/daemon/process/claude.rs b/makima/src/daemon/process/claude.rs
index 93b097c..536d883 100644
--- a/makima/src/daemon/process/claude.rs
+++ b/makima/src/daemon/process/claude.rs
@@ -102,12 +102,45 @@ impl ClaudeProcess {
}
}
- /// Kill the process.
+ /// Kill the process (SIGKILL), awaiting the child handle's kill call and propagating its error.
pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> {
self.child.kill().await?;
Ok(())
}
+ /// Get the child's OS process ID; `None` when the child handle has no PID to report (e.g. it already exited).
+ pub fn id(&self) -> Option<u32> {
+ self.child.id()
+ }
+
+ /// Send SIGTERM to ask the process to terminate gracefully (Unix only); does not wait for it to exit.
+ /// Returns Ok(true) if the signal was sent, Ok(false) if there is no PID or the OS reports ESRCH; any other errno becomes an `OutputRead` error.
+ #[cfg(unix)]
+ pub fn terminate(&self) -> Result<bool, ClaudeProcessError> {
+ use nix::sys::signal::{kill, Signal};
+ use nix::unistd::Pid;
+
+ if let Some(pid) = self.child.id() {
+ match kill(Pid::from_raw(pid as i32), Signal::SIGTERM) {
+ Ok(()) => Ok(true),
+ Err(nix::errno::Errno::ESRCH) => Ok(false), // No such process: treat as already exited
+ Err(e) => Err(ClaudeProcessError::OutputRead(format!(
+ "Failed to send SIGTERM: {}",
+ e
+ ))),
+ }
+ } else {
+ Ok(false) // No PID from the child handle: treated as already exited
+ }
+ }
+
+ /// Non-Unix stand-in for `terminate`: sends no signal and always returns Ok(false).
+ #[cfg(not(unix))]
+ pub fn terminate(&self) -> Result<bool, ClaudeProcessError> {
+ // Nothing is signalled here; a caller that must stop the process has to call `kill` itself
+ Ok(false)
+ }
+
/// Get the next output line, if available.
pub async fn next_output(&mut self) -> Option<OutputLine> {
self.output_rx.recv().await
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index 3b4ffdd..e6a4e29 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -967,6 +967,8 @@ pub struct TaskManager {
task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>,
/// Tracks merge state per orchestrator task (for completion gate).
merge_trackers: Arc<RwLock<HashMap<Uuid, MergeTracker>>>,
+ /// Active process PIDs for graceful shutdown.
+ active_pids: Arc<RwLock<HashMap<Uuid, u32>>>,
}
impl TaskManager {
@@ -994,9 +996,94 @@ impl TaskManager {
semaphore: Arc::new(Semaphore::new(max_concurrent)),
task_inputs: Arc::new(RwLock::new(HashMap::new())),
merge_trackers: Arc::new(RwLock::new(HashMap::new())),
+ active_pids: Arc::new(RwLock::new(HashMap::new())),
}
}
+ /// Gracefully shut down all Claude processes whose PIDs are registered in `active_pids`.
+ ///
+ /// Sends SIGTERM to a snapshot of the registered PIDs, then polls `active_pids` every 100 ms until it is
+ /// empty or `timeout` elapses, and finally sends SIGKILL to whatever is still registered (without waiting further).
+ #[cfg(unix)]
+ pub async fn shutdown_all_processes(&self, timeout: std::time::Duration) {
+ use nix::sys::signal::{kill, Signal};
+ use nix::unistd::Pid;
+
+ let pids: Vec<(Uuid, u32)> = {
+ let guard = self.active_pids.read().await;
+ guard.iter().map(|(k, v)| (*k, *v)).collect()
+ };
+
+ if pids.is_empty() {
+ tracing::info!("No active Claude processes to shutdown");
+ return;
+ }
+
+ tracing::info!(count = pids.len(), "Sending SIGTERM to all Claude processes");
+
+ // Send SIGTERM to every PID in the snapshot (NOTE(review): `*pid as i32` assumes the PID fits in i32 — confirm)
+ for (task_id, pid) in &pids {
+ match kill(Pid::from_raw(*pid as i32), Signal::SIGTERM) {
+ Ok(()) => {
+ tracing::debug!(task_id = %task_id, pid = pid, "Sent SIGTERM to process");
+ }
+ Err(nix::errno::Errno::ESRCH) => {
+ tracing::debug!(task_id = %task_id, pid = pid, "Process already exited");
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGTERM");
+ }
+ }
+ }
+
+ // Poll until the PID map is empty or the timeout elapses; entries are expected to be removed elsewhere once a process has been waited on
+ let start = std::time::Instant::now();
+ let check_interval = std::time::Duration::from_millis(100);
+
+ while start.elapsed() < timeout {
+ let remaining: Vec<u32> = {
+ let guard = self.active_pids.read().await;
+ guard.values().copied().collect()
+ };
+
+ if remaining.is_empty() {
+ tracing::info!("All Claude processes exited gracefully");
+ return;
+ }
+
+ tokio::time::sleep(check_interval).await;
+ }
+
+ // Timeout elapsed: re-read the map and SIGKILL everything still registered (NOTE(review): this may include PIDs added after the SIGTERM pass)
+ let remaining: Vec<(Uuid, u32)> = {
+ let guard = self.active_pids.read().await;
+ guard.iter().map(|(k, v)| (*k, *v)).collect()
+ };
+
+ if !remaining.is_empty() {
+ tracing::warn!(
+ count = remaining.len(),
+ "Some processes did not exit gracefully, sending SIGKILL"
+ );
+ for (task_id, pid) in &remaining {
+ match kill(Pid::from_raw(*pid as i32), Signal::SIGKILL) {
+ Ok(()) => {
+ tracing::debug!(task_id = %task_id, pid = pid, "Sent SIGKILL to process");
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGKILL");
+ }
+ }
+ }
+ }
+ }
+
+ /// Non-Unix stand-in for `shutdown_all_processes`: logs a warning and signals nothing; `_timeout` is ignored.
+ #[cfg(not(unix))]
+ pub async fn shutdown_all_processes(&self, _timeout: std::time::Duration) {
+ tracing::warn!("Graceful shutdown not supported on this platform");
+ }
+
/// Handle a command from the server.
pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> {
tracing::info!("Received command from server: {:?}", command);
@@ -1408,6 +1495,7 @@ impl TaskManager {
tasks: self.tasks.clone(),
ws_tx: self.ws_tx.clone(),
task_inputs: self.task_inputs.clone(),
+ active_pids: self.active_pids.clone(),
}
}
@@ -2282,6 +2370,7 @@ struct TaskManagerInner {
tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>,
ws_tx: mpsc::Sender<DaemonMessage>,
task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>,
+ active_pids: Arc<RwLock<HashMap<Uuid, u32>>>,
}
impl TaskManagerInner {
@@ -2782,7 +2871,14 @@ impl TaskManagerInner {
tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process");
DaemonError::Task(TaskError::SetupFailed(e.to_string()))
})?;
- tracing::info!(task_id = %task_id, "Claude process spawned successfully");
+
+ // Register the process PID for graceful shutdown tracking
+ if let Some(pid) = process.id() {
+ self.active_pids.write().await.insert(task_id, pid);
+ tracing::info!(task_id = %task_id, pid = pid, "Claude process spawned successfully, PID registered");
+ } else {
+ tracing::info!(task_id = %task_id, "Claude process spawned successfully (no PID available)");
+ }
// Set up input channel for this task so we can send messages to its stdin
tracing::debug!(task_id = %task_id, "Setting up input channel...");
@@ -2983,6 +3079,10 @@ impl TaskManagerInner {
// Wait for process to exit
let exit_code = process.wait().await.unwrap_or(-1);
+ // Unregister the process PID (process has exited)
+ self.active_pids.write().await.remove(&task_id);
+ tracing::debug!(task_id = %task_id, "Unregistered process PID");
+
// Clean up input channel for this task
self.task_inputs.write().await.remove(&task_id);
tracing::debug!(task_id = %task_id, "Removed task input channel");
@@ -3280,6 +3380,7 @@ impl Clone for TaskManagerInner {
tasks: self.tasks.clone(),
ws_tx: self.ws_tx.clone(),
task_inputs: self.task_inputs.clone(),
+ active_pids: self.active_pids.clone(),
}
}
}