summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-26 21:24:04 +0000
committersoryu <soryu@soryu.co>2026-01-26 21:24:04 +0000
commitbc1ce8013bc36a1585be05b928f2386ab56529c2 (patch)
tree8a6f2f5641103a029138047d675d0008986d60b7
parent04e1e8f0dd85d19917ac5ba0b73cba65ebac8976 (diff)
downloadsoryu-bc1ce8013bc36a1585be05b928f2386ab56529c2.tar.gz
soryu-bc1ce8013bc36a1585be05b928f2386ab56529c2.zip
Make merges synchronous
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs13
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs59
-rw-r--r--makima/src/server/state.rs26
3 files changed, 86 insertions, 12 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 270118f..0ba37d2 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -1665,8 +1665,8 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
task_id,
success,
message,
- commit_sha: _,
- conflicts: _,
+ commit_sha,
+ conflicts,
}) => {
tracing::info!(
task_id = %task_id,
@@ -1674,6 +1674,15 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
"Merge to target result received"
);
+ // Broadcast the merge result for waiting handlers
+ state.broadcast_merge_result(crate::server::state::MergeResultNotification {
+ task_id,
+ success,
+ message: message.clone(),
+ commit_sha: commit_sha.clone(),
+ conflicts: conflicts.clone(),
+ });
+
// On successful merge, notify supervisor to check if all merges complete
if success {
if let Some(pool) = state.db_pool.as_ref() {
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 1b476ef..d1a1a99 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -1349,6 +1349,9 @@ pub async fn merge_task(
).into_response();
};
+ // Subscribe to merge results BEFORE sending the command
+ let mut rx = state.merge_results.subscribe();
+
// Send MergeTaskToTarget command to daemon
let cmd = DaemonCommand::MergeTaskToTarget {
task_id,
@@ -1364,16 +1367,52 @@ pub async fn merge_task(
).into_response();
}
- (
- StatusCode::OK,
- Json(MergeTaskResponse {
- task_id,
- success: true,
- message: "Merge command sent".to_string(),
- commit_sha: None,
- conflicts: None,
- }),
- ).into_response()
+ // Wait for the merge result with a timeout (60 seconds should be plenty for a merge)
+ let timeout = tokio::time::Duration::from_secs(60);
+ let result = tokio::time::timeout(timeout, async {
+ loop {
+ match rx.recv().await {
+ Ok(notification) => {
+ if notification.task_id == task_id {
+ return Some(notification);
+ }
+ // Not our task, keep waiting
+ }
+ Err(_) => {
+ // Channel closed or lagged
+ return None;
+ }
+ }
+ }
+ }).await;
+
+ match result {
+ Ok(Some(notification)) => {
+ (
+ StatusCode::OK,
+ Json(MergeTaskResponse {
+ task_id,
+ success: notification.success,
+ message: notification.message,
+ commit_sha: notification.commit_sha,
+ conflicts: notification.conflicts,
+ }),
+ ).into_response()
+ }
+ Ok(None) | Err(_) => {
+ // Timeout or channel error - return error status
+ (
+ StatusCode::GATEWAY_TIMEOUT,
+ Json(MergeTaskResponse {
+ task_id,
+ success: false,
+ message: "Merge operation timed out waiting for daemon response".to_string(),
+ commit_sha: None,
+ conflicts: None,
+ }),
+ ).into_response()
+ }
+ }
}
/// Create a pull request for a task's changes.
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index b954efe..988f657 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -103,6 +103,21 @@ pub struct TaskCompletionNotification {
pub error_message: Option<String>,
}
+/// Notification for merge operation results.
+#[derive(Debug, Clone)]
+pub struct MergeResultNotification {
+ /// ID of the task that was merged
+ pub task_id: Uuid,
+ /// Whether the merge succeeded
+ pub success: bool,
+ /// Message describing the result
+ pub message: String,
+ /// Commit SHA if merge succeeded
+ pub commit_sha: Option<String>,
+ /// List of conflicting files if merge failed due to conflicts
+ pub conflicts: Option<Vec<String>>,
+}
+
/// Notification for supervisor questions requiring user feedback.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -507,6 +522,8 @@ pub struct AppState {
pub task_completions: broadcast::Sender<TaskCompletionNotification>,
/// Broadcast channel for supervisor question notifications
pub supervisor_questions: broadcast::Sender<SupervisorQuestionNotification>,
+ /// Broadcast channel for merge result notifications
+ pub merge_results: broadcast::Sender<MergeResultNotification>,
/// Pending supervisor questions awaiting user response (keyed by question_id)
pub pending_questions: DashMap<Uuid, PendingSupervisorQuestion>,
/// Responses to supervisor questions (keyed by question_id)
@@ -539,6 +556,7 @@ impl AppState {
let (task_output, _) = broadcast::channel(1024); // Larger buffer for output streaming
let (task_completions, _) = broadcast::channel(256); // For supervisor task monitoring
let (supervisor_questions, _) = broadcast::channel(256); // For supervisor questions to users
+ let (merge_results, _) = broadcast::channel(256); // For merge operation results
// Initialize JWT verifier from environment (optional)
// Requires SUPABASE_URL and either SUPABASE_JWT_PUBLIC_KEY (RS256) or SUPABASE_JWT_SECRET (HS256)
@@ -581,6 +599,7 @@ impl AppState {
task_output,
task_completions,
supervisor_questions,
+ merge_results,
pending_questions: DashMap::new(),
question_responses: DashMap::new(),
daemon_connections: DashMap::new(),
@@ -670,6 +689,13 @@ impl AppState {
let _ = self.supervisor_questions.send(notification);
}
+ /// Broadcast a merge result notification to all subscribers.
+ ///
+ /// Used to notify waiting handlers when a merge operation completes.
+ pub fn broadcast_merge_result(&self, notification: MergeResultNotification) {
+ let _ = self.merge_results.send(notification);
+ }
+
/// Add a pending supervisor question and broadcast it.
pub fn add_supervisor_question(
&self,