diff options
| author | soryu <soryu@soryu.co> | 2026-01-15 03:39:16 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-15 03:39:28 +0000 |
| commit | 3fe9a6c6b113d64c7a8409b5463026700be4c28c (patch) | |
| tree | 5f17c31fe3e0036b75b6ad40a2748751fd678b5c | |
| parent | 764bd28d08ceaef03cd4050f9568a62d77bbcfca (diff) | |
| download | soryu-3fe9a6c6b113d64c7a8409b5463026700be4c28c.tar.gz soryu-3fe9a6c6b113d64c7a8409b5463026700be4c28c.zip | |
Add cleanup to daemon
Also fixup for container image
| -rw-r--r-- | Cargo.lock | 21 | ||||
| -rw-r--r-- | Dockerfile | 3 | ||||
| -rw-r--r-- | makima/Cargo.toml | 3 | ||||
| -rw-r--r-- | makima/src/bin/makima.rs | 6 | ||||
| -rw-r--r-- | makima/src/daemon/process/claude.rs | 35 | ||||
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 103 |
6 files changed, 167 insertions, 4 deletions
@@ -323,6 +323,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" [[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + +[[package]] name = "chrono" version = "0.4.42" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1877,6 +1883,7 @@ dependencies = [ "jaq-std", "jsonwebtoken", "ndarray", + "nix 0.29.0", "once_cell", "ort", "parakeet-rs", @@ -2082,6 +2089,18 @@ dependencies = [ ] [[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags 2.10.0", + "cfg-if", + "cfg_aliases", + "libc", +] + +[[package]] name = "nom" version = "7.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2497,7 +2516,7 @@ dependencies = [ "lazy_static", "libc", "log", - "nix", + "nix 0.25.1", "serial", "shared_library", "shell-words", @@ -28,7 +28,8 @@ COPY vendor ./vendor COPY tools/stt-client ./tools/stt-client # Build release binary -RUN cargo build --release --package makima --bin makima-server +RUN cargo build --release --package makima --bin makima +RUN mv /app/target/release/makima /app/target/release/makima-server RUN cp /app/target/release/makima-server /app/makima-server # Clean up build artifacts to reduce image size diff --git a/makima/Cargo.toml b/makima/Cargo.toml index a850d4a..a77d9ea 100644 --- a/makima/Cargo.toml +++ b/makima/Cargo.toml @@ -62,6 +62,9 @@ reqwest = { version = "0.12", features = ["json"] } # Lazy statics once_cell = "1.19" +# Unix signal handling +nix = { version = "0.29", features = ["signal", "process"] } + # Regex for text parsing regex = "1.10" diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index 35783dc..5c71885 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -219,6 +219,12 @@ async fn run_daemon( } } + // Gracefully shutdown all running Claude processes + eprintln!("Terminating Claude processes..."); + task_manager + .shutdown_all_processes(std::time::Duration::from_secs(5)) + .await; + // Cleanup tracing::info!("Daemon stopped"); diff --git a/makima/src/daemon/process/claude.rs b/makima/src/daemon/process/claude.rs index 93b097c..536d883 100644 --- a/makima/src/daemon/process/claude.rs +++ b/makima/src/daemon/process/claude.rs @@ -102,12 +102,45 @@ impl ClaudeProcess { } } - /// Kill the process. + /// Kill the process (SIGKILL). pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> { self.child.kill().await?; Ok(()) } + /// Get the process ID, if the process is still running. + pub fn id(&self) -> Option<u32> { + self.child.id() + } + + /// Send SIGTERM to gracefully terminate the process (Unix only). + /// Returns Ok(true) if signal was sent, Ok(false) if process already exited. + #[cfg(unix)] + pub fn terminate(&self) -> Result<bool, ClaudeProcessError> { + use nix::sys::signal::{kill, Signal}; + use nix::unistd::Pid; + + if let Some(pid) = self.child.id() { + match kill(Pid::from_raw(pid as i32), Signal::SIGTERM) { + Ok(()) => Ok(true), + Err(nix::errno::Errno::ESRCH) => Ok(false), // Process doesn't exist + Err(e) => Err(ClaudeProcessError::OutputRead(format!( + "Failed to send SIGTERM: {}", + e + ))), + } + } else { + Ok(false) // Process already exited + } + } + + /// Send SIGTERM to gracefully terminate the process (no-op on non-Unix). + #[cfg(not(unix))] + pub fn terminate(&self) -> Result<bool, ClaudeProcessError> { + // On non-Unix platforms, fall back to kill + Ok(false) + } + /// Get the next output line, if available. pub async fn next_output(&mut self) -> Option<OutputLine> { self.output_rx.recv().await diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 3b4ffdd..e6a4e29 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -967,6 +967,8 @@ pub struct TaskManager { task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>, /// Tracks merge state per orchestrator task (for completion gate). merge_trackers: Arc<RwLock<HashMap<Uuid, MergeTracker>>>, + /// Active process PIDs for graceful shutdown. + active_pids: Arc<RwLock<HashMap<Uuid, u32>>>, } impl TaskManager { @@ -994,9 +996,94 @@ impl TaskManager { semaphore: Arc::new(Semaphore::new(max_concurrent)), task_inputs: Arc::new(RwLock::new(HashMap::new())), merge_trackers: Arc::new(RwLock::new(HashMap::new())), + active_pids: Arc::new(RwLock::new(HashMap::new())), } } + /// Gracefully shutdown all running Claude processes. + /// + /// This sends SIGTERM to all active processes, waits for them to exit gracefully, + /// and then sends SIGKILL to any that don't exit within the timeout. + #[cfg(unix)] + pub async fn shutdown_all_processes(&self, timeout: std::time::Duration) { + use nix::sys::signal::{kill, Signal}; + use nix::unistd::Pid; + + let pids: Vec<(Uuid, u32)> = { + let guard = self.active_pids.read().await; + guard.iter().map(|(k, v)| (*k, *v)).collect() + }; + + if pids.is_empty() { + tracing::info!("No active Claude processes to shutdown"); + return; + } + + tracing::info!(count = pids.len(), "Sending SIGTERM to all Claude processes"); + + // Send SIGTERM to all processes + for (task_id, pid) in &pids { + match kill(Pid::from_raw(*pid as i32), Signal::SIGTERM) { + Ok(()) => { + tracing::debug!(task_id = %task_id, pid = pid, "Sent SIGTERM to process"); + } + Err(nix::errno::Errno::ESRCH) => { + tracing::debug!(task_id = %task_id, pid = pid, "Process already exited"); + } + Err(e) => { + tracing::warn!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGTERM"); + } + } + } + + // Wait for processes to exit with timeout + let start = std::time::Instant::now(); + let check_interval = std::time::Duration::from_millis(100); + + while start.elapsed() < timeout { + let remaining: Vec<u32> = { + let guard = self.active_pids.read().await; + guard.values().copied().collect() + }; + + if remaining.is_empty() { + tracing::info!("All Claude processes exited gracefully"); + return; + } + + tokio::time::sleep(check_interval).await; + } + + // Send SIGKILL to any remaining processes + let remaining: Vec<(Uuid, u32)> = { + let guard = self.active_pids.read().await; + guard.iter().map(|(k, v)| (*k, *v)).collect() + }; + + if !remaining.is_empty() { + tracing::warn!( + count = remaining.len(), + "Some processes did not exit gracefully, sending SIGKILL" + ); + for (task_id, pid) in &remaining { + match kill(Pid::from_raw(*pid as i32), Signal::SIGKILL) { + Ok(()) => { + tracing::debug!(task_id = %task_id, pid = pid, "Sent SIGKILL to process"); + } + Err(e) => { + tracing::warn!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGKILL"); + } + } + } + } + } + + /// Gracefully shutdown all running Claude processes (no-op on non-Unix). + #[cfg(not(unix))] + pub async fn shutdown_all_processes(&self, _timeout: std::time::Duration) { + tracing::warn!("Graceful shutdown not supported on this platform"); + } + /// Handle a command from the server. pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> { tracing::info!("Received command from server: {:?}", command); @@ -1408,6 +1495,7 @@ impl TaskManager { tasks: self.tasks.clone(), ws_tx: self.ws_tx.clone(), task_inputs: self.task_inputs.clone(), + active_pids: self.active_pids.clone(), } } @@ -2282,6 +2370,7 @@ struct TaskManagerInner { tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>, ws_tx: mpsc::Sender<DaemonMessage>, task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>, + active_pids: Arc<RwLock<HashMap<Uuid, u32>>>, } impl TaskManagerInner { @@ -2782,7 +2871,14 @@ impl TaskManagerInner { tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process"); DaemonError::Task(TaskError::SetupFailed(e.to_string())) })?; - tracing::info!(task_id = %task_id, "Claude process spawned successfully"); + + // Register the process PID for graceful shutdown tracking + if let Some(pid) = process.id() { + self.active_pids.write().await.insert(task_id, pid); + tracing::info!(task_id = %task_id, pid = pid, "Claude process spawned successfully, PID registered"); + } else { + tracing::info!(task_id = %task_id, "Claude process spawned successfully (no PID available)"); + } // Set up input channel for this task so we can send messages to its stdin tracing::debug!(task_id = %task_id, "Setting up input channel..."); @@ -2983,6 +3079,10 @@ impl TaskManagerInner { // Wait for process to exit let exit_code = process.wait().await.unwrap_or(-1); + // Unregister the process PID (process has exited) + self.active_pids.write().await.remove(&task_id); + tracing::debug!(task_id = %task_id, "Unregistered process PID"); + // Clean up input channel for this task self.task_inputs.write().await.remove(&task_id); tracing::debug!(task_id = %task_id, "Removed task input channel"); @@ -3280,6 +3380,7 @@ impl Clone for TaskManagerInner { tasks: self.tasks.clone(), ws_tx: self.ws_tx.clone(), task_inputs: self.task_inputs.clone(), + active_pids: self.active_pids.clone(), } } } |
