summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-19 02:26:09 +0000
committersoryu <soryu@soryu.co>2026-01-19 02:26:09 +0000
commit786510379bed060db2b3742b7dfca671552d2c34 (patch)
tree432d0b575a64ef0fb2cb10a86cd916b5fbc16909
parentb64eddc8c2f250cdcbacae18cce107bf4c86f9f4 (diff)
downloadsoryu-786510379bed060db2b3742b7dfca671552d2c34.tar.gz
soryu-786510379bed060db2b3742b7dfca671552d2c34.zip
Make sure tasks can continue
-rw-r--r--makima/src/daemon/task/manager.rs108
-rw-r--r--makima/src/daemon/ws/protocol.rs9
-rw-r--r--makima/src/server/handlers/contract_chat.rs2
-rw-r--r--makima/src/server/handlers/mesh.rs10
-rw-r--r--makima/src/server/handlers/mesh_chat.rs2
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs82
-rw-r--r--makima/src/server/state.rs6
7 files changed, 202 insertions, 17 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index 029d026..80b7039 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -1171,6 +1171,8 @@ impl TaskManager {
contract_id,
is_supervisor,
autonomous_loop,
+ resume_session,
+ conversation_history,
} => {
tracing::info!(
task_id = %task_id,
@@ -1183,6 +1185,7 @@ impl TaskManager {
is_orchestrator = is_orchestrator,
is_supervisor = is_supervisor,
autonomous_loop = autonomous_loop,
+ resume_session = resume_session,
target_repo_path = ?target_repo_path,
completion_action = ?completion_action,
continue_from_task_id = ?continue_from_task_id,
@@ -1195,7 +1198,8 @@ impl TaskManager {
task_id, task_name, plan, repo_url, base_branch, target_branch,
parent_task_id, depth, is_orchestrator, is_supervisor,
target_repo_path, completion_action, continue_from_task_id,
- copy_files, contract_id, autonomous_loop
+ copy_files, contract_id, autonomous_loop, resume_session,
+ conversation_history,
).await?;
}
DaemonCommand::PauseTask { task_id } => {
@@ -1275,6 +1279,8 @@ impl TaskManager {
None, // copy_files
contract_id,
false, // autonomous_loop - supervisors don't use this
+ false, // resume_session - respawning from scratch
+ None, // conversation_history - not needed for fresh respawn
).await {
tracing::error!(
task_id = %task_id,
@@ -1490,6 +1496,8 @@ impl TaskManager {
copy_files: Option<Vec<String>>,
contract_id: Option<Uuid>,
autonomous_loop: bool,
+ resume_session: bool,
+ conversation_history: Option<serde_json::Value>,
) -> TaskResult<()> {
tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ===");
@@ -1566,7 +1574,8 @@ impl TaskManager {
if let Err(e) = inner.run_task(
task_id, task_name, plan, repo_url, base_branch, target_branch,
is_orchestrator, is_supervisor, target_repo_path, completion_action,
- continue_from_task_id, copy_files, contract_id, autonomous_loop
+ continue_from_task_id, copy_files, contract_id, autonomous_loop, resume_session,
+ conversation_history,
).await {
tracing::error!(task_id = %task_id, error = %e, "Task execution failed");
inner.mark_failed(task_id, &e.to_string()).await;
@@ -2909,11 +2918,39 @@ impl TaskManagerInner {
copy_files: Option<Vec<String>>,
contract_id: Option<Uuid>,
autonomous_loop: bool,
+ resume_session: bool,
+ conversation_history: Option<serde_json::Value>,
) -> Result<(), DaemonError> {
- tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, "=== RUN_TASK START ===");
+ tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, resume_session = resume_session, "=== RUN_TASK START ===");
+
+ // If resuming session, try to find existing worktree first
+ let existing_worktree = if resume_session {
+ match self.find_worktree_for_task(task_id).await {
+ Ok(path) => {
+ tracing::info!(task_id = %task_id, path = %path.display(), "Found existing worktree for session resume");
+ Some(path)
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "No existing worktree found for resume, will create new");
+ None
+ }
+ }
+ } else {
+ None
+ };
// Determine working directory
- let working_dir = if let Some(ref source) = repo_source {
+ let has_existing_worktree = existing_worktree.is_some();
+ let working_dir = if let Some(existing) = existing_worktree {
+ // Reuse existing worktree for session resume
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Resuming session in existing worktree: {}\n", existing.display()),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ existing
+ } else if let Some(ref source) = repo_source {
if is_new_repo_request(source) {
// Explicit new repo request: new:// or new://project-name
tracing::info!(
@@ -3388,14 +3425,61 @@ impl TaskManagerInner {
// Clone extra_env for use in autonomous loop iterations
let extra_env_for_loop = extra_env.clone();
- tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), "Calling process_manager.spawn()...");
- let mut process = self.process_manager
- .spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref())
- .await
- .map_err(|e| {
- tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process");
- DaemonError::Task(TaskError::SetupFailed(e.to_string()))
- })?;
+ tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), resume_session = resume_session, "Calling process_manager.spawn()...");
+ let mut process = if resume_session {
+ // Use --continue flag to resume from previous session
+ // Build continuation prompt based on whether worktree exists
+ let continuation_prompt = if has_existing_worktree {
+ // Worktree exists: Claude's session state should work
+ format!(
+ "Resuming previous session. Continue from where you left off.\n\n{}",
+ full_plan
+ )
+ } else if let Some(ref history) = conversation_history {
+ // Worktree missing: inject conversation history as context
+ let history_str = serde_json::to_string_pretty(history).unwrap_or_default();
+ format!(
+ "Resuming previous session. Here is the conversation history from the previous session:\n\n\
+ <previous_conversation>\n{}\n</previous_conversation>\n\n\
+ Continue from where you left off with this task:\n\n{}",
+ history_str,
+ full_plan
+ )
+ } else {
+ // No history available: just the plan
+ format!("Resuming with plan:\n\n{}", full_plan)
+ };
+
+ let resume_msg = if has_existing_worktree {
+ "Using --continue to resume previous conversation...\n"
+ } else if conversation_history.is_some() {
+ "Worktree not found. Resuming with injected conversation history...\n"
+ } else {
+ "Resuming without conversation history (worktree not found)...\n"
+ };
+ let msg = DaemonMessage::task_output(
+ task_id,
+ resume_msg.to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ self.process_manager
+ .spawn_continue(&working_dir, &continuation_prompt, extra_env, system_prompt.as_deref())
+ .await
+ .map_err(|e| {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process with --continue");
+ DaemonError::Task(TaskError::SetupFailed(e.to_string()))
+ })?
+ } else {
+ self.process_manager
+ .spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref())
+ .await
+ .map_err(|e| {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process");
+ DaemonError::Task(TaskError::SetupFailed(e.to_string()))
+ })?
+ };
// Register the process PID for graceful shutdown tracking
if let Some(pid) = process.id() {
diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs
index 5c9004b..d0bcc19 100644
--- a/makima/src/daemon/ws/protocol.rs
+++ b/makima/src/daemon/ws/protocol.rs
@@ -378,6 +378,15 @@ pub enum DaemonCommand {
/// without a COMPLETION_GATE indicating ready: true.
#[serde(rename = "autonomousLoop", default)]
autonomous_loop: bool,
+ /// Whether to resume from a previous session using --continue flag.
+ /// When enabled, the daemon will reuse the existing worktree and call
+ /// Claude with --continue to maintain conversation history.
+ #[serde(rename = "resumeSession", default)]
+ resume_session: bool,
+ /// Conversation history for fallback when worktree doesn't exist.
+ /// Used to inject previous conversation context into the prompt.
+ #[serde(rename = "conversationHistory", default)]
+ conversation_history: Option<serde_json::Value>,
},
/// Pause a running task.
diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs
index 29ec620..7e7b476 100644
--- a/makima/src/server/handlers/contract_chat.rs
+++ b/makima/src/server/handlers/contract_chat.rs
@@ -1589,6 +1589,8 @@ async fn handle_contract_request(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ resume_session: false,
+ conversation_history: None,
};
if let Err(e) = command_sender.send(command).await {
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index e9a421c..ad3ec79 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -719,6 +719,8 @@ pub async fn start_task(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ resume_session: false,
+ conversation_history: None,
};
tracing::info!(
@@ -766,6 +768,8 @@ pub async fn start_task(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ resume_session: false,
+ conversation_history: None,
};
if state.send_daemon_command(alt_daemon_id, alt_command).await.is_ok() {
@@ -1165,6 +1169,8 @@ pub async fn send_message(
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: updated_task.is_supervisor,
+ resume_session: false,
+ conversation_history: None,
};
if state.send_daemon_command(new_daemon_id, spawn_cmd).await.is_ok() {
@@ -2299,6 +2305,8 @@ pub async fn reassign_task(
copy_files: None,
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ resume_session: false,
+ conversation_history: None,
};
tracing::info!(
@@ -2621,6 +2629,8 @@ pub async fn continue_task(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ resume_session: false,
+ conversation_history: None,
};
tracing::info!(
diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs
index f6b861a..c468446 100644
--- a/makima/src/server/handlers/mesh_chat.rs
+++ b/makima/src/server/handlers/mesh_chat.rs
@@ -1146,6 +1146,8 @@ async fn handle_mesh_request(
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: task.contract_id,
is_supervisor: task.is_supervisor,
+ resume_session: false,
+ conversation_history: None,
};
match state.send_daemon_command(target_daemon_id, command).await {
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index a8af3fb..df8f77c 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -351,6 +351,8 @@ async fn try_start_pending_task(
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: false,
+ resume_session: false,
+ conversation_history: None,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
@@ -646,6 +648,8 @@ pub async fn spawn_task(
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
contract_id: updated_task.contract_id,
is_supervisor: false,
+ resume_session: false,
+ conversation_history: None,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
@@ -1845,22 +1849,90 @@ pub async fn resume_supervisor(
.map(|arr| arr.len() as i32)
.unwrap_or(0);
+ // Find a connected daemon for this owner
+ let target_daemon_id = match state.find_alternative_daemon(auth_info.owner_id, &[]) {
+ Some(id) => id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new(
+ "NO_DAEMON",
+ "No daemons connected for your account. Cannot resume supervisor.",
+ )),
+ )
+ .into_response();
+ }
+ };
+
+ // Track response values (may be updated by resume modes)
+ let mut response_daemon_id = supervisor_task.daemon_id;
+ let mut response_status = "pending".to_string();
+
// Based on resume mode, handle differently
match req.resume_mode.as_str() {
"continue" => {
- // Mark task for reassignment with existing conversation context
- if let Err(e) = sqlx::query("UPDATE tasks SET status = 'pending' WHERE id = $1")
+ // Update task status to starting and assign daemon
+ if let Err(e) = sqlx::query("UPDATE tasks SET status = 'starting', daemon_id = $1 WHERE id = $2")
+ .bind(target_daemon_id)
.bind(supervisor_state.task_id)
.execute(pool)
.await
{
- tracing::error!("Failed to update task status: {}", e);
+ tracing::error!("Failed to update task for resume: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ApiError::new("DB_ERROR", e.to_string())),
)
.into_response();
}
+
+ // Send SpawnTask with resume_session=true to use Claude's --continue
+ // Include conversation_history as fallback if worktree doesn't exist on target daemon
+ let command = DaemonCommand::SpawnTask {
+ task_id: supervisor_state.task_id,
+ task_name: supervisor_task.name.clone(),
+ plan: supervisor_task.plan.clone(),
+ repo_url: supervisor_task.repository_url.clone(),
+ base_branch: supervisor_task.base_branch.clone(),
+ target_branch: supervisor_task.target_branch.clone(),
+ parent_task_id: supervisor_task.parent_task_id,
+ depth: supervisor_task.depth,
+ is_orchestrator: false,
+ target_repo_path: supervisor_task.target_repo_path.clone(),
+ completion_action: supervisor_task.completion_action.clone(),
+ continue_from_task_id: supervisor_task.continue_from_task_id,
+ copy_files: supervisor_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: supervisor_task.contract_id,
+ is_supervisor: true,
+ resume_session: true, // Use --continue to preserve conversation
+ conversation_history: Some(supervisor_state.conversation_history.clone()), // Fallback if worktree missing
+ };
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ // Rollback status on failure
+ let _ = sqlx::query("UPDATE tasks SET status = 'interrupted', daemon_id = NULL WHERE id = $1")
+ .bind(supervisor_state.task_id)
+ .execute(pool)
+ .await;
+ tracing::error!("Failed to send SpawnTask to daemon: {}", e);
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DAEMON_ERROR", format!("Failed to send to daemon: {}", e))),
+ )
+ .into_response();
+ }
+
+ tracing::info!(
+ contract_id = %contract_id,
+ supervisor_task_id = %supervisor_state.task_id,
+ daemon_id = %target_daemon_id,
+ message_count = message_count,
+ "Supervisor resumed with --continue (resume_session=true)"
+ );
+
+ // Update response values for successful resume
+ response_daemon_id = Some(target_daemon_id);
+ response_status = "starting".to_string();
}
"restart_phase" => {
// Clear conversation but keep phase progress
@@ -1925,13 +1997,13 @@ pub async fn resume_supervisor(
Json(ResumeSupervisorResponse {
supervisor_task_id: supervisor_state.task_id,
- daemon_id: supervisor_task.daemon_id,
+ daemon_id: response_daemon_id,
resumed_from: ResumedFromInfo {
phase: contract.phase,
last_activity: supervisor_state.last_activity,
message_count,
},
- status: "pending".to_string(),
+ status: response_status,
})
.into_response()
}
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index 86f38c8..c5736af 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -206,6 +206,12 @@ pub enum DaemonCommand {
/// Whether this task is a supervisor (long-running contract orchestrator)
#[serde(rename = "isSupervisor")]
is_supervisor: bool,
+ /// Whether to resume from a previous session using --continue flag
+ #[serde(rename = "resumeSession", default)]
+ resume_session: bool,
+ /// Conversation history for fallback when worktree doesn't exist
+ #[serde(rename = "conversationHistory", default)]
+ conversation_history: Option<serde_json::Value>,
},
/// Pause a running task
PauseTask {