summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-22 00:59:18 +0000
committersoryu <soryu@soryu.co>2026-01-22 00:59:18 +0000
commitb84b3f782d3a3d6bf7ed8040fd72907ca19db8c6 (patch)
tree47307136f44dbd69f38675d2c1a0da08d9628114
parent0a30c9d3a9227660860abcd48ea1e9bd5cc2350c (diff)
downloadsoryu-b84b3f782d3a3d6bf7ed8040fd72907ca19db8c6.tar.gz
soryu-b84b3f782d3a3d6bf7ed8040fd72907ca19db8c6.zip
Add pausing tasks
-rw-r--r--makima/src/daemon/task/manager.rs196
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs69
2 files changed, 257 insertions, 8 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index 555cd2a..5eed0e8 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -1152,6 +1152,178 @@ impl TaskManager {
tracing::warn!("Graceful shutdown not supported on this platform");
}
+ /// Pause a running task by sending SIGSTOP to its process.
+ #[cfg(unix)]
+ pub async fn pause_task(&self, task_id: Uuid) -> TaskResult<()> {
+ use nix::sys::signal::{kill, Signal};
+ use nix::unistd::Pid;
+
+ // Check if task exists and is running
+ let current_state = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id).map(|t| t.state)
+ };
+
+ match current_state {
+ Some(TaskState::Running) => {}
+ Some(TaskState::Paused) => {
+ tracing::debug!(task_id = %task_id, "Task already paused");
+ return Ok(());
+ }
+ Some(state) => {
+ tracing::warn!(task_id = %task_id, state = ?state, "Cannot pause task in state");
+ return Err(TaskError::InvalidStateTransition {
+ from: format!("{:?}", state),
+ to: "Paused".to_string(),
+ });
+ }
+ None => {
+ tracing::warn!(task_id = %task_id, "Task not found");
+ return Err(TaskError::NotFound(task_id));
+ }
+ }
+
+ // Get the process PID
+ let pid = {
+ let pids = self.active_pids.read().await;
+ pids.get(&task_id).copied()
+ };
+
+ let Some(pid) = pid else {
+ tracing::warn!(task_id = %task_id, "No PID found for task");
+ return Err(TaskError::ExecutionFailed(
+ "No active process for task".to_string(),
+ ));
+ };
+
+ // Send SIGSTOP to pause the process
+ match kill(Pid::from_raw(pid as i32), Signal::SIGSTOP) {
+ Ok(()) => {
+ tracing::info!(task_id = %task_id, pid = pid, "Sent SIGSTOP to pause process");
+ }
+ Err(nix::errno::Errno::ESRCH) => {
+ tracing::warn!(task_id = %task_id, pid = pid, "Process not found");
+ return Err(TaskError::ExecutionFailed("Process not found".to_string()));
+ }
+ Err(e) => {
+ tracing::error!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGSTOP");
+ return Err(TaskError::ExecutionFailed(format!(
+ "Failed to pause: {}",
+ e
+ )));
+ }
+ }
+
+ // Update task state to Paused
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.state = TaskState::Paused;
+ }
+ }
+
+ // Notify server of state change
+ let msg = DaemonMessage::task_status_change(task_id, "running", "paused");
+ let _ = self.ws_tx.send(msg).await;
+
+ Ok(())
+ }
+
+ /// Pause a task (no-op on non-Unix).
+ #[cfg(not(unix))]
+ pub async fn pause_task(&self, task_id: Uuid) -> TaskResult<()> {
+ tracing::warn!(task_id = %task_id, "Pause not supported on this platform");
+ Err(TaskError::ExecutionFailed(
+ "Pause not supported on this platform".to_string(),
+ ))
+ }
+
+ /// Resume a paused task by sending SIGCONT to its process.
+ #[cfg(unix)]
+ pub async fn resume_task(&self, task_id: Uuid) -> TaskResult<()> {
+ use nix::sys::signal::{kill, Signal};
+ use nix::unistd::Pid;
+
+ // Check if task exists and is paused
+ let current_state = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id).map(|t| t.state)
+ };
+
+ match current_state {
+ Some(TaskState::Paused) => {}
+ Some(TaskState::Running) => {
+ tracing::debug!(task_id = %task_id, "Task already running");
+ return Ok(());
+ }
+ Some(state) => {
+ tracing::warn!(task_id = %task_id, state = ?state, "Cannot resume task in state");
+ return Err(TaskError::InvalidStateTransition {
+ from: format!("{:?}", state),
+ to: "Running".to_string(),
+ });
+ }
+ None => {
+ tracing::warn!(task_id = %task_id, "Task not found");
+ return Err(TaskError::NotFound(task_id));
+ }
+ }
+
+ // Get the process PID
+ let pid = {
+ let pids = self.active_pids.read().await;
+ pids.get(&task_id).copied()
+ };
+
+ let Some(pid) = pid else {
+ tracing::warn!(task_id = %task_id, "No PID found for task");
+ return Err(TaskError::ExecutionFailed(
+ "No active process for task".to_string(),
+ ));
+ };
+
+ // Send SIGCONT to resume the process
+ match kill(Pid::from_raw(pid as i32), Signal::SIGCONT) {
+ Ok(()) => {
+ tracing::info!(task_id = %task_id, pid = pid, "Sent SIGCONT to resume process");
+ }
+ Err(nix::errno::Errno::ESRCH) => {
+ tracing::warn!(task_id = %task_id, pid = pid, "Process not found");
+ return Err(TaskError::ExecutionFailed("Process not found".to_string()));
+ }
+ Err(e) => {
+ tracing::error!(task_id = %task_id, pid = pid, error = %e, "Failed to send SIGCONT");
+ return Err(TaskError::ExecutionFailed(format!(
+ "Failed to resume: {}",
+ e
+ )));
+ }
+ }
+
+ // Update task state to Running
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.state = TaskState::Running;
+ }
+ }
+
+ // Notify server of state change
+ let msg = DaemonMessage::task_status_change(task_id, "paused", "running");
+ let _ = self.ws_tx.send(msg).await;
+
+ Ok(())
+ }
+
+ /// Resume a task (no-op on non-Unix).
+ #[cfg(not(unix))]
+ pub async fn resume_task(&self, task_id: Uuid) -> TaskResult<()> {
+ tracing::warn!(task_id = %task_id, "Resume not supported on this platform");
+ Err(TaskError::ExecutionFailed(
+ "Resume not supported on this platform".to_string(),
+ ))
+ }
+
/// Handle a command from the server.
pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> {
tracing::info!("Received command from server: {:?}", command);
@@ -1206,12 +1378,16 @@ impl TaskManager {
).await?;
}
DaemonCommand::PauseTask { task_id } => {
- tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks");
- // Subprocesses don't support pause, just log and ignore
+ tracing::info!(task_id = %task_id, "Pausing task");
+ if let Err(e) = self.pause_task(task_id).await {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to pause task");
+ }
}
DaemonCommand::ResumeTask { task_id } => {
- tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks");
- // Subprocesses don't support resume, just log and ignore
+ tracing::info!(task_id = %task_id, "Resuming task");
+ if let Err(e) = self.resume_task(task_id).await {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to resume task");
+ }
}
DaemonCommand::InterruptTask { task_id, graceful: _ } => {
tracing::info!(task_id = %task_id, "Interrupting task");
@@ -1228,6 +1404,18 @@ impl TaskManager {
tracing::warn!(task_id = %task_id, "No pending auth flow to receive code");
}
} else {
+ // Check if task is paused - auto-resume before sending message
+ let task_state = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id).map(|t| t.state)
+ };
+ if task_state == Some(TaskState::Paused) {
+ tracing::info!(task_id = %task_id, "Auto-resuming paused task before sending message");
+ if let Err(e) = self.resume_task(task_id).await {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to auto-resume task");
+ }
+ }
+
// Regular message - send to task's stdin
tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task");
// Send message to the task's stdin via the input channel
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 668ea7b..0bb58ed 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -1589,9 +1589,40 @@ pub async fn ask_question(
).await;
}
- // If non_blocking mode or phaseguard is enabled, return immediately with the question_id
- // Phaseguard questions persist until answered (no timeout) and are displayed by the frontend
- if request.non_blocking || request.phaseguard {
+ // If non_blocking mode, return immediately
+ if request.non_blocking {
+ return (
+ StatusCode::OK,
+ Json(AskQuestionResponse {
+ question_id,
+ response: None,
+ timed_out: false,
+ }),
+ ).into_response();
+ }
+
+ // If phaseguard is enabled, pause the supervisor task and return
+ // The task will be auto-resumed when a message is sent to it (e.g., when user answers)
+ if request.phaseguard {
+ // Pause the supervisor task
+ if let Some(daemon_id) = supervisor.daemon_id {
+ let cmd = DaemonCommand::PauseTask { task_id: supervisor_id };
+ if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
+ tracing::warn!(supervisor_id = %supervisor_id, error = %e, "Failed to pause supervisor for phaseguard");
+ } else {
+ tracing::info!(supervisor_id = %supervisor_id, "Paused supervisor for phaseguard question");
+ }
+ }
+
+ // Update task status to paused in DB
+ let update = crate::db::models::UpdateTaskRequest {
+ status: Some("paused".to_string()),
+ ..Default::default()
+ };
+ if let Err(e) = repository::update_task_for_owner(pool, supervisor_id, owner_id, update).await {
+ tracing::warn!(supervisor_id = %supervisor_id, error = %e, "Failed to update task status to paused");
+ }
+
return (
StatusCode::OK,
Json(AskQuestionResponse {
@@ -1725,7 +1756,7 @@ pub async fn answer_question(
};
// Submit the response
- let success = state.submit_question_response(question_id, request.response);
+ let success = state.submit_question_response(question_id, request.response.clone());
if success {
tracing::info!(
@@ -1733,6 +1764,36 @@ pub async fn answer_question(
task_id = %question.task_id,
"User answered supervisor question"
);
+
+ // Send the response to the task as a message
+ // This will auto-resume the task if it was paused (phaseguard)
+ let pool = state.db_pool.as_ref().unwrap();
+ if let Ok(Some(task)) = repository::get_task_for_owner(pool, question.task_id, auth.owner_id).await {
+ if let Some(daemon_id) = task.daemon_id {
+ // Format the response message
+ let response_msg = format!(
+ "\n[User Response to Question]\nQuestion: {}\nAnswer: {}\n",
+ question.question,
+ request.response
+ );
+ let cmd = DaemonCommand::SendMessage {
+ task_id: question.task_id,
+ message: response_msg,
+ };
+ if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
+ tracing::warn!(
+ task_id = %question.task_id,
+ error = %e,
+ "Failed to send response message to task"
+ );
+ } else {
+ tracing::info!(
+ task_id = %question.task_id,
+ "Sent response message to task (will auto-resume if paused)"
+ );
+ }
+ }
+ }
}
Json(AnswerQuestionResponse { success }).into_response()