From 3fe9a6c6b113d64c7a8409b5463026700be4c28c Mon Sep 17 00:00:00 2001 From: soryu Date: Thu, 15 Jan 2026 03:39:16 +0000 Subject: Add cleanup to daemon Also fixup for container image --- makima/src/bin/makima.rs | 6 +++ makima/src/daemon/process/claude.rs | 35 +++++++++++- makima/src/daemon/task/manager.rs | 103 +++++++++++++++++++++++++++++++++++- 3 files changed, 142 insertions(+), 2 deletions(-) (limited to 'makima/src') diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index 35783dc..5c71885 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -219,6 +219,12 @@ async fn run_daemon( } } + // Gracefully shutdown all running Claude processes + eprintln!("Terminating Claude processes..."); + task_manager + .shutdown_all_processes(std::time::Duration::from_secs(5)) + .await; + // Cleanup tracing::info!("Daemon stopped"); diff --git a/makima/src/daemon/process/claude.rs b/makima/src/daemon/process/claude.rs index 93b097c..536d883 100644 --- a/makima/src/daemon/process/claude.rs +++ b/makima/src/daemon/process/claude.rs @@ -102,12 +102,45 @@ impl ClaudeProcess { } } - /// Kill the process. + /// Kill the process (SIGKILL). pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> { self.child.kill().await?; Ok(()) } + /// Get the process ID, if the process is still running. + pub fn id(&self) -> Option { + self.child.id() + } + + /// Send SIGTERM to gracefully terminate the process (Unix only). + /// Returns Ok(true) if signal was sent, Ok(false) if process already exited. 
+ #[cfg(unix)] + pub fn terminate(&self) -> Result { + use nix::sys::signal::{kill, Signal}; + use nix::unistd::Pid; + + if let Some(pid) = self.child.id() { + match kill(Pid::from_raw(pid as i32), Signal::SIGTERM) { + Ok(()) => Ok(true), + Err(nix::errno::Errno::ESRCH) => Ok(false), // Process doesn't exist + Err(e) => Err(ClaudeProcessError::OutputRead(format!( + "Failed to send SIGTERM: {}", + e + ))), + } + } else { + Ok(false) // Process already exited + } + } + + /// Send SIGTERM to gracefully terminate the process (no-op on non-Unix). + #[cfg(not(unix))] + pub fn terminate(&self) -> Result { + // No signal is sent on non-Unix: Ok(false) here does NOT mean the process exited, so callers must fall back to kill() + Ok(false) + } + /// Get the next output line, if available. pub async fn next_output(&mut self) -> Option { self.output_rx.recv().await diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 3b4ffdd..e6a4e29 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -967,6 +967,8 @@ pub struct TaskManager { task_inputs: Arc>>>, /// Tracks merge state per orchestrator task (for completion gate). merge_trackers: Arc>>, + /// Active process PIDs for graceful shutdown. + active_pids: Arc>>, } impl TaskManager { @@ -994,9 +996,94 @@ impl TaskManager { semaphore: Arc::new(Semaphore::new(max_concurrent)), task_inputs: Arc::new(RwLock::new(HashMap::new())), merge_trackers: Arc::new(RwLock::new(HashMap::new())), + active_pids: Arc::new(RwLock::new(HashMap::new())), } } + /// Gracefully shutdown all running Claude processes. + /// + /// This sends SIGTERM to all active processes, waits for them to exit gracefully, + /// and then sends SIGKILL to any that don't exit within the timeout. 
+ #[cfg(unix)] + pub async fn shutdown_all_processes(&self, timeout: std::time::Duration) { + use nix::sys::signal::{kill, Signal}; + use nix::unistd::Pid; + + let pids: Vec<(Uuid, u32)> = { + let guard = self.active_pids.read().await; + guard.iter().map(|(k, v)| (*k, *v)).collect() + }; + + if pids.is_empty() { + tracing::info!("No active Claude processes to shutdown"); + return; + } + + tracing::info!(count = pids.len(), "Sending SIGTERM to all Claude processes"); + + // Send SIGTERM to all processes + for (task_id, pid) in &pids { + match kill(Pid::from_raw(*pid as i32), Signal::SIGTERM) { + Ok(()) => { + tracing::debug!(task_id = %task_id, pid = pid, "Sent SIGTERM to process"); + } + Err(nix::errno::Errno::ESRCH) => { + tracing::debug!(task_id = %task_id, pid = pid, "Process already exited"); + } + Err(e) => { + tracing::warn!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGTERM"); + } + } + } + + // Wait for processes to exit with timeout + let start = std::time::Instant::now(); + let check_interval = std::time::Duration::from_millis(100); + + while start.elapsed() < timeout { + let remaining: Vec = { + let guard = self.active_pids.read().await; + guard.values().copied().collect() + }; + + if remaining.is_empty() { + tracing::info!("All Claude processes exited gracefully"); + return; + } + + tokio::time::sleep(check_interval).await; + } + + // Send SIGKILL to any remaining processes + let remaining: Vec<(Uuid, u32)> = { + let guard = self.active_pids.read().await; + guard.iter().map(|(k, v)| (*k, *v)).collect() + }; + + if !remaining.is_empty() { + tracing::warn!( + count = remaining.len(), + "Some processes did not exit gracefully, sending SIGKILL" + ); + for (task_id, pid) in &remaining { + match kill(Pid::from_raw(*pid as i32), Signal::SIGKILL) { + Ok(()) => { + tracing::debug!(task_id = %task_id, pid = pid, "Sent SIGKILL to process"); + } + Err(e) => { + tracing::warn!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGKILL"); + } 
+ } + } + } + } + + /// Gracefully shutdown all running Claude processes (no-op on non-Unix). + #[cfg(not(unix))] + pub async fn shutdown_all_processes(&self, _timeout: std::time::Duration) { + tracing::warn!("Graceful shutdown not supported on this platform"); + } + /// Handle a command from the server. pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> { tracing::info!("Received command from server: {:?}", command); @@ -1408,6 +1495,7 @@ impl TaskManager { tasks: self.tasks.clone(), ws_tx: self.ws_tx.clone(), task_inputs: self.task_inputs.clone(), + active_pids: self.active_pids.clone(), } } @@ -2282,6 +2370,7 @@ struct TaskManagerInner { tasks: Arc>>, ws_tx: mpsc::Sender, task_inputs: Arc>>>, + active_pids: Arc>>, } impl TaskManagerInner { @@ -2782,7 +2871,14 @@ impl TaskManagerInner { tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process"); DaemonError::Task(TaskError::SetupFailed(e.to_string())) })?; - tracing::info!(task_id = %task_id, "Claude process spawned successfully"); + + // Register the process PID for graceful shutdown tracking + if let Some(pid) = process.id() { + self.active_pids.write().await.insert(task_id, pid); + tracing::info!(task_id = %task_id, pid = pid, "Claude process spawned successfully, PID registered"); + } else { + tracing::info!(task_id = %task_id, "Claude process spawned successfully (no PID available)"); + } // Set up input channel for this task so we can send messages to its stdin tracing::debug!(task_id = %task_id, "Setting up input channel..."); @@ -2983,6 +3079,10 @@ impl TaskManagerInner { // Wait for process to exit let exit_code = process.wait().await.unwrap_or(-1); + // Unregister the process PID (process has exited) + self.active_pids.write().await.remove(&task_id); + tracing::debug!(task_id = %task_id, "Unregistered process PID"); + // Clean up input channel for this task self.task_inputs.write().await.remove(&task_id); tracing::debug!(task_id = %task_id, 
"Removed task input channel"); @@ -3280,6 +3380,7 @@ impl Clone for TaskManagerInner { tasks: self.tasks.clone(), ws_tx: self.ws_tx.clone(), task_inputs: self.task_inputs.clone(), + active_pids: self.active_pids.clone(), } } } -- cgit v1.2.3