summaryrefslogtreecommitdiff
path: root/makima/src/daemon
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-22 00:59:18 +0000
committersoryu <soryu@soryu.co>2026-01-22 00:59:18 +0000
commitb84b3f782d3a3d6bf7ed8040fd72907ca19db8c6 (patch)
tree47307136f44dbd69f38675d2c1a0da08d9628114 /makima/src/daemon
parent0a30c9d3a9227660860abcd48ea1e9bd5cc2350c (diff)
downloadsoryu-b84b3f782d3a3d6bf7ed8040fd72907ca19db8c6.tar.gz
soryu-b84b3f782d3a3d6bf7ed8040fd72907ca19db8c6.zip
Add pausing tasks
Diffstat (limited to 'makima/src/daemon')
-rw-r--r--makima/src/daemon/task/manager.rs196
1 files changed, 192 insertions, 4 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index 555cd2a..5eed0e8 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -1152,6 +1152,178 @@ impl TaskManager {
tracing::warn!("Graceful shutdown not supported on this platform");
}
+ /// Pause a running task by sending SIGSTOP to its process.
+ #[cfg(unix)]
+ pub async fn pause_task(&self, task_id: Uuid) -> TaskResult<()> {
+ use nix::sys::signal::{kill, Signal};
+ use nix::unistd::Pid;
+
+ // Check if task exists and is running
+ let current_state = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id).map(|t| t.state)
+ };
+
+ match current_state {
+ Some(TaskState::Running) => {}
+ Some(TaskState::Paused) => {
+ tracing::debug!(task_id = %task_id, "Task already paused");
+ return Ok(());
+ }
+ Some(state) => {
+ tracing::warn!(task_id = %task_id, state = ?state, "Cannot pause task in state");
+ return Err(TaskError::InvalidStateTransition {
+ from: format!("{:?}", state),
+ to: "Paused".to_string(),
+ });
+ }
+ None => {
+ tracing::warn!(task_id = %task_id, "Task not found");
+ return Err(TaskError::NotFound(task_id));
+ }
+ }
+
+ // Get the process PID
+ let pid = {
+ let pids = self.active_pids.read().await;
+ pids.get(&task_id).copied()
+ };
+
+ let Some(pid) = pid else {
+ tracing::warn!(task_id = %task_id, "No PID found for task");
+ return Err(TaskError::ExecutionFailed(
+ "No active process for task".to_string(),
+ ));
+ };
+
+ // Send SIGSTOP to pause the process
+ match kill(Pid::from_raw(pid as i32), Signal::SIGSTOP) {
+ Ok(()) => {
+ tracing::info!(task_id = %task_id, pid = pid, "Sent SIGSTOP to pause process");
+ }
+ Err(nix::errno::Errno::ESRCH) => {
+ tracing::warn!(task_id = %task_id, pid = pid, "Process not found");
+ return Err(TaskError::ExecutionFailed("Process not found".to_string()));
+ }
+ Err(e) => {
+ tracing::error!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGSTOP");
+ return Err(TaskError::ExecutionFailed(format!(
+ "Failed to pause: {}",
+ e
+ )));
+ }
+ }
+
+ // Update task state to Paused
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.state = TaskState::Paused;
+ }
+ }
+
+ // Notify server of state change
+ let msg = DaemonMessage::task_status_change(task_id, "running", "paused");
+ let _ = self.ws_tx.send(msg).await;
+
+ Ok(())
+ }
+
+ /// Pause a task (no-op on non-Unix).
+ #[cfg(not(unix))]
+ pub async fn pause_task(&self, task_id: Uuid) -> TaskResult<()> {
+ tracing::warn!(task_id = %task_id, "Pause not supported on this platform");
+ Err(TaskError::ExecutionFailed(
+ "Pause not supported on this platform".to_string(),
+ ))
+ }
+
+ /// Resume a paused task by sending SIGCONT to its process.
+ #[cfg(unix)]
+ pub async fn resume_task(&self, task_id: Uuid) -> TaskResult<()> {
+ use nix::sys::signal::{kill, Signal};
+ use nix::unistd::Pid;
+
+ // Check if task exists and is paused
+ let current_state = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id).map(|t| t.state)
+ };
+
+ match current_state {
+ Some(TaskState::Paused) => {}
+ Some(TaskState::Running) => {
+ tracing::debug!(task_id = %task_id, "Task already running");
+ return Ok(());
+ }
+ Some(state) => {
+ tracing::warn!(task_id = %task_id, state = ?state, "Cannot resume task in state");
+ return Err(TaskError::InvalidStateTransition {
+ from: format!("{:?}", state),
+ to: "Running".to_string(),
+ });
+ }
+ None => {
+ tracing::warn!(task_id = %task_id, "Task not found");
+ return Err(TaskError::NotFound(task_id));
+ }
+ }
+
+ // Get the process PID
+ let pid = {
+ let pids = self.active_pids.read().await;
+ pids.get(&task_id).copied()
+ };
+
+ let Some(pid) = pid else {
+ tracing::warn!(task_id = %task_id, "No PID found for task");
+ return Err(TaskError::ExecutionFailed(
+ "No active process for task".to_string(),
+ ));
+ };
+
+ // Send SIGCONT to resume the process
+ match kill(Pid::from_raw(pid as i32), Signal::SIGCONT) {
+ Ok(()) => {
+ tracing::info!(task_id = %task_id, pid = pid, "Sent SIGCONT to resume process");
+ }
+ Err(nix::errno::Errno::ESRCH) => {
+ tracing::warn!(task_id = %task_id, pid = pid, "Process not found");
+ return Err(TaskError::ExecutionFailed("Process not found".to_string()));
+ }
+ Err(e) => {
+ tracing::error!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGCONT");
+ return Err(TaskError::ExecutionFailed(format!(
+ "Failed to resume: {}",
+ e
+ )));
+ }
+ }
+
+ // Update task state to Running
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.state = TaskState::Running;
+ }
+ }
+
+ // Notify server of state change
+ let msg = DaemonMessage::task_status_change(task_id, "paused", "running");
+ let _ = self.ws_tx.send(msg).await;
+
+ Ok(())
+ }
+
+ /// Resume a task (no-op on non-Unix).
+ #[cfg(not(unix))]
+ pub async fn resume_task(&self, task_id: Uuid) -> TaskResult<()> {
+ tracing::warn!(task_id = %task_id, "Resume not supported on this platform");
+ Err(TaskError::ExecutionFailed(
+ "Resume not supported on this platform".to_string(),
+ ))
+ }
+
/// Handle a command from the server.
pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> {
tracing::info!("Received command from server: {:?}", command);
@@ -1206,12 +1378,16 @@ impl TaskManager {
).await?;
}
DaemonCommand::PauseTask { task_id } => {
- tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks");
- // Subprocesses don't support pause, just log and ignore
+ tracing::info!(task_id = %task_id, "Pausing task");
+ if let Err(e) = self.pause_task(task_id).await {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to pause task");
+ }
}
DaemonCommand::ResumeTask { task_id } => {
- tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks");
- // Subprocesses don't support resume, just log and ignore
+ tracing::info!(task_id = %task_id, "Resuming task");
+ if let Err(e) = self.resume_task(task_id).await {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to resume task");
+ }
}
DaemonCommand::InterruptTask { task_id, graceful: _ } => {
tracing::info!(task_id = %task_id, "Interrupting task");
@@ -1228,6 +1404,18 @@ impl TaskManager {
tracing::warn!(task_id = %task_id, "No pending auth flow to receive code");
}
} else {
+ // Check if task is paused - auto-resume before sending message
+ let task_state = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id).map(|t| t.state)
+ };
+ if task_state == Some(TaskState::Paused) {
+ tracing::info!(task_id = %task_id, "Auto-resuming paused task before sending message");
+ if let Err(e) = self.resume_task(task_id).await {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to auto-resume task");
+ }
+ }
+
// Regular message - send to task's stdin
tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task");
// Send message to the task's stdin via the input channel