summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-15 03:26:28 +0000
committersoryu <soryu@soryu.co>2026-01-15 03:26:28 +0000
commiteeafe072bc6bb81459f7d087b48fc921afe9cc11 (patch)
tree7f835993edd732f8ff66d756391dedffe3d44e90 /makima/src/server/handlers/mesh.rs
parentc61a2b9b9c988f5460f85980d4ddf285f1a730b5 (diff)
downloadsoryu-eeafe072bc6bb81459f7d087b48fc921afe9cc11.tar.gz
soryu-eeafe072bc6bb81459f7d087b48fc921afe9cc11.zip
Automatically derive repo URL and add notifications for input
Diffstat (limited to 'makima/src/server/handlers/mesh.rs')
-rw-r--r--makima/src/server/handlers/mesh.rs687
1 files changed, 686 insertions, 1 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 2d90a04..3da6fd5 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -52,7 +52,7 @@ pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource {
if let Some(task_id) = state.validate_tool_key(key_str) {
return AuthSource::ToolKey(task_id);
}
- tracing::warn!("Invalid tool key provided");
+ tracing::warn!("Invalid tool key provided: {}", key_str);
}
}
@@ -1774,3 +1774,688 @@ pub async fn check_target_exists(
}))
.into_response()
}
+
+// =============================================================================
+// Task Reassignment (Daemon Failover)
+// =============================================================================
+
+/// Request to reassign a task to a new daemon after daemon disconnect.
+#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ReassignTaskRequest {
+ /// Target daemon ID to reassign to. If not provided, will select any available daemon.
+ pub target_daemon_id: Option<Uuid>,
+ /// Whether to include conversation context from previous run.
+ #[serde(default = "default_include_context")]
+ pub include_context: bool,
+}
+
+fn default_include_context() -> bool {
+ true
+}
+
+/// Response from task reassignment.
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ReassignTaskResponse {
+ /// The new task that was created.
+ pub task: Task,
+ /// The new daemon ID.
+ pub daemon_id: Uuid,
+ /// The ID of the old task that was deleted.
+ pub old_task_id: Uuid,
+ /// Whether conversation context was included.
+ pub context_included: bool,
+ /// Number of context entries from previous conversation.
+ pub context_entries: usize,
+}
+
+/// Build a conversation context summary from task output entries.
+/// Returns a formatted string that can be prepended to the task plan.
+fn build_conversation_context(entries: &[TaskOutputEntry]) -> String {
+ if entries.is_empty() {
+ return String::new();
+ }
+
+ let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n");
+ context.push_str("The daemon running this task disconnected. Here is what happened so far:\n\n");
+
+ for entry in entries.iter() {
+ match entry.message_type.as_str() {
+ "assistant" => {
+ context.push_str("Assistant: ");
+ // Truncate long messages
+ let content = if entry.content.len() > 500 {
+ format!("{}... [truncated]", &entry.content[..500])
+ } else {
+ entry.content.clone()
+ };
+ context.push_str(&content);
+ context.push_str("\n\n");
+ }
+ "tool_use" => {
+ if let Some(ref tool_name) = entry.tool_name {
+ context.push_str(&format!("[Used tool: {}]\n", tool_name));
+ }
+ }
+ "tool_result" => {
+ // Summarize tool results briefly
+ if entry.content.len() > 200 {
+ context.push_str(&format!("[Tool result: {}... truncated]\n", &entry.content[..200]));
+ } else if !entry.content.is_empty() {
+ context.push_str(&format!("[Tool result: {}]\n", entry.content));
+ }
+ }
+ "user" => {
+ context.push_str("User: ");
+ context.push_str(&entry.content);
+ context.push_str("\n\n");
+ }
+ _ => {}
+ }
+ }
+
+ context.push_str("=== END PREVIOUS CONTEXT ===\n\n");
+ context.push_str("Please continue from where the conversation left off. Do not repeat work that was already done.\n\n");
+
+ context
+}
+
+/// Reassign a task to a new daemon after the original daemon disconnected.
+///
+/// This endpoint is used for daemon failover - when a daemon restarts or disconnects,
+/// the task can be reassigned to a new daemon with the conversation context preserved.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/reassign",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = ReassignTaskRequest,
+ responses(
+ (status = 200, description = "Task reassigned successfully", body = ReassignTaskResponse),
+ (status = 400, description = "Task cannot be reassigned (not in failed/interrupted state)", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "No daemon available", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn reassign_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ Json(body): Json<ReassignTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if task is in a state that can be reassigned
+ // Allow reassignment for: failed, interrupted, pending, or tasks whose daemon disconnected
+ // Helper closure to check if a daemon is connected by its UUID
+ let is_daemon_connected = |daemon_id: Uuid| {
+ state.daemon_connections.iter().any(|d| d.value().id == daemon_id)
+ };
+
+ let can_reassign = matches!(
+ task.status.as_str(),
+ "failed" | "interrupted" | "pending" | "starting"
+ ) || {
+ // Also allow if daemon is not connected
+ if let Some(daemon_id) = task.daemon_id {
+ !is_daemon_connected(daemon_id)
+ } else {
+ true
+ }
+ };
+
+ if !can_reassign && task.status == "running" {
+ // Running task - check if its daemon is still connected
+ if let Some(daemon_id) = task.daemon_id {
+ if is_daemon_connected(daemon_id) {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "TASK_RUNNING",
+ "Task is running on a connected daemon. Stop it first to reassign.",
+ )),
+ )
+ .into_response();
+ }
+ }
+ }
+
+ // Find a target daemon
+ let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id {
+ // Verify the requested daemon is connected and belongs to the owner
+ let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id);
+ if let Some(daemon) = daemon {
+ if daemon.owner_id != auth.owner_id {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")),
+ )
+ .into_response();
+ }
+ requested_daemon_id
+ } else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")),
+ )
+ .into_response();
+ }
+ } else {
+ // Find any available daemon for this owner
+ match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
+ Some(entry) => entry.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
+ )
+ .into_response();
+ }
+ }
+ };
+
+ // Build conversation context if requested
+ let (context_str, context_entries) = if body.include_context {
+ match repository::get_task_output(pool, id, Some(500)).await {
+ Ok(events) => {
+ let entries: Vec<TaskOutputEntry> = events
+ .into_iter()
+ .filter_map(TaskOutputEntry::from_task_event)
+ .collect();
+ let context = build_conversation_context(&entries);
+ let count = entries.len();
+ (context, count)
+ }
+ Err(e) => {
+ tracing::warn!("Failed to get task output for context: {}", e);
+ (String::new(), 0)
+ }
+ }
+ } else {
+ (String::new(), 0)
+ };
+
+ // Build updated plan with context prepended
+ let updated_plan = if !context_str.is_empty() {
+ format!("{}{}", context_str, task.plan)
+ } else {
+ task.plan.clone()
+ };
+
+ // Create a NEW task with the conversation context
+ let create_req = CreateTaskRequest {
+ contract_id: task.contract_id.unwrap_or(Uuid::nil()),
+ name: format!("{} (resumed)", task.name),
+ description: task.description.clone(),
+ plan: updated_plan.clone(),
+ parent_task_id: task.parent_task_id,
+ is_supervisor: task.is_supervisor,
+ priority: task.priority,
+ repository_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ merge_mode: task.merge_mode.clone(),
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: Some(id), // Continue from the old task's worktree if possible
+ copy_files: None,
+ checkpoint_sha: task.last_checkpoint_sha.clone(),
+ };
+
+ let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
+ Ok(t) => t,
+ Err(e) => {
+ tracing::error!("Failed to create new task for reassignment: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Update new task to starting and assign daemon
+ let start_update = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(target_daemon_id),
+ version: Some(new_task.version),
+ ..Default::default()
+ };
+
+ let final_task = match repository::update_task_for_owner(pool, new_task.id, auth.owner_id, start_update).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "New task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to update new task daemon assignment: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Send SpawnTask command to daemon for the new task
+ let command = DaemonCommand::SpawnTask {
+ task_id: new_task.id,
+ task_name: final_task.name.clone(),
+ plan: updated_plan,
+ repo_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ parent_task_id: task.parent_task_id,
+ depth: task.depth,
+ is_orchestrator: false, // New task starts fresh
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: Some(id), // Continue from old task's worktree
+ copy_files: None,
+ contract_id: task.contract_id,
+ is_supervisor: task.is_supervisor,
+ };
+
+ tracing::info!(
+ old_task_id = %id,
+ new_task_id = %new_task.id,
+ new_daemon_id = %target_daemon_id,
+ context_entries = context_entries,
+ "Reassigning task: creating new task and deleting old one"
+ );
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ tracing::error!("Failed to send SpawnTask command for reassignment: {}", e);
+ // Rollback: delete the new task we created
+ let _ = repository::delete_task_for_owner(pool, new_task.id, auth.owner_id).await;
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ // Delete the old task now that the new one is spawned
+ let old_task_id = id;
+ if let Err(e) = repository::delete_task_for_owner(pool, old_task_id, auth.owner_id).await {
+ tracing::warn!("Failed to delete old task {}: {}", old_task_id, e);
+ // Don't fail the request, the new task is already running
+ }
+
+ // Notify the contract's supervisor about the reassignment (if applicable)
+ if let Some(contract_id) = task.contract_id {
+ if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
+ if let Some(supervisor_task_id) = contract.supervisor_task_id {
+ // Don't notify if we're reassigning the supervisor itself
+ if supervisor_task_id != old_task_id {
+ // Find the supervisor's daemon and send a message
+ if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await {
+ if supervisor_task.status == "running" {
+ if let Some(supervisor_daemon_id) = supervisor_task.daemon_id {
+ // Find the daemon by its UUID
+ if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) {
+ let notification_msg = format!(
+ "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \
+ A new task '{}' (ID: {}) has been created to continue the work. \
+ The new task has {} context entries from the previous conversation.\n\n",
+ task.name,
+ old_task_id,
+ final_task.name,
+ new_task.id,
+ context_entries
+ );
+
+ let notify_cmd = DaemonCommand::SendMessage {
+ task_id: supervisor_task_id,
+ message: notification_msg,
+ };
+
+ if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await {
+ tracing::warn!(
+ supervisor_id = %supervisor_task_id,
+ error = %e,
+ "Failed to notify supervisor about task reassignment"
+ );
+ } else {
+ tracing::info!(
+ supervisor_id = %supervisor_task_id,
+ old_task_id = %old_task_id,
+ new_task_id = %new_task.id,
+ "Notified supervisor about task reassignment"
+ );
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // Broadcast task update for the new task
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: new_task.id,
+ owner_id: Some(auth.owner_id),
+ version: final_task.version,
+ status: "starting".to_string(),
+ updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
+ updated_by: "reassignment".to_string(),
+ });
+
+ Json(ReassignTaskResponse {
+ task: final_task,
+ daemon_id: target_daemon_id,
+ old_task_id,
+ context_included: !context_str.is_empty(),
+ context_entries,
+ })
+ .into_response()
+}
+
+// =============================================================================
+// Task Continue (Restart with Context)
+// =============================================================================
+
+/// Request to continue a task after daemon disconnect (restart in-place with context).
+#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContinueTaskRequest {
+ /// Target daemon ID to continue on. If not provided, will select any available daemon.
+ pub target_daemon_id: Option<Uuid>,
+}
+
+/// Response from continuing a task.
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContinueTaskResponse {
+ /// The continued task (same ID, updated plan with context).
+ pub task: Task,
+ /// The daemon ID running the task.
+ pub daemon_id: Uuid,
+ /// Number of context entries from previous conversation.
+ pub context_entries: usize,
+}
+
+/// Continue a task after daemon disconnect by restarting it with conversation context.
+///
+/// Unlike reassign, this keeps the same task ID and just restarts it with the
+/// previous conversation context prepended to the plan. Useful for supervisors.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/continue",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = ContinueTaskRequest,
+ responses(
+ (status = 200, description = "Task continued successfully", body = ContinueTaskResponse),
+ (status = 400, description = "Task cannot be continued", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "No daemon available", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn continue_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ Json(body): Json<ContinueTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Helper closure to check if a daemon is connected by its UUID
+ let is_daemon_connected = |daemon_id: Uuid| {
+ state.daemon_connections.iter().any(|d| d.value().id == daemon_id)
+ };
+
+ // Check if task can be continued (not currently running on a connected daemon)
+ let can_continue = matches!(
+ task.status.as_str(),
+ "failed" | "interrupted" | "pending" | "starting" | "completed"
+ ) || {
+ if let Some(daemon_id) = task.daemon_id {
+ !is_daemon_connected(daemon_id)
+ } else {
+ true
+ }
+ };
+
+ if !can_continue && task.status == "running" {
+ if let Some(daemon_id) = task.daemon_id {
+ if is_daemon_connected(daemon_id) {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "TASK_RUNNING",
+ "Task is running on a connected daemon. Stop it first to continue.",
+ )),
+ )
+ .into_response();
+ }
+ }
+ }
+
+ // Find a target daemon
+ let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id {
+ let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id);
+ if let Some(daemon) = daemon {
+ if daemon.owner_id != auth.owner_id {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")),
+ )
+ .into_response();
+ }
+ requested_daemon_id
+ } else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")),
+ )
+ .into_response();
+ }
+ } else {
+ match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
+ Some(entry) => entry.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
+ )
+ .into_response();
+ }
+ }
+ };
+
+ // Build conversation context from task output
+ let (context_str, context_entries) = match repository::get_task_output(pool, id, Some(500)).await {
+ Ok(events) => {
+ let entries: Vec<TaskOutputEntry> = events
+ .into_iter()
+ .filter_map(TaskOutputEntry::from_task_event)
+ .collect();
+ let context = build_conversation_context(&entries);
+ let count = entries.len();
+ (context, count)
+ }
+ Err(e) => {
+ tracing::warn!("Failed to get task output for context: {}", e);
+ (String::new(), 0)
+ }
+ };
+
+ // Build updated plan with context prepended
+ let updated_plan = if !context_str.is_empty() {
+ format!("{}{}", context_str, task.plan)
+ } else {
+ task.plan.clone()
+ };
+
+ // Update task in database: reset status, update plan with context, assign daemon
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ plan: Some(updated_plan.clone()),
+ daemon_id: Some(target_daemon_id),
+ error_message: None,
+ ..Default::default()
+ };
+
+ let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to update task for continuation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if this is an orchestrator
+ let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
+ Ok(subtasks) => subtasks.len(),
+ Err(_) => 0,
+ };
+ let is_orchestrator = task.depth == 0 && subtask_count > 0;
+
+ // Send SpawnTask command to daemon
+ let command = DaemonCommand::SpawnTask {
+ task_id: id,
+ task_name: task.name.clone(),
+ plan: updated_plan,
+ repo_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ parent_task_id: task.parent_task_id,
+ depth: task.depth,
+ is_orchestrator,
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: task.continue_from_task_id,
+ copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: task.contract_id,
+ is_supervisor: task.is_supervisor,
+ };
+
+ tracing::info!(
+ task_id = %id,
+ daemon_id = %target_daemon_id,
+ context_entries = context_entries,
+ is_supervisor = task.is_supervisor,
+ "Continuing task with conversation context"
+ );
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ tracing::error!("Failed to send SpawnTask command for continuation: {}", e);
+ // Rollback
+ let rollback_req = UpdateTaskRequest {
+ status: Some("failed".to_string()),
+ clear_daemon_id: true,
+ error_message: Some(format!("Continuation failed: {}", e)),
+ ..Default::default()
+ };
+ let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await;
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ // Broadcast task update
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: id,
+ owner_id: Some(auth.owner_id),
+ version: updated_task.version,
+ status: "starting".to_string(),
+ updated_fields: vec!["status".to_string(), "daemon_id".to_string(), "plan".to_string()],
+ updated_by: "continuation".to_string(),
+ });
+
+ Json(ContinueTaskResponse {
+ task: updated_task,
+ daemon_id: target_daemon_id,
+ context_entries,
+ })
+ .into_response()
+}