summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers')
-rw-r--r--makima/src/server/handlers/mesh.rs687
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs42
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs321
3 files changed, 1035 insertions, 15 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 2d90a04..3da6fd5 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -52,7 +52,7 @@ pub fn extract_auth(state: &SharedState, headers: &HeaderMap) -> AuthSource {
if let Some(task_id) = state.validate_tool_key(key_str) {
return AuthSource::ToolKey(task_id);
}
- tracing::warn!("Invalid tool key provided");
+ tracing::warn!("Invalid tool key provided: {}", key_str);
}
}
@@ -1774,3 +1774,688 @@ pub async fn check_target_exists(
}))
.into_response()
}
+
+// =============================================================================
+// Task Reassignment (Daemon Failover)
+// =============================================================================
+
+/// Request to reassign a task to a new daemon after daemon disconnect.
+#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ReassignTaskRequest {
+ /// Target daemon ID to reassign to. If not provided, will select any available daemon.
+ pub target_daemon_id: Option<Uuid>,
+ /// Whether to include conversation context from previous run.
+ #[serde(default = "default_include_context")]
+ pub include_context: bool,
+}
+
+fn default_include_context() -> bool {
+ true
+}
+
+/// Response from task reassignment.
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ReassignTaskResponse {
+ /// The new task that was created.
+ pub task: Task,
+ /// The new daemon ID.
+ pub daemon_id: Uuid,
+ /// The ID of the old task that was deleted.
+ pub old_task_id: Uuid,
+ /// Whether conversation context was included.
+ pub context_included: bool,
+ /// Number of context entries from previous conversation.
+ pub context_entries: usize,
+}
+
+/// Build a conversation context summary from task output entries.
+/// Returns a formatted string that can be prepended to the task plan.
+fn build_conversation_context(entries: &[TaskOutputEntry]) -> String {
+ if entries.is_empty() {
+ return String::new();
+ }
+
+ let mut context = String::from("\n\n=== PREVIOUS CONVERSATION CONTEXT ===\n");
+ context.push_str("The daemon running this task disconnected. Here is what happened so far:\n\n");
+
+ for entry in entries.iter() {
+ match entry.message_type.as_str() {
+ "assistant" => {
+ context.push_str("Assistant: ");
+ // Truncate long messages
+ let content = if entry.content.len() > 500 {
+ format!("{}... [truncated]", &entry.content[..500])
+ } else {
+ entry.content.clone()
+ };
+ context.push_str(&content);
+ context.push_str("\n\n");
+ }
+ "tool_use" => {
+ if let Some(ref tool_name) = entry.tool_name {
+ context.push_str(&format!("[Used tool: {}]\n", tool_name));
+ }
+ }
+ "tool_result" => {
+ // Summarize tool results briefly
+ if entry.content.len() > 200 {
+ context.push_str(&format!("[Tool result: {}... truncated]\n", &entry.content[..200]));
+ } else if !entry.content.is_empty() {
+ context.push_str(&format!("[Tool result: {}]\n", entry.content));
+ }
+ }
+ "user" => {
+ context.push_str("User: ");
+ context.push_str(&entry.content);
+ context.push_str("\n\n");
+ }
+ _ => {}
+ }
+ }
+
+ context.push_str("=== END PREVIOUS CONTEXT ===\n\n");
+ context.push_str("Please continue from where the conversation left off. Do not repeat work that was already done.\n\n");
+
+ context
+}
+
+/// Reassign a task to a new daemon after the original daemon disconnected.
+///
+/// This endpoint is used for daemon failover - when a daemon restarts or disconnects,
+/// the task can be reassigned to a new daemon with the conversation context preserved.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/reassign",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = ReassignTaskRequest,
+ responses(
+ (status = 200, description = "Task reassigned successfully", body = ReassignTaskResponse),
+ (status = 400, description = "Task cannot be reassigned (not in failed/interrupted state)", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "No daemon available", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn reassign_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ Json(body): Json<ReassignTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if task is in a state that can be reassigned
+ // Allow reassignment for: failed, interrupted, pending, or tasks whose daemon disconnected
+ // Helper closure to check if a daemon is connected by its UUID
+ let is_daemon_connected = |daemon_id: Uuid| {
+ state.daemon_connections.iter().any(|d| d.value().id == daemon_id)
+ };
+
+ let can_reassign = matches!(
+ task.status.as_str(),
+ "failed" | "interrupted" | "pending" | "starting"
+ ) || {
+ // Also allow if daemon is not connected
+ if let Some(daemon_id) = task.daemon_id {
+ !is_daemon_connected(daemon_id)
+ } else {
+ true
+ }
+ };
+
+ if !can_reassign && task.status == "running" {
+ // Running task - check if its daemon is still connected
+ if let Some(daemon_id) = task.daemon_id {
+ if is_daemon_connected(daemon_id) {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "TASK_RUNNING",
+ "Task is running on a connected daemon. Stop it first to reassign.",
+ )),
+ )
+ .into_response();
+ }
+ }
+ }
+
+ // Find a target daemon
+ let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id {
+ // Verify the requested daemon is connected and belongs to the owner
+ let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id);
+ if let Some(daemon) = daemon {
+ if daemon.owner_id != auth.owner_id {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")),
+ )
+ .into_response();
+ }
+ requested_daemon_id
+ } else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")),
+ )
+ .into_response();
+ }
+ } else {
+ // Find any available daemon for this owner
+ match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
+ Some(entry) => entry.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
+ )
+ .into_response();
+ }
+ }
+ };
+
+ // Build conversation context if requested
+ let (context_str, context_entries) = if body.include_context {
+ match repository::get_task_output(pool, id, Some(500)).await {
+ Ok(events) => {
+ let entries: Vec<TaskOutputEntry> = events
+ .into_iter()
+ .filter_map(TaskOutputEntry::from_task_event)
+ .collect();
+ let context = build_conversation_context(&entries);
+ let count = entries.len();
+ (context, count)
+ }
+ Err(e) => {
+ tracing::warn!("Failed to get task output for context: {}", e);
+ (String::new(), 0)
+ }
+ }
+ } else {
+ (String::new(), 0)
+ };
+
+ // Build updated plan with context prepended
+ let updated_plan = if !context_str.is_empty() {
+ format!("{}{}", context_str, task.plan)
+ } else {
+ task.plan.clone()
+ };
+
+ // Create a NEW task with the conversation context
+ let create_req = CreateTaskRequest {
+ contract_id: task.contract_id.unwrap_or(Uuid::nil()),
+ name: format!("{} (resumed)", task.name),
+ description: task.description.clone(),
+ plan: updated_plan.clone(),
+ parent_task_id: task.parent_task_id,
+ is_supervisor: task.is_supervisor,
+ priority: task.priority,
+ repository_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ merge_mode: task.merge_mode.clone(),
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: Some(id), // Continue from the old task's worktree if possible
+ copy_files: None,
+ checkpoint_sha: task.last_checkpoint_sha.clone(),
+ };
+
+ let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
+ Ok(t) => t,
+ Err(e) => {
+ tracing::error!("Failed to create new task for reassignment: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Update new task to starting and assign daemon
+ let start_update = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(target_daemon_id),
+ version: Some(new_task.version),
+ ..Default::default()
+ };
+
+ let final_task = match repository::update_task_for_owner(pool, new_task.id, auth.owner_id, start_update).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "New task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to update new task daemon assignment: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Send SpawnTask command to daemon for the new task
+ let command = DaemonCommand::SpawnTask {
+ task_id: new_task.id,
+ task_name: final_task.name.clone(),
+ plan: updated_plan,
+ repo_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ parent_task_id: task.parent_task_id,
+ depth: task.depth,
+ is_orchestrator: false, // New task starts fresh
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: Some(id), // Continue from old task's worktree
+ copy_files: None,
+ contract_id: task.contract_id,
+ is_supervisor: task.is_supervisor,
+ };
+
+ tracing::info!(
+ old_task_id = %id,
+ new_task_id = %new_task.id,
+ new_daemon_id = %target_daemon_id,
+ context_entries = context_entries,
+ "Reassigning task: creating new task and deleting old one"
+ );
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ tracing::error!("Failed to send SpawnTask command for reassignment: {}", e);
+ // Rollback: delete the new task we created
+ let _ = repository::delete_task_for_owner(pool, new_task.id, auth.owner_id).await;
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ // Delete the old task now that the new one is spawned
+ let old_task_id = id;
+ if let Err(e) = repository::delete_task_for_owner(pool, old_task_id, auth.owner_id).await {
+ tracing::warn!("Failed to delete old task {}: {}", old_task_id, e);
+ // Don't fail the request, the new task is already running
+ }
+
+ // Notify the contract's supervisor about the reassignment (if applicable)
+ if let Some(contract_id) = task.contract_id {
+ if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
+ if let Some(supervisor_task_id) = contract.supervisor_task_id {
+ // Don't notify if we're reassigning the supervisor itself
+ if supervisor_task_id != old_task_id {
+ // Find the supervisor's daemon and send a message
+ if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await {
+ if supervisor_task.status == "running" {
+ if let Some(supervisor_daemon_id) = supervisor_task.daemon_id {
+ // Find the daemon by its UUID
+ if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) {
+ let notification_msg = format!(
+ "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \
+ A new task '{}' (ID: {}) has been created to continue the work. \
+ The new task has {} context entries from the previous conversation.\n\n",
+ task.name,
+ old_task_id,
+ final_task.name,
+ new_task.id,
+ context_entries
+ );
+
+ let notify_cmd = DaemonCommand::SendMessage {
+ task_id: supervisor_task_id,
+ message: notification_msg,
+ };
+
+ if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await {
+ tracing::warn!(
+ supervisor_id = %supervisor_task_id,
+ error = %e,
+ "Failed to notify supervisor about task reassignment"
+ );
+ } else {
+ tracing::info!(
+ supervisor_id = %supervisor_task_id,
+ old_task_id = %old_task_id,
+ new_task_id = %new_task.id,
+ "Notified supervisor about task reassignment"
+ );
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // Broadcast task update for the new task
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: new_task.id,
+ owner_id: Some(auth.owner_id),
+ version: final_task.version,
+ status: "starting".to_string(),
+ updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
+ updated_by: "reassignment".to_string(),
+ });
+
+ Json(ReassignTaskResponse {
+ task: final_task,
+ daemon_id: target_daemon_id,
+ old_task_id,
+ context_included: !context_str.is_empty(),
+ context_entries,
+ })
+ .into_response()
+}
+
+// =============================================================================
+// Task Continue (Restart with Context)
+// =============================================================================
+
+/// Request to continue a task after daemon disconnect (restart in-place with context).
+#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContinueTaskRequest {
+ /// Target daemon ID to continue on. If not provided, will select any available daemon.
+ pub target_daemon_id: Option<Uuid>,
+}
+
+/// Response from continuing a task.
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ContinueTaskResponse {
+ /// The continued task (same ID, updated plan with context).
+ pub task: Task,
+ /// The daemon ID running the task.
+ pub daemon_id: Uuid,
+ /// Number of context entries from previous conversation.
+ pub context_entries: usize,
+}
+
+/// Continue a task after daemon disconnect by restarting it with conversation context.
+///
+/// Unlike reassign, this keeps the same task ID and just restarts it with the
+/// previous conversation context prepended to the plan. Useful for supervisors.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/continue",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = ContinueTaskRequest,
+ responses(
+ (status = 200, description = "Task continued successfully", body = ContinueTaskResponse),
+ (status = 400, description = "Task cannot be continued", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "No daemon available", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn continue_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ Json(body): Json<ContinueTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Helper closure to check if a daemon is connected by its UUID
+ let is_daemon_connected = |daemon_id: Uuid| {
+ state.daemon_connections.iter().any(|d| d.value().id == daemon_id)
+ };
+
+ // Check if task can be continued (not currently running on a connected daemon)
+ let can_continue = matches!(
+ task.status.as_str(),
+ "failed" | "interrupted" | "pending" | "starting" | "completed"
+ ) || {
+ if let Some(daemon_id) = task.daemon_id {
+ !is_daemon_connected(daemon_id)
+ } else {
+ true
+ }
+ };
+
+ if !can_continue && task.status == "running" {
+ if let Some(daemon_id) = task.daemon_id {
+ if is_daemon_connected(daemon_id) {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "TASK_RUNNING",
+ "Task is running on a connected daemon. Stop it first to continue.",
+ )),
+ )
+ .into_response();
+ }
+ }
+ }
+
+ // Find a target daemon
+ let target_daemon_id = if let Some(requested_daemon_id) = body.target_daemon_id {
+ let daemon = state.daemon_connections.iter().find(|d| d.value().id == requested_daemon_id);
+ if let Some(daemon) = daemon {
+ if daemon.owner_id != auth.owner_id {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("INVALID_DAEMON", "Daemon does not belong to your account")),
+ )
+ .into_response();
+ }
+ requested_daemon_id
+ } else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DAEMON_NOT_CONNECTED", "Requested daemon is not connected")),
+ )
+ .into_response();
+ }
+ } else {
+ match state.daemon_connections.iter().find(|d| d.value().owner_id == auth.owner_id) {
+ Some(entry) => entry.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "No daemon connected for your account")),
+ )
+ .into_response();
+ }
+ }
+ };
+
+ // Build conversation context from task output
+ let (context_str, context_entries) = match repository::get_task_output(pool, id, Some(500)).await {
+ Ok(events) => {
+ let entries: Vec<TaskOutputEntry> = events
+ .into_iter()
+ .filter_map(TaskOutputEntry::from_task_event)
+ .collect();
+ let context = build_conversation_context(&entries);
+ let count = entries.len();
+ (context, count)
+ }
+ Err(e) => {
+ tracing::warn!("Failed to get task output for context: {}", e);
+ (String::new(), 0)
+ }
+ };
+
+ // Build updated plan with context prepended
+ let updated_plan = if !context_str.is_empty() {
+ format!("{}{}", context_str, task.plan)
+ } else {
+ task.plan.clone()
+ };
+
+ // Update task in database: reset status, update plan with context, assign daemon
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ plan: Some(updated_plan.clone()),
+ daemon_id: Some(target_daemon_id),
+ error_message: None,
+ ..Default::default()
+ };
+
+ let updated_task = match repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to update task for continuation: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Check if this is an orchestrator
+ let subtask_count = match repository::list_subtasks_for_owner(pool, id, auth.owner_id).await {
+ Ok(subtasks) => subtasks.len(),
+ Err(_) => 0,
+ };
+ let is_orchestrator = task.depth == 0 && subtask_count > 0;
+
+ // Send SpawnTask command to daemon
+ let command = DaemonCommand::SpawnTask {
+ task_id: id,
+ task_name: task.name.clone(),
+ plan: updated_plan,
+ repo_url: task.repository_url.clone(),
+ base_branch: task.base_branch.clone(),
+ target_branch: task.target_branch.clone(),
+ parent_task_id: task.parent_task_id,
+ depth: task.depth,
+ is_orchestrator,
+ target_repo_path: task.target_repo_path.clone(),
+ completion_action: task.completion_action.clone(),
+ continue_from_task_id: task.continue_from_task_id,
+ copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: task.contract_id,
+ is_supervisor: task.is_supervisor,
+ };
+
+ tracing::info!(
+ task_id = %id,
+ daemon_id = %target_daemon_id,
+ context_entries = context_entries,
+ is_supervisor = task.is_supervisor,
+ "Continuing task with conversation context"
+ );
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ tracing::error!("Failed to send SpawnTask command for continuation: {}", e);
+ // Rollback
+ let rollback_req = UpdateTaskRequest {
+ status: Some("failed".to_string()),
+ clear_daemon_id: true,
+ error_message: Some(format!("Continuation failed: {}", e)),
+ ..Default::default()
+ };
+ let _ = repository::update_task_for_owner(pool, id, auth.owner_id, rollback_req).await;
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ // Broadcast task update
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: id,
+ owner_id: Some(auth.owner_id),
+ version: updated_task.version,
+ status: "starting".to_string(),
+ updated_fields: vec!["status".to_string(), "daemon_id".to_string(), "plan".to_string()],
+ updated_by: "continuation".to_string(),
+ });
+
+ Json(ContinueTaskResponse {
+ task: updated_task,
+ daemon_id: target_daemon_id,
+ context_entries,
+ })
+ .into_response()
+}
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 178e5e1..39b12da 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -369,6 +369,18 @@ pub enum DaemonMessage {
/// Error message if operation failed
error: Option<String>,
},
+ /// Notification that a branch was created
+ BranchCreated {
+ #[serde(rename = "taskId")]
+ task_id: Option<Uuid>,
+ /// Name of the branch that was created
+ #[serde(rename = "branchName")]
+ branch_name: String,
+ /// Whether the operation succeeded
+ success: bool,
+ /// Error message if operation failed
+ error: Option<String>,
+ },
}
/// Validated daemon authentication result.
@@ -1073,6 +1085,36 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
);
}
}
+ Ok(DaemonMessage::BranchCreated { task_id, branch_name, success, error }) => {
+ tracing::info!(
+ task_id = ?task_id,
+ branch_name = %branch_name,
+ success = success,
+ error = ?error,
+ "Branch created notification received"
+ );
+
+ // Broadcast as task output if we have a task_id
+ if let Some(tid) = task_id {
+ let output_text = if success {
+ format!("✓ Branch '{}' created successfully", branch_name)
+ } else {
+ format!("✗ Failed to create branch '{}': {}", branch_name, error.unwrap_or_default())
+ };
+ state.broadcast_task_output(TaskOutputNotification {
+ task_id: tid,
+ owner_id: Some(owner_id),
+ message_type: "system".to_string(),
+ content: output_text,
+ tool_name: None,
+ tool_input: None,
+ is_error: Some(!success),
+ cost_usd: None,
+ duration_ms: None,
+ is_partial: false,
+ });
+ }
+ }
Err(e) => {
tracing::warn!("Failed to parse daemon message: {}", e);
}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index ac59130..d0fa4d1 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -15,6 +15,7 @@ use uuid::Uuid;
use crate::db::models::{CreateTaskRequest, Task, TaskSummary};
use crate::db::repository;
+use crate::server::auth::Authenticated;
use crate::server::handlers::mesh::{extract_auth, AuthSource};
use crate::server::messages::ApiError;
use crate::server::state::{DaemonCommand, SharedState};
@@ -32,7 +33,7 @@ pub struct SpawnTaskRequest {
pub contract_id: Uuid,
pub parent_task_id: Option<Uuid>,
pub checkpoint_sha: Option<String>,
- /// Repository URL for the task (supervisor should provide this)
+ /// Repository URL for the task (optional - if not provided, will be looked up from contract).
pub repository_url: Option<String>,
}
@@ -55,6 +56,67 @@ pub struct ReadWorktreeFileRequest {
pub file_path: String,
}
+/// Request to ask a question and wait for user feedback.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct AskQuestionRequest {
+ /// The question to ask the user
+ pub question: String,
+ /// Optional choices (if empty, free-form text response)
+ #[serde(default)]
+ pub choices: Vec<String>,
+ /// Optional context about what this relates to
+ pub context: Option<String>,
+ /// How long to wait for a response (seconds)
+ #[serde(default = "default_question_timeout")]
+ pub timeout_seconds: i32,
+}
+
+fn default_question_timeout() -> i32 {
+ 3600 // 1 hour default
+}
+
+/// Response from asking a question.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct AskQuestionResponse {
+ /// The question ID for tracking
+ pub question_id: Uuid,
+ /// The user's response (None if timed out)
+ pub response: Option<String>,
+ /// Whether the question timed out
+ pub timed_out: bool,
+}
+
+/// Request to answer a supervisor question.
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct AnswerQuestionRequest {
+ /// The user's response
+ pub response: String,
+}
+
+/// Response to answering a question.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct AnswerQuestionResponse {
+ /// Whether the answer was accepted
+ pub success: bool,
+}
+
+/// Pending question summary.
+#[derive(Debug, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct PendingQuestionSummary {
+ pub question_id: Uuid,
+ pub task_id: Uuid,
+ pub contract_id: Uuid,
+ pub question: String,
+ pub choices: Vec<String>,
+ pub context: Option<String>,
+ pub created_at: chrono::DateTime<chrono::Utc>,
+}
+
/// Request to create a checkpoint.
#[derive(Debug, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
@@ -321,23 +383,49 @@ pub async fn spawn_task(
}
};
- // Get repository URL from the contract's primary repository
- let repo_url = match repository::list_contract_repositories(pool, request.contract_id).await {
- Ok(repos) => {
- // Prefer primary repo, fallback to first repo
- repos.iter()
- .find(|r| r.is_primary)
- .or(repos.first())
- .and_then(|r| r.repository_url.clone())
- }
- Err(e) => {
- tracing::warn!(error = %e, "Failed to get contract repositories, continuing without repo URL");
+ // Get repository URL - either from request or from contract's repositories
+ let repo_url = if let Some(url) = request.repository_url.clone() {
+ if !url.trim().is_empty() {
+ Some(url)
+ } else {
None
}
+ } else {
+ None
};
- // Supervisor can override with explicit repository_url
- let repo_url = request.repository_url.clone().or(repo_url);
+ // If no repo URL provided, look it up from the contract
+ let repo_url = match repo_url {
+ Some(url) => Some(url),
+ None => {
+ match repository::list_contract_repositories(pool, request.contract_id).await {
+ Ok(repos) => {
+ // Prefer primary repo, fallback to first repo
+ let repo = repos.iter()
+ .find(|r| r.is_primary)
+ .or(repos.first());
+
+ // Use repository_url if set, otherwise use local_path
+ repo.and_then(|r| {
+ r.repository_url.clone()
+ .or_else(|| r.local_path.clone())
+ })
+ }
+ Err(e) => {
+ tracing::warn!(error = %e, "Failed to get contract repositories");
+ None
+ }
+ }
+ }
+ };
+
+ // Validate that we have a repo URL
+ if repo_url.is_none() {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("MISSING_REPO_URL", "No repository URL found. Either provide one or ensure the contract has repositories configured.")),
+ ).into_response();
+ }
// Create task request
let create_req = CreateTaskRequest {
@@ -1151,3 +1239,208 @@ pub async fn get_task_diff(
}),
).into_response()
}
+
+// =============================================================================
+// Supervisor Question Handlers
+// =============================================================================
+
+/// Ask a question and wait for user feedback.
+///
+/// The supervisor calls this to ask a question. The endpoint will poll until
+/// either the user responds or the timeout is reached.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/supervisor/questions",
+ request_body = AskQuestionRequest,
+ responses(
+ (status = 200, description = "Question answered", body = AskQuestionResponse),
+ (status = 408, description = "Question timed out", body = AskQuestionResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - not a supervisor"),
+ (status = 500, description = "Internal server error"),
+ ),
+ security(
+ ("tool_key" = [])
+ ),
+ tag = "Mesh Supervisor"
+)]
+pub async fn ask_question(
+ State(state): State<SharedState>,
+ headers: HeaderMap,
+ Json(request): Json<AskQuestionRequest>,
+) -> impl IntoResponse {
+ let (supervisor_id, owner_id) = match verify_supervisor_auth(&state, &headers, None).await {
+ Ok(ids) => ids,
+ Err(e) => return e.into_response(),
+ };
+
+ let pool = state.db_pool.as_ref().unwrap();
+
+ // Get the supervisor task to find its contract
+ let supervisor = match repository::get_task_for_owner(pool, supervisor_id, owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Supervisor task not found")),
+ ).into_response();
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to get supervisor task");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", "Failed to get supervisor task")),
+ ).into_response();
+ }
+ };
+
+ let Some(contract_id) = supervisor.contract_id else {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("NO_CONTRACT", "Supervisor has no associated contract")),
+ ).into_response();
+ };
+
+ // Add the question
+ let question_id = state.add_supervisor_question(
+ supervisor_id,
+ contract_id,
+ owner_id,
+ request.question.clone(),
+ request.choices.clone(),
+ request.context.clone(),
+ );
+
+ // Poll for response with timeout
+ let timeout_duration = std::time::Duration::from_secs(request.timeout_seconds.max(1) as u64);
+ let start = std::time::Instant::now();
+ let poll_interval = std::time::Duration::from_millis(500);
+
+ loop {
+ // Check if response has been submitted
+ if let Some(response) = state.get_question_response(question_id) {
+ // Clean up the response
+ state.cleanup_question_response(question_id);
+
+ return (
+ StatusCode::OK,
+ Json(AskQuestionResponse {
+ question_id,
+ response: Some(response.response),
+ timed_out: false,
+ }),
+ ).into_response();
+ }
+
+ // Check timeout
+ if start.elapsed() >= timeout_duration {
+ // Remove the pending question on timeout
+ state.remove_pending_question(question_id);
+
+ return (
+ StatusCode::REQUEST_TIMEOUT,
+ Json(AskQuestionResponse {
+ question_id,
+ response: None,
+ timed_out: true,
+ }),
+ ).into_response();
+ }
+
+ // Wait before polling again
+ tokio::time::sleep(poll_interval).await;
+ }
+}
+
+/// Get all pending questions for the current user.
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/questions",
+ responses(
+ (status = 200, description = "List of pending questions", body = Vec<PendingQuestionSummary>),
+ (status = 401, description = "Unauthorized"),
+ (status = 500, description = "Internal server error"),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn list_pending_questions(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+) -> impl IntoResponse {
+ let questions: Vec<PendingQuestionSummary> = state
+ .get_pending_questions_for_owner(auth.owner_id)
+ .into_iter()
+ .map(|q| PendingQuestionSummary {
+ question_id: q.question_id,
+ task_id: q.task_id,
+ contract_id: q.contract_id,
+ question: q.question,
+ choices: q.choices,
+ context: q.context,
+ created_at: q.created_at,
+ })
+ .collect();
+
+ Json(questions).into_response()
+}
+
+/// Answer a pending supervisor question.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/questions/{question_id}/answer",
+ params(
+ ("question_id" = Uuid, Path, description = "Question ID")
+ ),
+ request_body = AnswerQuestionRequest,
+ responses(
+ (status = 200, description = "Question answered", body = AnswerQuestionResponse),
+ (status = 401, description = "Unauthorized"),
+ (status = 404, description = "Question not found"),
+ (status = 500, description = "Internal server error"),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn answer_question(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(question_id): Path<Uuid>,
+ Json(request): Json<AnswerQuestionRequest>,
+) -> impl IntoResponse {
+ // Verify the question exists and belongs to this owner
+ let question = match state.get_pending_question(question_id) {
+ Some(q) if q.owner_id == auth.owner_id => q,
+ Some(_) => {
+ return (
+ StatusCode::FORBIDDEN,
+ Json(ApiError::new("FORBIDDEN", "Question belongs to another user")),
+ ).into_response();
+ }
+ None => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Question not found or already answered")),
+ ).into_response();
+ }
+ };
+
+ // Submit the response
+ let success = state.submit_question_response(question_id, request.response);
+
+ if success {
+ tracing::info!(
+ question_id = %question_id,
+ task_id = %question.task_id,
+ "User answered supervisor question"
+ );
+ }
+
+ Json(AnswerQuestionResponse { success }).into_response()
+}