diff options
| author | soryu <soryu@soryu.co> | 2026-01-27 01:14:17 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-27 01:18:55 +0000 |
| commit | d2ae4ce4f5808f823a77c79afb37ca089a04431c (patch) | |
| tree | ba27ab3a84d2504427e92aa2486e464090100f62 /makima/src/server | |
| parent | ee21c74987c61715660ff877b547b430426a27fd (diff) | |
| download | soryu-makima/fix-supervisor-merge.tar.gz soryu-makima/fix-supervisor-merge.zip | |
Fix supervisor merge for completed tasks and make PR command synchronousmakima/fix-supervisor-merge
## Issue 1: makima supervisor merge doesn't work for completed tasks
When a task completes, the daemon removes it from in-memory task tracking.
This caused merge operations to fail with "Task not found".
Fixed by updating handle_merge_task_to_target() to use get_task_worktree_path()
which scans the worktrees directory as a fallback when the task is not in memory.
Also updated handle_create_pr() with the same pattern for consistency.
## Issue 2: makima supervisor pr returns immediately without result
The create_pr handler was asynchronous - it sent the CreatePR command to the
daemon and immediately returned without waiting for the result.
Fixed by:
1. Adding PrResultNotification struct and pr_results broadcast channel to AppState
2. Updating mesh_daemon.rs to broadcast PRCreated results to the channel
3. Updating create_pr() handler to subscribe to pr_results and wait for the
result with a 60-second timeout (matching the merge command pattern)
Now the PR command returns the actual pr_url and pr_number from the daemon.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'makima/src/server')
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 12 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 64 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 26 |
3 files changed, 91 insertions, 11 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 0aea40e..f7fe49f 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -1734,15 +1734,25 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re success, message, pr_url, - pr_number: _, + pr_number, }) => { tracing::info!( task_id = %task_id, success = success, pr_url = ?pr_url, + pr_number = ?pr_number, "PR created result received" ); + // Broadcast the PR result for waiting handlers + state.broadcast_pr_result(crate::server::state::PrResultNotification { + task_id, + success, + message: message.clone(), + pr_url: pr_url.clone(), + pr_number, + }); + // On successful PR creation, notify supervisor of next steps if success { if let Some(pool) = state.db_pool.as_ref() { diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 6d9f8fb..24ba4bb 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -1538,6 +1538,9 @@ pub async fn create_pr( ).into_response(); }; + // Subscribe to PR results BEFORE sending the command + let mut rx = state.pr_results.subscribe(); + // Send CreatePR command to daemon let cmd = DaemonCommand::CreatePR { task_id: request.task_id, @@ -1554,16 +1557,57 @@ pub async fn create_pr( ).into_response(); } - ( - StatusCode::CREATED, - Json(CreatePRResponse { - task_id: request.task_id, - success: true, - message: "PR creation command sent".to_string(), - pr_url: None, - pr_number: None, - }), - ).into_response() + // Wait for the PR result with a timeout (60 seconds should be plenty for PR creation) + let timeout = tokio::time::Duration::from_secs(60); + let result = tokio::time::timeout(timeout, async { + loop { + match rx.recv().await { + Ok(notification) => { + if notification.task_id == request.task_id { + return Some(notification); + } + // Not our task, keep waiting + } + Err(_) => { + // Channel closed or lagged + return None; + } + } + } + }).await; + + match result { + Ok(Some(notification)) => { + let status = if notification.success { + StatusCode::CREATED + } else { + StatusCode::INTERNAL_SERVER_ERROR + }; + ( + status, + Json(CreatePRResponse { + task_id: request.task_id, + success: notification.success, + message: notification.message, + pr_url: notification.pr_url, + pr_number: notification.pr_number, + }), + ).into_response() + } + Ok(None) | Err(_) => { + // Timeout or channel error - return error status + ( + StatusCode::GATEWAY_TIMEOUT, + Json(CreatePRResponse { + task_id: request.task_id, + success: false, + message: "PR creation timed out waiting for daemon response".to_string(), + pr_url: None, + pr_number: None, + }), + ).into_response() + } + } } /// Get the diff for a task's changes. diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index c579f0f..02a2328 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -118,6 +118,21 @@ pub struct MergeResultNotification { pub conflicts: Option<Vec<String>>, } +/// Notification for PR creation results. +#[derive(Debug, Clone)] +pub struct PrResultNotification { + /// ID of the task for which PR was created + pub task_id: Uuid, + /// Whether the PR creation succeeded + pub success: bool, + /// Message describing the result + pub message: String, + /// PR URL if creation succeeded + pub pr_url: Option<String>, + /// PR number if creation succeeded + pub pr_number: Option<i32>, +} + /// Notification for supervisor questions requiring user feedback. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] @@ -527,6 +542,8 @@ pub struct AppState { pub supervisor_questions: broadcast::Sender<SupervisorQuestionNotification>, /// Broadcast channel for merge result notifications pub merge_results: broadcast::Sender<MergeResultNotification>, + /// Broadcast channel for PR creation result notifications + pub pr_results: broadcast::Sender<PrResultNotification>, /// Pending supervisor questions awaiting user response (keyed by question_id) pub pending_questions: DashMap<Uuid, PendingSupervisorQuestion>, /// Responses to supervisor questions (keyed by question_id) @@ -560,6 +577,7 @@ impl AppState { let (task_completions, _) = broadcast::channel(256); // For supervisor task monitoring let (supervisor_questions, _) = broadcast::channel(256); // For supervisor questions to users let (merge_results, _) = broadcast::channel(256); // For merge operation results + let (pr_results, _) = broadcast::channel(256); // For PR creation results // Initialize JWT verifier from environment (optional) // Requires SUPABASE_URL and either SUPABASE_JWT_PUBLIC_KEY (RS256) or SUPABASE_JWT_SECRET (HS256) @@ -603,6 +621,7 @@ impl AppState { task_completions, supervisor_questions, merge_results, + pr_results, pending_questions: DashMap::new(), question_responses: DashMap::new(), daemon_connections: DashMap::new(), @@ -699,6 +718,13 @@ impl AppState { let _ = self.merge_results.send(notification); } + /// Broadcast a PR creation result notification to all subscribers. + /// + /// Used to notify waiting handlers when a PR creation operation completes. + pub fn broadcast_pr_result(&self, notification: PrResultNotification) { + let _ = self.pr_results.send(notification); + } + /// Add a pending supervisor question and broadcast it. pub fn add_supervisor_question( &self, |
