diff options
Diffstat (limited to 'makima/src/server/handlers/mesh.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 687 |
1 files changed, 686 insertions, 1 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 2d90a04..3da6fd5 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -52,7 +52,7 @@ pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource { if let Some(task_id) = state.validate_tool_key(key_str) { return AuthSource::ToolKey(task_id); } - tracing::warn!("Invalid tool key provided"); + tracing::warn!("Invalid tool key provided: {}", key_str); } } @@ -1774,3 +1774,688 @@ pub async fn check_target_exists( })) .into_response() } + +// ============================================================================= +// Task Reassignment (Daemon Failover) +// ============================================================================= + +/// Request to reassign a task to a new daemon after daemon disconnect. +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ReassignTaskRequest { + /// Target daemon ID to reassign to. If not provided, will select any available daemon. + pub target_daemon_id: Option<Uuid>, + /// Whether to include conversation context from previous run. + #[serde(default = "default_include_context")] + pub include_context: bool, +} + +fn default_include_context() -> bool { + true +} + +/// Response from task reassignment. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ReassignTaskResponse { + /// The new task that was created. + pub task: Task, + /// The new daemon ID. + pub daemon_id: Uuid, + /// The ID of the old task that was deleted. + pub old_task_id: Uuid, + /// Whether conversation context was included. + pub context_included: bool, + /// Number of context entries from previous conversation. + pub context_entries: usize, +} + +/// Build a conversation context summary from task output entries. +/// Returns a formatted string that can be prepended to the task plan. +fn build_conversation_context(entries: &[TaskOutputEntry]) -> String { + if entries.is_empty() { + return String::new(); + } + + let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n"); + context.push_str("The daemon running this task disconnected. Here is what happened so far:\n\n"); + + for entry in entries.iter() { + match entry.message_type.as_str() { + "assistant" => { + context.push_str("Assistant: "); + // Truncate long messages + let content = if entry.content.len() > 500 { + format!("{}... [truncated]", &entry.content[..500]) + } else { + entry.content.clone() + }; + context.push_str(&content); + context.push_str("\n\n"); + } + "tool_use" => { + if let Some(ref tool_name) = entry.tool_name { + context.push_str(&format!("[Used tool: {}]\n", tool_name)); + } + } + "tool_result" => { + // Summarize tool results briefly + if entry.content.len() > 200 { + context.push_str(&format!("[Tool result: {}... truncated]\n", &entry.content[..200])); + } else if !entry.content.is_empty() { + context.push_str(&format!("[Tool result: {}]\n", entry.content)); + } + } + "user" => { + context.push_str("User: "); + context.push_str(&entry.content); + context.push_str("\n\n"); + } + _ => {} + } + } + + context.push_str("=== END PREVIOUS CONTEXT ===\n\n"); + context.push_str("Please continue from where the conversation left off. Do not repeat work that was already done.\n\n"); + + context +} + +/// Reassign a task to a new daemon after the original daemon disconnected. +/// +/// This endpoint is used for daemon failover - when a daemon restarts or disconnects, +/// the task can be reassigned to a new daemon with the conversation context preserved. +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/reassign", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = ReassignTaskRequest, + responses( + (status = 200, description = "Task reassigned successfully", body = ReassignTaskResponse), + (status = 400, description = "Task cannot be reassigned (not in failed/interrupted state)", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "No daemon available", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn reassign_task( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + Json(body): Json<ReassignTaskRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if task is in a state that can be reassigned + // Allow reassignment for: failed, interrupted, pending, or tasks whose daemon disconnected + // Helper closure to check if a daemon is connected by its UUID + let is_daemon_connected = |daemon_id: Uuid| { + state.daemon_connections.iter().any(|d| d.value().id == daemon_id) + }; + + let can_reassign = matches!( + task.status.as_str(), + "failed" | "interrupted" | "pending" | "starting" + ) || { + // Also allow if daemon is not connected + if let Some(daemon_id) = task.daemon_id { + !is_daemon_connected(daemon_id) + } else { + true + } + }; + + if !can_reassign && task.status == "running" { + // Running task - check if its daemon is still connected + if let Some(daemon_id) = task.daemon_id { + if is_daemon_connected(daemon_id) { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "TASK_RUNNING", + "Task is running on a connected daemon. Stop it first to reassign.", + )), + ) + .into_response(); + } + } + } + + // Find a target daemon + let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id { + // Verify the requested daemon is connected and belongs to the owner + let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id); + if let Some(daemon) = daemon { + if daemon.owner_id != auth.owner_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")), + ) + .into_response(); + } + requested_daemon_id + } else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")), + ) + .into_response(); + } + } else { + // Find any available daemon for this owner + match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) { + Some(entry) => entry.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")), + ) + .into_response(); + } + } + }; + + // Build conversation context if requested + let (context_str, context_entries) = if body.include_context { + match repository::get_task_output(pool, id, Some(500)).await { + Ok(events) => { + let entries: Vec<TaskOutputEntry> = events + .into_iter() + .filter_map(TaskOutputEntry::from_task_event) + .collect(); + let context = build_conversation_context(&entries); + let count = entries.len(); + (context, count) + } + Err(e) => { + tracing::warn!("Failed to get task output for context: {}", e); + (String::new(), 0) + } + } + } else { + (String::new(), 0) + }; + + // Build updated plan with context prepended + let updated_plan = if !context_str.is_empty() { + format!("{}{}", context_str, task.plan) + } else { + task.plan.clone() + }; + + // Create a NEW task with the conversation context + let create_req = CreateTaskRequest { + contract_id: task.contract_id.unwrap_or(Uuid::nil()), + name: format!("{} (resumed)", task.name), + description: task.description.clone(), + plan: updated_plan.clone(), + parent_task_id: task.parent_task_id, + is_supervisor: task.is_supervisor, + priority: task.priority, + repository_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + merge_mode: task.merge_mode.clone(), + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: Some(id), // Continue from the old task's worktree if possible + copy_files: None, + checkpoint_sha: task.last_checkpoint_sha.clone(), + }; + + let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { + Ok(t) => t, + Err(e) => { + tracing::error!("Failed to create new task for reassignment: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Update new task to starting and assign daemon + let start_update = UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(target_daemon_id), + version: Some(new_task.version), + ..Default::default() + }; + + let final_task = match repository::update_task_for_owner(pool, new_task.id, auth.owner_id, start_update).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "New task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to update new task daemon assignment: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Send SpawnTask command to daemon for the new task + let command = DaemonCommand::SpawnTask { + task_id: new_task.id, + task_name: final_task.name.clone(), + plan: updated_plan, + repo_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + parent_task_id: task.parent_task_id, + depth: task.depth, + is_orchestrator: false, // New task starts fresh + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: Some(id), // Continue from old task's worktree + copy_files: None, + contract_id: task.contract_id, + is_supervisor: task.is_supervisor, + }; + + tracing::info!( + old_task_id = %id, + new_task_id = %new_task.id, + new_daemon_id = %target_daemon_id, + context_entries = context_entries, + "Reassigning task: creating new task and deleting old one" + ); + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + tracing::error!("Failed to send SpawnTask command for reassignment: {}", e); + // Rollback: delete the new task we created + let _ = repository::delete_task_for_owner(pool, new_task.id, auth.owner_id).await; + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + // Delete the old task now that the new one is spawned + let old_task_id = id; + if let Err(e) = repository::delete_task_for_owner(pool, old_task_id, auth.owner_id).await { + tracing::warn!("Failed to delete old task {}: {}", old_task_id, e); + // Don't fail the request, the new task is already running + } + + // Notify the contract's supervisor about the reassignment (if applicable) + if let Some(contract_id) = task.contract_id { + if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { + if let Some(supervisor_task_id) = contract.supervisor_task_id { + // Don't notify if we're reassigning the supervisor itself + if supervisor_task_id != old_task_id { + // Find the supervisor's daemon and send a message + if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await { + if supervisor_task.status == "running" { + if let Some(supervisor_daemon_id) = supervisor_task.daemon_id { + // Find the daemon by its UUID + if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) { + let notification_msg = format!( + "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \ + A new task '{}' (ID: {}) has been created to continue the work. \ + The new task has {} context entries from the previous conversation.\n\n", + task.name, + old_task_id, + final_task.name, + new_task.id, + context_entries + ); + + let notify_cmd = DaemonCommand::SendMessage { + task_id: supervisor_task_id, + message: notification_msg, + }; + + if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await { + tracing::warn!( + supervisor_id = %supervisor_task_id, + error = %e, + "Failed to notify supervisor about task reassignment" + ); + } else { + tracing::info!( + supervisor_id = %supervisor_task_id, + old_task_id = %old_task_id, + new_task_id = %new_task.id, + "Notified supervisor about task reassignment" + ); + } + } + } + } + } + } + } + } + } + + // Broadcast task update for the new task + state.broadcast_task_update(TaskUpdateNotification { + task_id: new_task.id, + owner_id: Some(auth.owner_id), + version: final_task.version, + status: "starting".to_string(), + updated_fields: vec!["status".to_string(), "daemon_id".to_string()], + updated_by: "reassignment".to_string(), + }); + + Json(ReassignTaskResponse { + task: final_task, + daemon_id: target_daemon_id, + old_task_id, + context_included: !context_str.is_empty(), + context_entries, + }) + .into_response() +} + +// ============================================================================= +// Task Continue (Restart with Context) +// ============================================================================= + +/// Request to continue a task after daemon disconnect (restart in-place with context). +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContinueTaskRequest { + /// Target daemon ID to continue on. If not provided, will select any available daemon. + pub target_daemon_id: Option<Uuid>, +} + +/// Response from continuing a task. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContinueTaskResponse { + /// The continued task (same ID, updated plan with context). + pub task: Task, + /// The daemon ID running the task. + pub daemon_id: Uuid, + /// Number of context entries from previous conversation. + pub context_entries: usize, +} + +/// Continue a task after daemon disconnect by restarting it with conversation context. +/// +/// Unlike reassign, this keeps the same task ID and just restarts it with the +/// previous conversation context prepended to the plan. Useful for supervisors. +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/continue", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = ContinueTaskRequest, + responses( + (status = 200, description = "Task continued successfully", body = ContinueTaskResponse), + (status = 400, description = "Task cannot be continued", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "No daemon available", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn continue_task( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + Json(body): Json<ContinueTaskRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Helper closure to check if a daemon is connected by its UUID + let is_daemon_connected = |daemon_id: Uuid| { + state.daemon_connections.iter().any(|d| d.value().id == daemon_id) + }; + + // Check if task can be continued (not currently running on a connected daemon) + let can_continue = matches!( + task.status.as_str(), + "failed" | "interrupted" | "pending" | "starting" | "completed" + ) || { + if let Some(daemon_id) = task.daemon_id { + !is_daemon_connected(daemon_id) + } else { + true + } + }; + + if !can_continue && task.status == "running" { + if let Some(daemon_id) = task.daemon_id { + if is_daemon_connected(daemon_id) { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "TASK_RUNNING", + "Task is running on a connected daemon. Stop it first to continue.", + )), + ) + .into_response(); + } + } + } + + // Find a target daemon + let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id { + let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id); + if let Some(daemon) = daemon { + if daemon.owner_id != auth.owner_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")), + ) + .into_response(); + } + requested_daemon_id + } else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")), + ) + .into_response(); + } + } else { + match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) { + Some(entry) => entry.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")), + ) + .into_response(); + } + } + }; + + // Build conversation context from task output + let (context_str, context_entries) = match repository::get_task_output(pool, id, Some(500)).await { + Ok(events) => { + let entries: Vec<TaskOutputEntry> = events + .into_iter() + .filter_map(TaskOutputEntry::from_task_event) + .collect(); + let context = build_conversation_context(&entries); + let count = entries.len(); + (context, count) + } + Err(e) => { + tracing::warn!("Failed to get task output for context: {}", e); + (String::new(), 0) + } + }; + + // Build updated plan with context prepended + let updated_plan = if !context_str.is_empty() { + format!("{}{}", context_str, task.plan) + } else { + task.plan.clone() + }; + + // Update task in database: reset status, update plan with context, assign daemon + let update_req = UpdateTaskRequest { + status: Some("starting".to_string()), + plan: Some(updated_plan.clone()), + daemon_id: Some(target_daemon_id), + error_message: None, + ..Default::default() + }; + + let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to update task for continuation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if this is an orchestrator + let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await { + Ok(subtasks) => subtasks.len(), + Err(_) => 0, + }; + let is_orchestrator = task.depth == 0 && subtask_count > 0; + + // Send SpawnTask command to daemon + let command = DaemonCommand::SpawnTask { + task_id: id, + task_name: task.name.clone(), + plan: updated_plan, + repo_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + parent_task_id: task.parent_task_id, + depth: task.depth, + is_orchestrator, + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: task.continue_from_task_id, + copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: task.contract_id, + is_supervisor: task.is_supervisor, + }; + + tracing::info!( + task_id = %id, + daemon_id = %target_daemon_id, + context_entries = context_entries, + is_supervisor = task.is_supervisor, + "Continuing task with conversation context" + ); + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + tracing::error!("Failed to send SpawnTask command for continuation: {}", e); + // Rollback + let rollback_req = UpdateTaskRequest { + status: Some("failed".to_string()), + clear_daemon_id: true, + error_message: Some(format!("Continuation failed: {}", e)), + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await; + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + // Broadcast task update + state.broadcast_task_update(TaskUpdateNotification { + task_id: id, + owner_id: Some(auth.owner_id), + version: updated_task.version, + status: "starting".to_string(), + updated_fields: vec!["status".to_string(), "daemon_id".to_string(), "plan".to_string()], + updated_by: "continuation".to_string(), + }); + + Json(ContinueTaskResponse { + task: updated_task, + daemon_id: target_daemon_id, + context_entries, + }) + .into_response() +} |
