diff options
| author | soryu <soryu@soryu.co> | 2026-01-26 21:24:04 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-26 21:24:04 +0000 |
| commit | bc1ce8013bc36a1585be05b928f2386ab56529c2 (patch) | |
| tree | 8a6f2f5641103a029138047d675d0008986d60b7 | |
| parent | 04e1e8f0dd85d19917ac5ba0b73cba65ebac8976 (diff) | |
| download | soryu-bc1ce8013bc36a1585be05b928f2386ab56529c2.tar.gz soryu-bc1ce8013bc36a1585be05b928f2386ab56529c2.zip | |
Make merges synchronous
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 13 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 59 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 26 |
3 files changed, 86 insertions, 12 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 270118f..0ba37d2 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -1665,8 +1665,8 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re task_id, success, message, - commit_sha: _, - conflicts: _, + commit_sha, + conflicts, }) => { tracing::info!( task_id = %task_id, @@ -1674,6 +1674,15 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "Merge to target result received" ); + // Broadcast the merge result for waiting handlers + state.broadcast_merge_result(crate::server::state::MergeResultNotification { + task_id, + success, + message: message.clone(), + commit_sha: commit_sha.clone(), + conflicts: conflicts.clone(), + }); + // On successful merge, notify supervisor to check if all merges complete if success { if let Some(pool) = state.db_pool.as_ref() { diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 1b476ef..d1a1a99 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -1349,6 +1349,9 @@ pub async fn merge_task( ).into_response(); }; + // Subscribe to merge results BEFORE sending the command + let mut rx = state.merge_results.subscribe(); + // Send MergeTaskToTarget command to daemon let cmd = DaemonCommand::MergeTaskToTarget { task_id, @@ -1364,16 +1367,52 @@ pub async fn merge_task( ).into_response(); } - ( - StatusCode::OK, - Json(MergeTaskResponse { - task_id, - success: true, - message: "Merge command sent".to_string(), - commit_sha: None, - conflicts: None, - }), - ).into_response() + // Wait for the merge result with a timeout (60 seconds should be plenty for a merge) + let timeout = tokio::time::Duration::from_secs(60); + let result = tokio::time::timeout(timeout, async { + loop { + match rx.recv().await { + Ok(notification) => { + if notification.task_id == task_id { + return Some(notification); + } + // Not our task, keep waiting + } + Err(_) => { + // Channel closed or lagged + return None; + } + } + } + }).await; + + match result { + Ok(Some(notification)) => { + ( + StatusCode::OK, + Json(MergeTaskResponse { + task_id, + success: notification.success, + message: notification.message, + commit_sha: notification.commit_sha, + conflicts: notification.conflicts, + }), + ).into_response() + } + Ok(None) | Err(_) => { + // Timeout or channel error - return error status + ( + StatusCode::GATEWAY_TIMEOUT, + Json(MergeTaskResponse { + task_id, + success: false, + message: "Merge operation timed out waiting for daemon response".to_string(), + commit_sha: None, + conflicts: None, + }), + ).into_response() + } + } } /// Create a pull request for a task's changes. diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index b954efe..988f657 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -103,6 +103,21 @@ pub struct TaskCompletionNotification { pub error_message: Option<String>, } +/// Notification for merge operation results. +#[derive(Debug, Clone)] +pub struct MergeResultNotification { + /// ID of the task that was merged + pub task_id: Uuid, + /// Whether the merge succeeded + pub success: bool, + /// Message describing the result + pub message: String, + /// Commit SHA if merge succeeded + pub commit_sha: Option<String>, + /// List of conflicting files if merge failed due to conflicts + pub conflicts: Option<Vec<String>>, +} + /// Notification for supervisor questions requiring user feedback. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] @@ -507,6 +522,8 @@ pub struct AppState { pub task_completions: broadcast::Sender<TaskCompletionNotification>, /// Broadcast channel for supervisor question notifications pub supervisor_questions: broadcast::Sender<SupervisorQuestionNotification>, + /// Broadcast channel for merge result notifications + pub merge_results: broadcast::Sender<MergeResultNotification>, /// Pending supervisor questions awaiting user response (keyed by question_id) pub pending_questions: DashMap<Uuid, PendingSupervisorQuestion>, /// Responses to supervisor questions (keyed by question_id) @@ -539,6 +556,7 @@ impl AppState { let (task_output, _) = broadcast::channel(1024); // Larger buffer for output streaming let (task_completions, _) = broadcast::channel(256); // For supervisor task monitoring let (supervisor_questions, _) = broadcast::channel(256); // For supervisor questions to users + let (merge_results, _) = broadcast::channel(256); // For merge operation results // Initialize JWT verifier from environment (optional) // Requires SUPABASE_URL and either SUPABASE_JWT_PUBLIC_KEY (RS256) or SUPABASE_JWT_SECRET (HS256) @@ -581,6 +599,7 @@ impl AppState { task_output, task_completions, supervisor_questions, + merge_results, pending_questions: DashMap::new(), question_responses: DashMap::new(), daemon_connections: DashMap::new(), @@ -670,6 +689,13 @@ impl AppState { let _ = self.supervisor_questions.send(notification); } + /// Broadcast a merge result notification to all subscribers. + /// + /// Used to notify waiting handlers when a merge operation completes. + pub fn broadcast_merge_result(&self, notification: MergeResultNotification) { + let _ = self.merge_results.send(notification); + } + /// Add a pending supervisor question and broadcast it. pub fn add_supervisor_question( &self, |
