diff options
| author | soryu <soryu@soryu.co> | 2026-01-22 00:59:18 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-22 00:59:18 +0000 |
| commit | b84b3f782d3a3d6bf7ed8040fd72907ca19db8c6 (patch) | |
| tree | 47307136f44dbd69f38675d2c1a0da08d9628114 | |
| parent | 0a30c9d3a9227660860abcd48ea1e9bd5cc2350c (diff) | |
| download | soryu-b84b3f782d3a3d6bf7ed8040fd72907ca19db8c6.tar.gz soryu-b84b3f782d3a3d6bf7ed8040fd72907ca19db8c6.zip | |
Add pausing tasks
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 196 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 69 |
2 files changed, 257 insertions, 8 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 555cd2a..5eed0e8 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -1152,6 +1152,178 @@ impl TaskManager { tracing::warn!("Graceful shutdown not supported on this platform"); } + /// Pause a running task by sending SIGSTOP to its process. + #[cfg(unix)] + pub async fn pause_task(&self, task_id: Uuid) -> TaskResult<()> { + use nix::sys::signal::{kill, Signal}; + use nix::unistd::Pid; + + // Check if task exists and is running + let current_state = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).map(|t| t.state) + }; + + match current_state { + Some(TaskState::Running) => {} + Some(TaskState::Paused) => { + tracing::debug!(task_id = %task_id, "Task already paused"); + return Ok(()); + } + Some(state) => { + tracing::warn!(task_id = %task_id, state = ?state, "Cannot pause task in state"); + return Err(TaskError::InvalidStateTransition { + from: format!("{:?}", state), + to: "Paused".to_string(), + }); + } + None => { + tracing::warn!(task_id = %task_id, "Task not found"); + return Err(TaskError::NotFound(task_id)); + } + } + + // Get the process PID + let pid = { + let pids = self.active_pids.read().await; + pids.get(&task_id).copied() + }; + + let Some(pid) = pid else { + tracing::warn!(task_id = %task_id, "No PID found for task"); + return Err(TaskError::ExecutionFailed( + "No active process for task".to_string(), + )); + }; + + // Send SIGSTOP to pause the process + match kill(Pid::from_raw(pid as i32), Signal::SIGSTOP) { + Ok(()) => { + tracing::info!(task_id = %task_id, pid = pid, "Sent SIGSTOP to pause process"); + } + Err(nix::errno::Errno::ESRCH) => { + tracing::warn!(task_id = %task_id, pid = pid, "Process not found"); + return Err(TaskError::ExecutionFailed("Process not found".to_string())); + } + Err(e) => { + tracing::error!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGSTOP"); + return Err(TaskError::ExecutionFailed(format!( + "Failed to pause: {}", + e + ))); + } + } + + // Update task state to Paused + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Paused; + } + } + + // Notify server of state change + let msg = DaemonMessage::task_status_change(task_id, "running", "paused"); + let _ = self.ws_tx.send(msg).await; + + Ok(()) + } + + /// Pause a task (no-op on non-Unix). + #[cfg(not(unix))] + pub async fn pause_task(&self, task_id: Uuid) -> TaskResult<()> { + tracing::warn!(task_id = %task_id, "Pause not supported on this platform"); + Err(TaskError::ExecutionFailed( + "Pause not supported on this platform".to_string(), + )) + } + + /// Resume a paused task by sending SIGCONT to its process. + #[cfg(unix)] + pub async fn resume_task(&self, task_id: Uuid) -> TaskResult<()> { + use nix::sys::signal::{kill, Signal}; + use nix::unistd::Pid; + + // Check if task exists and is paused + let current_state = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).map(|t| t.state) + }; + + match current_state { + Some(TaskState::Paused) => {} + Some(TaskState::Running) => { + tracing::debug!(task_id = %task_id, "Task already running"); + return Ok(()); + } + Some(state) => { + tracing::warn!(task_id = %task_id, state = ?state, "Cannot resume task in state"); + return Err(TaskError::InvalidStateTransition { + from: format!("{:?}", state), + to: "Running".to_string(), + }); + } + None => { + tracing::warn!(task_id = %task_id, "Task not found"); + return Err(TaskError::NotFound(task_id)); + } + } + + // Get the process PID + let pid = { + let pids = self.active_pids.read().await; + pids.get(&task_id).copied() + }; + + let Some(pid) = pid else { + tracing::warn!(task_id = %task_id, "No PID found for task"); + return Err(TaskError::ExecutionFailed( + "No active process for task".to_string(), + )); + }; + + // Send SIGCONT to resume the process + match kill(Pid::from_raw(pid as i32), Signal::SIGCONT) { + Ok(()) => { + tracing::info!(task_id = %task_id, pid = pid, "Sent SIGCONT to resume process"); + } + Err(nix::errno::Errno::ESRCH) => { + tracing::warn!(task_id = %task_id, pid = pid, "Process not found"); + return Err(TaskError::ExecutionFailed("Process not found".to_string())); + } + Err(e) => { + tracing::error!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGCONT"); + return Err(TaskError::ExecutionFailed(format!( + "Failed to resume: {}", + e + ))); + } + } + + // Update task state to Running + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Running; + } + } + + // Notify server of state change + let msg = DaemonMessage::task_status_change(task_id, "paused", "running"); + let _ = self.ws_tx.send(msg).await; + + Ok(()) + } + + /// Resume a task (no-op on non-Unix). + #[cfg(not(unix))] + pub async fn resume_task(&self, task_id: Uuid) -> TaskResult<()> { + tracing::warn!(task_id = %task_id, "Resume not supported on this platform"); + Err(TaskError::ExecutionFailed( + "Resume not supported on this platform".to_string(), + )) + } + /// Handle a command from the server. pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> { tracing::info!("Received command from server: {:?}", command); @@ -1206,12 +1378,16 @@ impl TaskManager { ).await?; } DaemonCommand::PauseTask { task_id } => { - tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks"); - // Subprocesses don't support pause, just log and ignore + tracing::info!(task_id = %task_id, "Pausing task"); + if let Err(e) = self.pause_task(task_id).await { + tracing::warn!(task_id = %task_id, error = %e, "Failed to pause task"); + } } DaemonCommand::ResumeTask { task_id } => { - tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks"); - // Subprocesses don't support resume, just log and ignore + tracing::info!(task_id = %task_id, "Resuming task"); + if let Err(e) = self.resume_task(task_id).await { + tracing::warn!(task_id = %task_id, error = %e, "Failed to resume task"); + } } DaemonCommand::InterruptTask { task_id, graceful: _ } => { tracing::info!(task_id = %task_id, "Interrupting task"); @@ -1228,6 +1404,18 @@ impl TaskManager { tracing::warn!(task_id = %task_id, "No pending auth flow to receive code"); } } else { + // Check if task is paused - auto-resume before sending message + let task_state = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).map(|t| t.state) + }; + if task_state == Some(TaskState::Paused) { + tracing::info!(task_id = %task_id, "Auto-resuming paused task before sending message"); + if let Err(e) = self.resume_task(task_id).await { + tracing::warn!(task_id = %task_id, error = %e, "Failed to auto-resume task"); + } + } + // Regular message - send to task's stdin tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task"); // Send message to the task's stdin via the input channel diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 668ea7b..0bb58ed 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -1589,9 +1589,40 @@ pub async fn ask_question( ).await; } - // If non_blocking mode or phaseguard is enabled, return immediately with the question_id - // Phaseguard questions persist until answered (no timeout) and are displayed by the frontend - if request.non_blocking || request.phaseguard { + // If non_blocking mode, return immediately + if request.non_blocking { + return ( + StatusCode::OK, + Json(AskQuestionResponse { + question_id, + response: None, + timed_out: false, + }), + ).into_response(); + } + + // If phaseguard is enabled, pause the supervisor task and return + // The task will be auto-resumed when a message is sent to it (e.g., when user answers) + if request.phaseguard { + // Pause the supervisor task + if let Some(daemon_id) = supervisor.daemon_id { + let cmd = DaemonCommand::PauseTask { task_id: supervisor_id }; + if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { + tracing::warn!(supervisor_id = %supervisor_id, error = %e, "Failed to pause supervisor for phaseguard"); + } else { + tracing::info!(supervisor_id = %supervisor_id, "Paused supervisor for phaseguard question"); + } + } + + // Update task status to paused in DB + let update = crate::db::models::UpdateTaskRequest { + status: Some("paused".to_string()), + ..Default::default() + }; + if let Err(e) = repository::update_task_for_owner(pool, supervisor_id, owner_id, update).await { + tracing::warn!(supervisor_id = %supervisor_id, error = %e, "Failed to update task status to paused"); + } + return ( StatusCode::OK, Json(AskQuestionResponse { @@ -1725,7 +1756,7 @@ pub async fn answer_question( }; // Submit the response - let success = state.submit_question_response(question_id, request.response); + let success = state.submit_question_response(question_id, request.response.clone()); if success { tracing::info!( @@ -1733,6 +1764,36 @@ pub async fn answer_question( task_id = %question.task_id, "User answered supervisor question" ); + + // Send the response to the task as a message + // This will auto-resume the task if it was paused (phaseguard) + let pool = state.db_pool.as_ref().unwrap(); + if let Ok(Some(task)) = repository::get_task_for_owner(pool, question.task_id, auth.owner_id).await { + if let Some(daemon_id) = task.daemon_id { + // Format the response message + let response_msg = format!( + "\n[User Response to Question]\nQuestion: {}\nAnswer: {}\n", + question.question, + request.response + ); + let cmd = DaemonCommand::SendMessage { + task_id: question.task_id, + message: response_msg, + }; + if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { + tracing::warn!( + task_id = %question.task_id, + error = %e, + "Failed to send response message to task" + ); + } else { + tracing::info!( + task_id = %question.task_id, + "Sent response message to task (will auto-resume if paused)" + ); + } + } + } } Json(AnswerQuestionResponse { success }).into_response() |
