diff options
Diffstat (limited to 'makima/src/daemon')
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 196 |
1 files changed, 192 insertions, 4 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 555cd2a..5eed0e8 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -1152,6 +1152,178 @@ impl TaskManager { tracing::warn!("Graceful shutdown not supported on this platform"); } + /// Pause a running task by sending SIGSTOP to its process. + #[cfg(unix)] + pub async fn pause_task(&self, task_id: Uuid) -> TaskResult<()> { + use nix::sys::signal::{kill, Signal}; + use nix::unistd::Pid; + + // Check if task exists and is running + let current_state = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).map(|t| t.state) + }; + + match current_state { + Some(TaskState::Running) => {} + Some(TaskState::Paused) => { + tracing::debug!(task_id = %task_id, "Task already paused"); + return Ok(()); + } + Some(state) => { + tracing::warn!(task_id = %task_id, state = ?state, "Cannot pause task in state"); + return Err(TaskError::InvalidStateTransition { + from: format!("{:?}", state), + to: "Paused".to_string(), + }); + } + None => { + tracing::warn!(task_id = %task_id, "Task not found"); + return Err(TaskError::NotFound(task_id)); + } + } + + // Get the process PID + let pid = { + let pids = self.active_pids.read().await; + pids.get(&task_id).copied() + }; + + let Some(pid) = pid else { + tracing::warn!(task_id = %task_id, "No PID found for task"); + return Err(TaskError::ExecutionFailed( + "No active process for task".to_string(), + )); + }; + + // Send SIGSTOP to pause the process + match kill(Pid::from_raw(pid as i32), Signal::SIGSTOP) { + Ok(()) => { + tracing::info!(task_id = %task_id, pid = pid, "Sent SIGSTOP to pause process"); + } + Err(nix::errno::Errno::ESRCH) => { + tracing::warn!(task_id = %task_id, pid = pid, "Process not found"); + return Err(TaskError::ExecutionFailed("Process not found".to_string())); + } + Err(e) => { + tracing::error!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGSTOP"); + return Err(TaskError::ExecutionFailed(format!( + "Failed to pause: {}", + e + ))); + } + } + + // Update task state to Paused + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Paused; + } + } + + // Notify server of state change + let msg = DaemonMessage::task_status_change(task_id, "running", "paused"); + let _ = self.ws_tx.send(msg).await; + + Ok(()) + } + + /// Pause a task (no-op on non-Unix). + #[cfg(not(unix))] + pub async fn pause_task(&self, task_id: Uuid) -> TaskResult<()> { + tracing::warn!(task_id = %task_id, "Pause not supported on this platform"); + Err(TaskError::ExecutionFailed( + "Pause not supported on this platform".to_string(), + )) + } + + /// Resume a paused task by sending SIGCONT to its process. + #[cfg(unix)] + pub async fn resume_task(&self, task_id: Uuid) -> TaskResult<()> { + use nix::sys::signal::{kill, Signal}; + use nix::unistd::Pid; + + // Check if task exists and is paused + let current_state = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).map(|t| t.state) + }; + + match current_state { + Some(TaskState::Paused) => {} + Some(TaskState::Running) => { + tracing::debug!(task_id = %task_id, "Task already running"); + return Ok(()); + } + Some(state) => { + tracing::warn!(task_id = %task_id, state = ?state, "Cannot resume task in state"); + return Err(TaskError::InvalidStateTransition { + from: format!("{:?}", state), + to: "Running".to_string(), + }); + } + None => { + tracing::warn!(task_id = %task_id, "Task not found"); + return Err(TaskError::NotFound(task_id)); + } + } + + // Get the process PID + let pid = { + let pids = self.active_pids.read().await; + pids.get(&task_id).copied() + }; + + let Some(pid) = pid else { + tracing::warn!(task_id = %task_id, "No PID found for task"); + return Err(TaskError::ExecutionFailed( + "No active process for task".to_string(), + )); + }; + + // Send SIGCONT to resume the process + match kill(Pid::from_raw(pid as i32), Signal::SIGCONT) { + Ok(()) => { + tracing::info!(task_id = %task_id, pid = pid, "Sent SIGCONT to resume process"); + } + Err(nix::errno::Errno::ESRCH) => { + tracing::warn!(task_id = %task_id, pid = pid, "Process not found"); + return Err(TaskError::ExecutionFailed("Process not found".to_string())); + } + Err(e) => { + tracing::error!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGCONT"); + return Err(TaskError::ExecutionFailed(format!( + "Failed to resume: {}", + e + ))); + } + } + + // Update task state to Running + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Running; + } + } + + // Notify server of state change + let msg = DaemonMessage::task_status_change(task_id, "paused", "running"); + let _ = self.ws_tx.send(msg).await; + + Ok(()) + } + + /// Resume a task (no-op on non-Unix). + #[cfg(not(unix))] + pub async fn resume_task(&self, task_id: Uuid) -> TaskResult<()> { + tracing::warn!(task_id = %task_id, "Resume not supported on this platform"); + Err(TaskError::ExecutionFailed( + "Resume not supported on this platform".to_string(), + )) + } + /// Handle a command from the server. pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> { tracing::info!("Received command from server: {:?}", command); @@ -1206,12 +1378,16 @@ impl TaskManager { ).await?; } DaemonCommand::PauseTask { task_id } => { - tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks"); - // Subprocesses don't support pause, just log and ignore + tracing::info!(task_id = %task_id, "Pausing task"); + if let Err(e) = self.pause_task(task_id).await { + tracing::warn!(task_id = %task_id, error = %e, "Failed to pause task"); + } } DaemonCommand::ResumeTask { task_id } => { - tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks"); - // Subprocesses don't support resume, just log and ignore + tracing::info!(task_id = %task_id, "Resuming task"); + if let Err(e) = self.resume_task(task_id).await { + tracing::warn!(task_id = %task_id, error = %e, "Failed to resume task"); + } } DaemonCommand::InterruptTask { task_id, graceful: _ } => { tracing::info!(task_id = %task_id, "Interrupting task"); @@ -1228,6 +1404,18 @@ impl TaskManager { tracing::warn!(task_id = %task_id, "No pending auth flow to receive code"); } } else { + // Check if task is paused - auto-resume before sending message + let task_state = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).map(|t| t.state) + }; + if task_state == Some(TaskState::Paused) { + tracing::info!(task_id = %task_id, "Auto-resuming paused task before sending message"); + if let Err(e) = self.resume_task(task_id).await { + tracing::warn!(task_id = %task_id, error = %e, "Failed to auto-resume task"); + } + } + // Regular message - send to task's stdin tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task"); // Send message to the task's stdin via the input channel |
