diff options
| author | soryu <soryu@soryu.co> | 2026-01-15 03:26:28 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-15 03:26:28 +0000 |
| commit | eeafe072bc6bb81459f7d087b48fc921afe9cc11 (patch) | |
| tree | 7f835993edd732f8ff66d756391dedffe3d44e90 /makima/src/server/handlers | |
| parent | c61a2b9b9c988f5460f85980d4ddf285f1a730b5 (diff) | |
| download | soryu-eeafe072bc6bb81459f7d087b48fc921afe9cc11.tar.gz soryu-eeafe072bc6bb81459f7d087b48fc921afe9cc11.zip | |
Automatically derive repo URL and add notifications for input
Diffstat (limited to 'makima/src/server/handlers')
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 687 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 42 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 321 |
3 files changed, 1035 insertions, 15 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 2d90a04..3da6fd5 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -52,7 +52,7 @@ pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource { if let Some(task_id) = state.validate_tool_key(key_str) { return AuthSource::ToolKey(task_id); } - tracing::warn!("Invalid tool key provided"); + tracing::warn!("Invalid tool key provided: {}", key_str); } } @@ -1774,3 +1774,688 @@ pub async fn check_target_exists( })) .into_response() } + +// ============================================================================= +// Task Reassignment (Daemon Failover) +// ============================================================================= + +/// Request to reassign a task to a new daemon after daemon disconnect. +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ReassignTaskRequest { + /// Target daemon ID to reassign to. If not provided, will select any available daemon. + pub target_daemon_id: Option<Uuid>, + /// Whether to include conversation context from previous run. + #[serde(default = "default_include_context")] + pub include_context: bool, +} + +fn default_include_context() -> bool { + true +} + +/// Response from task reassignment. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ReassignTaskResponse { + /// The new task that was created. + pub task: Task, + /// The new daemon ID. + pub daemon_id: Uuid, + /// The ID of the old task that was deleted. + pub old_task_id: Uuid, + /// Whether conversation context was included. + pub context_included: bool, + /// Number of context entries from previous conversation. + pub context_entries: usize, +} + +/// Build a conversation context summary from task output entries. +/// Returns a formatted string that can be prepended to the task plan. +fn build_conversation_context(entries: &[TaskOutputEntry]) -> String { + if entries.is_empty() { + return String::new(); + } + + let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n"); + context.push_str("The daemon running this task disconnected. Here is what happened so far:\n\n"); + + for entry in entries.iter() { + match entry.message_type.as_str() { + "assistant" => { + context.push_str("Assistant: "); + // Truncate long messages + let content = if entry.content.len() > 500 { + format!("{}... [truncated]", &entry.content[..500]) + } else { + entry.content.clone() + }; + context.push_str(&content); + context.push_str("\n\n"); + } + "tool_use" => { + if let Some(ref tool_name) = entry.tool_name { + context.push_str(&format!("[Used tool: {}]\n", tool_name)); + } + } + "tool_result" => { + // Summarize tool results briefly + if entry.content.len() > 200 { + context.push_str(&format!("[Tool result: {}... truncated]\n", &entry.content[..200])); + } else if !entry.content.is_empty() { + context.push_str(&format!("[Tool result: {}]\n", entry.content)); + } + } + "user" => { + context.push_str("User: "); + context.push_str(&entry.content); + context.push_str("\n\n"); + } + _ => {} + } + } + + context.push_str("=== END PREVIOUS CONTEXT ===\n\n"); + context.push_str("Please continue from where the conversation left off. Do not repeat work that was already done.\n\n"); + + context +} + +/// Reassign a task to a new daemon after the original daemon disconnected. +/// +/// This endpoint is used for daemon failover - when a daemon restarts or disconnects, +/// the task can be reassigned to a new daemon with the conversation context preserved. +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/reassign", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = ReassignTaskRequest, + responses( + (status = 200, description = "Task reassigned successfully", body = ReassignTaskResponse), + (status = 400, description = "Task cannot be reassigned (not in failed/interrupted state)", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "No daemon available", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn reassign_task( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + Json(body): Json<ReassignTaskRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if task is in a state that can be reassigned + // Allow reassignment for: failed, interrupted, pending, or tasks whose daemon disconnected + // Helper closure to check if a daemon is connected by its UUID + let is_daemon_connected = |daemon_id: Uuid| { + state.daemon_connections.iter().any(|d| d.value().id == daemon_id) + }; + + let can_reassign = matches!( + task.status.as_str(), + "failed" | "interrupted" | "pending" | "starting" + ) || { + // Also allow if daemon is not connected + if let Some(daemon_id) = task.daemon_id { + !is_daemon_connected(daemon_id) + } else { + true + } + }; + + if !can_reassign && task.status == "running" { + // Running task - check if its daemon is still connected + if let Some(daemon_id) = task.daemon_id { + if is_daemon_connected(daemon_id) { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "TASK_RUNNING", + "Task is running on a connected daemon. Stop it first to reassign.", + )), + ) + .into_response(); + } + } + } + + // Find a target daemon + let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id { + // Verify the requested daemon is connected and belongs to the owner + let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id); + if let Some(daemon) = daemon { + if daemon.owner_id != auth.owner_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")), + ) + .into_response(); + } + requested_daemon_id + } else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")), + ) + .into_response(); + } + } else { + // Find any available daemon for this owner + match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) { + Some(entry) => entry.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")), + ) + .into_response(); + } + } + }; + + // Build conversation context if requested + let (context_str, context_entries) = if body.include_context { + match repository::get_task_output(pool, id, Some(500)).await { + Ok(events) => { + let entries: Vec<TaskOutputEntry> = events + .into_iter() + .filter_map(TaskOutputEntry::from_task_event) + .collect(); + let context = build_conversation_context(&entries); + let count = entries.len(); + (context, count) + } + Err(e) => { + tracing::warn!("Failed to get task output for context: {}", e); + (String::new(), 0) + } + } + } else { + (String::new(), 0) + }; + + // Build updated plan with context prepended + let updated_plan = if !context_str.is_empty() { + format!("{}{}", context_str, task.plan) + } else { + task.plan.clone() + }; + + // Create a NEW task with the conversation context + let create_req = CreateTaskRequest { + contract_id: task.contract_id.unwrap_or(Uuid::nil()), + name: format!("{} (resumed)", task.name), + description: task.description.clone(), + plan: updated_plan.clone(), + parent_task_id: task.parent_task_id, + is_supervisor: task.is_supervisor, + priority: task.priority, + repository_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + merge_mode: task.merge_mode.clone(), + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: Some(id), // Continue from the old task's worktree if possible + copy_files: None, + checkpoint_sha: task.last_checkpoint_sha.clone(), + }; + + let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await { + Ok(t) => t, + Err(e) => { + tracing::error!("Failed to create new task for reassignment: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Update new task to starting and assign daemon + let start_update = UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(target_daemon_id), + version: Some(new_task.version), + ..Default::default() + }; + + let final_task = match repository::update_task_for_owner(pool, new_task.id, auth.owner_id, start_update).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "New task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to update new task daemon assignment: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Send SpawnTask command to daemon for the new task + let command = DaemonCommand::SpawnTask { + task_id: new_task.id, + task_name: final_task.name.clone(), + plan: updated_plan, + repo_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + parent_task_id: task.parent_task_id, + depth: task.depth, + is_orchestrator: false, // New task starts fresh + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: Some(id), // Continue from old task's worktree + copy_files: None, + contract_id: task.contract_id, + is_supervisor: task.is_supervisor, + }; + + tracing::info!( + old_task_id = %id, + new_task_id = %new_task.id, + new_daemon_id = %target_daemon_id, + context_entries = context_entries, + "Reassigning task: creating new task and deleting old one" + ); + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + tracing::error!("Failed to send SpawnTask command for reassignment: {}", e); + // Rollback: delete the new task we created + let _ = repository::delete_task_for_owner(pool, new_task.id, auth.owner_id).await; + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + // Delete the old task now that the new one is spawned + let old_task_id = id; + if let Err(e) = repository::delete_task_for_owner(pool, old_task_id, auth.owner_id).await { + tracing::warn!("Failed to delete old task {}: {}", old_task_id, e); + // Don't fail the request, the new task is already running + } + + // Notify the contract's supervisor about the reassignment (if applicable) + if let Some(contract_id) = task.contract_id { + if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { + if let Some(supervisor_task_id) = contract.supervisor_task_id { + // Don't notify if we're reassigning the supervisor itself + if supervisor_task_id != old_task_id { + // Find the supervisor's daemon and send a message + if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await { + if supervisor_task.status == "running" { + if let Some(supervisor_daemon_id) = supervisor_task.daemon_id { + // Find the daemon by its UUID + if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) { + let notification_msg = format!( + "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \ + A new task '{}' (ID: {}) has been created to continue the work. \ + The new task has {} context entries from the previous conversation.\n\n", + task.name, + old_task_id, + final_task.name, + new_task.id, + context_entries + ); + + let notify_cmd = DaemonCommand::SendMessage { + task_id: supervisor_task_id, + message: notification_msg, + }; + + if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await { + tracing::warn!( + supervisor_id = %supervisor_task_id, + error = %e, + "Failed to notify supervisor about task reassignment" + ); + } else { + tracing::info!( + supervisor_id = %supervisor_task_id, + old_task_id = %old_task_id, + new_task_id = %new_task.id, + "Notified supervisor about task reassignment" + ); + } + } + } + } + } + } + } + } + } + + // Broadcast task update for the new task + state.broadcast_task_update(TaskUpdateNotification { + task_id: new_task.id, + owner_id: Some(auth.owner_id), + version: final_task.version, + status: "starting".to_string(), + updated_fields: vec!["status".to_string(), "daemon_id".to_string()], + updated_by: "reassignment".to_string(), + }); + + Json(ReassignTaskResponse { + task: final_task, + daemon_id: target_daemon_id, + old_task_id, + context_included: !context_str.is_empty(), + context_entries, + }) + .into_response() +} + +// ============================================================================= +// Task Continue (Restart with Context) +// ============================================================================= + +/// Request to continue a task after daemon disconnect (restart in-place with context). +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContinueTaskRequest { + /// Target daemon ID to continue on. If not provided, will select any available daemon. + pub target_daemon_id: Option<Uuid>, +} + +/// Response from continuing a task. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContinueTaskResponse { + /// The continued task (same ID, updated plan with context). + pub task: Task, + /// The daemon ID running the task. + pub daemon_id: Uuid, + /// Number of context entries from previous conversation. + pub context_entries: usize, +} + +/// Continue a task after daemon disconnect by restarting it with conversation context. +/// +/// Unlike reassign, this keeps the same task ID and just restarts it with the +/// previous conversation context prepended to the plan. Useful for supervisors. +#[utoipa::path( + post, + path = "/api/v1/mesh/tasks/{id}/continue", + params( + ("id" = Uuid, Path, description = "Task ID") + ), + request_body = ContinueTaskRequest, + responses( + (status = 200, description = "Task continued successfully", body = ContinueTaskResponse), + (status = 400, description = "Task cannot be continued", body = ApiError), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "No daemon available", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn continue_task( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + Json(body): Json<ContinueTaskRequest>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Helper closure to check if a daemon is connected by its UUID + let is_daemon_connected = |daemon_id: Uuid| { + state.daemon_connections.iter().any(|d| d.value().id == daemon_id) + }; + + // Check if task can be continued (not currently running on a connected daemon) + let can_continue = matches!( + task.status.as_str(), + "failed" | "interrupted" | "pending" | "starting" | "completed" + ) || { + if let Some(daemon_id) = task.daemon_id { + !is_daemon_connected(daemon_id) + } else { + true + } + }; + + if !can_continue && task.status == "running" { + if let Some(daemon_id) = task.daemon_id { + if is_daemon_connected(daemon_id) { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "TASK_RUNNING", + "Task is running on a connected daemon. Stop it first to continue.", + )), + ) + .into_response(); + } + } + } + + // Find a target daemon + let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id { + let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id); + if let Some(daemon) = daemon { + if daemon.owner_id != auth.owner_id { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")), + ) + .into_response(); + } + requested_daemon_id + } else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")), + ) + .into_response(); + } + } else { + match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) { + Some(entry) => entry.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")), + ) + .into_response(); + } + } + }; + + // Build conversation context from task output + let (context_str, context_entries) = match repository::get_task_output(pool, id, Some(500)).await { + Ok(events) => { + let entries: Vec<TaskOutputEntry> = events + .into_iter() + .filter_map(TaskOutputEntry::from_task_event) + .collect(); + let context = build_conversation_context(&entries); + let count = entries.len(); + (context, count) + } + Err(e) => { + tracing::warn!("Failed to get task output for context: {}", e); + (String::new(), 0) + } + }; + + // Build updated plan with context prepended + let updated_plan = if !context_str.is_empty() { + format!("{}{}", context_str, task.plan) + } else { + task.plan.clone() + }; + + // Update task in database: reset status, update plan with context, assign daemon + let update_req = UpdateTaskRequest { + status: Some("starting".to_string()), + plan: Some(updated_plan.clone()), + daemon_id: Some(target_daemon_id), + error_message: None, + ..Default::default() + }; + + let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to update task for continuation: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Check if this is an orchestrator + let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await { + Ok(subtasks) => subtasks.len(), + Err(_) => 0, + }; + let is_orchestrator = task.depth == 0 && subtask_count > 0; + + // Send SpawnTask command to daemon + let command = DaemonCommand::SpawnTask { + task_id: id, + task_name: task.name.clone(), + plan: updated_plan, + repo_url: task.repository_url.clone(), + base_branch: task.base_branch.clone(), + target_branch: task.target_branch.clone(), + parent_task_id: task.parent_task_id, + depth: task.depth, + is_orchestrator, + target_repo_path: task.target_repo_path.clone(), + completion_action: task.completion_action.clone(), + continue_from_task_id: task.continue_from_task_id, + copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: task.contract_id, + is_supervisor: task.is_supervisor, + }; + + tracing::info!( + task_id = %id, + daemon_id = %target_daemon_id, + context_entries = context_entries, + is_supervisor = task.is_supervisor, + "Continuing task with conversation context" + ); + + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { + tracing::error!("Failed to send SpawnTask command for continuation: {}", e); + // Rollback + let rollback_req = UpdateTaskRequest { + status: Some("failed".to_string()), + clear_daemon_id: true, + error_message: Some(format!("Continuation failed: {}", e)), + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await; + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + // Broadcast task update + state.broadcast_task_update(TaskUpdateNotification { + task_id: id, + owner_id: Some(auth.owner_id), + version: updated_task.version, + status: "starting".to_string(), + updated_fields: vec!["status".to_string(), "daemon_id".to_string(), "plan".to_string()], + updated_by: "continuation".to_string(), + }); + + Json(ContinueTaskResponse { + task: updated_task, + daemon_id: target_daemon_id, + context_entries, + }) + .into_response() +} diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 178e5e1..39b12da 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -369,6 +369,18 @@ pub enum DaemonMessage { /// Error message if operation failed error: Option<String>, }, + /// Notification that a branch was created + BranchCreated { + #[serde(rename = "taskId")] + task_id: Option<Uuid>, + /// Name of the branch that was created + #[serde(rename = "branchName")] + branch_name: String, + /// Whether the operation succeeded + success: bool, + /// Error message if operation failed + error: Option<String>, + }, } /// Validated daemon authentication result. @@ -1073,6 +1085,36 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re ); } } + Ok(DaemonMessage::BranchCreated { task_id, branch_name, success, error }) => { + tracing::info!( + task_id = ?task_id, + branch_name = %branch_name, + success = success, + error = ?error, + "Branch created notification received" + ); + + // Broadcast as task output if we have a task_id + if let Some(tid) = task_id { + let output_text = if success { + format!("✓ Branch '{}' created successfully", branch_name) + } else { + format!("✗ Failed to create branch '{}': {}", branch_name, error.unwrap_or_default()) + }; + state.broadcast_task_output(TaskOutputNotification { + task_id: tid, + owner_id: Some(owner_id), + message_type: "system".to_string(), + content: output_text, + tool_name: None, + tool_input: None, + is_error: Some(!success), + cost_usd: None, + duration_ms: None, + is_partial: false, + }); + } + } Err(e) => { tracing::warn!("Failed to parse daemon message: {}", e); } diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index ac59130..d0fa4d1 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -15,6 +15,7 @@ use uuid::Uuid; use crate::db::models::{CreateTaskRequest, Task, TaskSummary}; use crate::db::repository; +use crate::server::auth::Authenticated; use crate::server::handlers::mesh::{extract_auth, AuthSource}; use crate::server::messages::ApiError; use crate::server::state::{DaemonCommand, SharedState}; @@ -32,7 +33,7 @@ pub struct SpawnTaskRequest { pub contract_id: Uuid, pub parent_task_id: Option<Uuid>, pub checkpoint_sha: Option<String>, - /// Repository URL for the task (supervisor should provide this) + /// Repository URL for the task (optional - if not provided, will be looked up from contract). pub repository_url: Option<String>, } @@ -55,6 +56,67 @@ pub struct ReadWorktreeFileRequest { pub file_path: String, } +/// Request to ask a question and wait for user feedback. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct AskQuestionRequest { + /// The question to ask the user + pub question: String, + /// Optional choices (if empty, free-form text response) + #[serde(default)] + pub choices: Vec<String>, + /// Optional context about what this relates to + pub context: Option<String>, + /// How long to wait for a response (seconds) + #[serde(default = "default_question_timeout")] + pub timeout_seconds: i32, +} + +fn default_question_timeout() -> i32 { + 3600 // 1 hour default +} + +/// Response from asking a question. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct AskQuestionResponse { + /// The question ID for tracking + pub question_id: Uuid, + /// The user's response (None if timed out) + pub response: Option<String>, + /// Whether the question timed out + pub timed_out: bool, +} + +/// Request to answer a supervisor question. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct AnswerQuestionRequest { + /// The user's response + pub response: String, +} + +/// Response to answering a question. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct AnswerQuestionResponse { + /// Whether the answer was accepted + pub success: bool, +} + +/// Pending question summary. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct PendingQuestionSummary { + pub question_id: Uuid, + pub task_id: Uuid, + pub contract_id: Uuid, + pub question: String, + pub choices: Vec<String>, + pub context: Option<String>, + pub created_at: chrono::DateTime<chrono::Utc>, +} + /// Request to create a checkpoint. #[derive(Debug, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] @@ -321,23 +383,49 @@ pub async fn spawn_task( } }; - // Get repository URL from the contract's primary repository - let repo_url = match repository::list_contract_repositories(pool, request.contract_id).await { - Ok(repos) => { - // Prefer primary repo, fallback to first repo - repos.iter() - .find(|r| r.is_primary) - .or(repos.first()) - .and_then(|r| r.repository_url.clone()) - } - Err(e) => { - tracing::warn!(error = %e, "Failed to get contract repositories, continuing without repo URL"); + // Get repository URL - either from request or from contract's repositories + let repo_url = if let Some(url) = request.repository_url.clone() { + if !url.trim().is_empty() { + Some(url) + } else { None } + } else { + None }; - // Supervisor can override with explicit repository_url - let repo_url = request.repository_url.clone().or(repo_url); + // If no repo URL provided, look it up from the contract + let repo_url = match repo_url { + Some(url) => Some(url), + None => { + match repository::list_contract_repositories(pool, request.contract_id).await { + Ok(repos) => { + // Prefer primary repo, fallback to first repo + let repo = repos.iter() + .find(|r| r.is_primary) + .or(repos.first()); + + // Use repository_url if set, otherwise use local_path + repo.and_then(|r| { + r.repository_url.clone() + .or_else(|| r.local_path.clone()) + }) + } + Err(e) => { + tracing::warn!(error = %e, "Failed to get contract repositories"); + None + } + } + } + }; + + // Validate that we have a repo URL + if repo_url.is_none() { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("MISSING_REPO_URL", "No repository URL found. Either provide one or ensure the contract has repositories configured.")), + ).into_response(); + } // Create task request let create_req = CreateTaskRequest { @@ -1151,3 +1239,208 @@ pub async fn get_task_diff( }), ).into_response() } + +// ============================================================================= +// Supervisor Question Handlers +// ============================================================================= + +/// Ask a question and wait for user feedback. +/// +/// The supervisor calls this to ask a question. The endpoint will poll until +/// either the user responds or the timeout is reached. +#[utoipa::path( + post, + path = "/api/v1/mesh/supervisor/questions", + request_body = AskQuestionRequest, + responses( + (status = 200, description = "Question answered", body = AskQuestionResponse), + (status = 408, description = "Question timed out", body = AskQuestionResponse), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - not a supervisor"), + (status = 500, description = "Internal server error"), + ), + security( + ("tool_key" = []) + ), + tag = "Mesh Supervisor" +)] +pub async fn ask_question( + State(state): State<SharedState>, + headers: HeaderMap, + Json(request): Json<AskQuestionRequest>, +) -> impl IntoResponse { + let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await { + Ok(ids) => ids, + Err(e) => return e.into_response(), + }; + + let pool = state.db_pool.as_ref().unwrap(); + + // Get the supervisor task to find its contract + let supervisor = match repository::get_task_for_owner(pool, supervisor_id, owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Supervisor task not found")), + ).into_response(); + } + Err(e) => { + tracing::error!(error = %e, "Failed to get supervisor task"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")), + ).into_response(); + } + }; + + let Some(contract_id) = supervisor.contract_id else { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("NO_CONTRACT", "Supervisor has no associated contract")), + ).into_response(); + }; + + // Add the question + let question_id = state.add_supervisor_question( + supervisor_id, + contract_id, + owner_id, + request.question.clone(), + request.choices.clone(), + request.context.clone(), + ); + + // Poll for response with timeout + let timeout_duration = std::time::Duration::from_secs(request.timeout_seconds.max(1) as u64); + let start = std::time::Instant::now(); + let poll_interval = std::time::Duration::from_millis(500); + + loop { + // Check if response has been submitted + if let Some(response) = state.get_question_response(question_id) { + // Clean up the response + state.cleanup_question_response(question_id); + + return ( + StatusCode::OK, + Json(AskQuestionResponse { + question_id, + response: Some(response.response), + timed_out: false, + }), + ).into_response(); + } + + // Check timeout + if start.elapsed() >= timeout_duration { + // Remove the pending question on timeout + state.remove_pending_question(question_id); + + return ( + StatusCode::REQUEST_TIMEOUT, + Json(AskQuestionResponse { + question_id, + response: None, + timed_out: true, + }), + ).into_response(); + } + + // Wait before polling again + tokio::time::sleep(poll_interval).await; + } +} + +/// Get all pending questions for the current user. +#[utoipa::path( + get, + path = "/api/v1/mesh/questions", + responses( + (status = 200, description = "List of pending questions", body = Vec<PendingQuestionSummary>), + (status = 401, description = "Unauthorized"), + (status = 500, description = "Internal server error"), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn list_pending_questions( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, +) -> impl IntoResponse { + let questions: Vec<PendingQuestionSummary> = state + .get_pending_questions_for_owner(auth.owner_id) + .into_iter() + .map(|q| PendingQuestionSummary { + question_id: q.question_id, + task_id: q.task_id, + contract_id: q.contract_id, + question: q.question, + choices: q.choices, + context: q.context, + created_at: q.created_at, + }) + .collect(); + + Json(questions).into_response() +} + +/// Answer a pending supervisor question. +#[utoipa::path( + post, + path = "/api/v1/mesh/questions/{question_id}/answer", + params( + ("question_id" = Uuid, Path, description = "Question ID") + ), + request_body = AnswerQuestionRequest, + responses( + (status = 200, description = "Question answered", body = AnswerQuestionResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Question not found"), + (status = 500, description = "Internal server error"), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn answer_question( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(question_id): Path<Uuid>, + Json(request): Json<AnswerQuestionRequest>, +) -> impl IntoResponse { + // Verify the question exists and belongs to this owner + let question = match state.get_pending_question(question_id) { + Some(q) if q.owner_id == auth.owner_id => q, + Some(_) => { + return ( + StatusCode::FORBIDDEN, + Json(ApiError::new("FORBIDDEN", "Question belongs to another user")), + ).into_response(); + } + None => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Question not found or already answered")), + ).into_response(); + } + }; + + // Submit the response + let success = state.submit_question_response(question_id, request.response); + + if success { + tracing::info!( + question_id = %question_id, + task_id = %question.task_id, + "User answered supervisor question" + ); + } + + Json(AnswerQuestionResponse { success }).into_response() +} |
