diff options
| author | soryu <soryu@soryu.co> | 2026-01-27 01:14:17 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-27 01:14:17 +0000 |
| commit | 448c5a8cc0c6e39909a90999a009565fa4b1c639 (patch) | |
| tree | 29df2496035a2292320c83811112160245c734b2 | |
| parent | f6b4d06a0158fb7803a2d7a861cf891cb3b202b4 (diff) | |
| download | soryu-makima/task-task-59202404-59202404.tar.gz soryu-makima/task-task-59202404-59202404.zip | |
[WIP] Heartbeat checkpoint - 2026-01-27 01:14:17 UTCmakima/task-task-59202404-59202404
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 340 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 12 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 64 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 26 |
4 files changed, 272 insertions, 170 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index b162f33..9dd4506 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -2922,121 +2922,137 @@ impl TaskManager { target_branch: Option<String>, squash: bool, ) -> Result<(), DaemonError> { - // Get task info - let task_info = { + // Get worktree path - this works even for completed tasks by scanning worktrees directory + let worktree_path = match self.get_task_worktree_path(task_id).await { + Ok(path) => path, + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Failed to find worktree for merge"); + let msg = DaemonMessage::MergeToTargetResult { + task_id, + success: false, + message: format!("Task {} not found or has no worktree: {}", task_id, e), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + // Get base_branch from in-memory tasks if available (for fallback target branch) + let base_branch = { let tasks = self.tasks.read().await; - tasks.get(&task_id).map(|t| ( - t.worktree.as_ref().map(|w| w.path.clone()), - t.base_branch.clone(), - )) + tasks.get(&task_id).and_then(|t| t.base_branch.clone()) }; - let (success, message, commit_sha, conflicts) = match task_info { - Some((Some(worktree_path), base)) => { - let target = target_branch.unwrap_or_else(|| base.unwrap_or_else(|| "main".to_string())); + let target = target_branch.unwrap_or_else(|| base_branch.unwrap_or_else(|| "main".to_string())); - // First, stage and commit any uncommitted changes - let add_result = tokio::process::Command::new("git") - .current_dir(&worktree_path) - .args(["add", "-A"]) - .output() - .await; + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + target_branch = %target, + squash = squash, + "Starting merge operation" + ); - if let Err(e) = add_result { - (false, format!("Failed to stage changes: {}", e), None, None) - } else { - // Commit if there are staged changes - let commit_result = tokio::process::Command::new("git") - .current_dir(&worktree_path) - .args(["commit", "-m", "Task completion checkpoint", "--allow-empty"]) - .output() - .await; - - if let Err(e) = commit_result { - tracing::warn!(task_id = %task_id, error = %e, "Commit failed (may be empty)"); + // First, stage and commit any uncommitted changes + let add_result = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["add", "-A"]) + .output() + .await; + + let (success, message, commit_sha, conflicts) = if let Err(e) = add_result { + (false, format!("Failed to stage changes: {}", e), None, None) + } else { + // Commit if there are staged changes + let commit_result = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["commit", "-m", "Task completion checkpoint", "--allow-empty"]) + .output() + .await; + + if let Err(e) = commit_result { + tracing::warn!(task_id = %task_id, error = %e, "Commit failed (may be empty)"); + } + + // Get current branch name + let branch_output = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .output() + .await; + + let source_branch = branch_output + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) + .unwrap_or_else(|_| "unknown".to_string()); + + // Checkout target branch + let checkout = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["checkout", &target]) + .output() + .await; + + match checkout { + Ok(output) if output.status.success() => { + // Merge the source branch + let mut merge_cmd = tokio::process::Command::new("git"); + merge_cmd.current_dir(&worktree_path); + merge_cmd.arg("merge"); + if squash { + merge_cmd.arg("--squash"); } + merge_cmd.arg(&source_branch); + merge_cmd.arg("-m").arg(format!("Merge task {} into {}", task_id, target)); - // Get current branch name - let branch_output = tokio::process::Command::new("git") - .current_dir(&worktree_path) - .args(["rev-parse", "--abbrev-ref", "HEAD"]) - .output() - .await; - - let source_branch = branch_output - .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) - .unwrap_or_else(|_| "unknown".to_string()); - - // Checkout target branch - let checkout = tokio::process::Command::new("git") - .current_dir(&worktree_path) - .args(["checkout", &target]) - .output() - .await; - - match checkout { + match merge_cmd.output().await { Ok(output) if output.status.success() => { - // Merge the source branch - let mut merge_cmd = tokio::process::Command::new("git"); - merge_cmd.current_dir(&worktree_path); - merge_cmd.arg("merge"); + // Get the commit SHA + let sha_output = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["rev-parse", "HEAD"]) + .output() + .await; + + let sha = sha_output + .ok() + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()); + if squash { - merge_cmd.arg("--squash"); + // For squash merge, we need to commit + let _ = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["commit", "-m", &format!("Squashed merge of task {}", task_id)]) + .output() + .await; } - merge_cmd.arg(&source_branch); - merge_cmd.arg("-m").arg(format!("Merge task {} into {}", task_id, target)); - - match merge_cmd.output().await { - Ok(output) if output.status.success() => { - // Get the commit SHA - let sha_output = tokio::process::Command::new("git") - .current_dir(&worktree_path) - .args(["rev-parse", "HEAD"]) - .output() - .await; - - let sha = sha_output - .ok() - .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()); - - if squash { - // For squash merge, we need to commit - let _ = tokio::process::Command::new("git") - .current_dir(&worktree_path) - .args(["commit", "-m", &format!("Squashed merge of task {}", task_id)]) - .output() - .await; - } - (true, format!("Merged {} into {}", source_branch, target), sha, None) - } - Ok(output) => { - let stderr = String::from_utf8_lossy(&output.stderr); - // Check for merge conflicts - if stderr.contains("CONFLICT") { - let conflict_files = stderr - .lines() - .filter(|l| l.contains("CONFLICT")) - .map(|l| l.to_string()) - .collect::<Vec<_>>(); - (false, "Merge conflicts detected".to_string(), None, Some(conflict_files)) - } else { - (false, format!("Merge failed: {}", stderr), None, None) - } - } - Err(e) => (false, format!("Failed to merge: {}", e), None, None), - } + (true, format!("Merged {} into {}", source_branch, target), sha, None) } Ok(output) => { let stderr = String::from_utf8_lossy(&output.stderr); - (false, format!("Failed to checkout target branch: {}", stderr), None, None) + // Check for merge conflicts + if stderr.contains("CONFLICT") { + let conflict_files = stderr + .lines() + .filter(|l| l.contains("CONFLICT")) + .map(|l| l.to_string()) + .collect::<Vec<_>>(); + (false, "Merge conflicts detected".to_string(), None, Some(conflict_files)) + } else { + (false, format!("Merge failed: {}", stderr), None, None) + } } - Err(e) => (false, format!("Failed to checkout: {}", e), None, None), + Err(e) => (false, format!("Failed to merge: {}", e), None, None), } } + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + (false, format!("Failed to checkout target branch: {}", stderr), None, None) + } + Err(e) => (false, format!("Failed to checkout: {}", e), None, None), } - Some((None, _)) => (false, format!("Task {} has no worktree", task_id), None, None), - None => (false, format!("Task {} not found", task_id), None, None), }; let msg = DaemonMessage::MergeToTargetResult { @@ -3058,15 +3074,27 @@ impl TaskManager { body: Option<String>, base_branch: String, ) -> Result<(), DaemonError> { - // Get task's worktree path and base branch - let (worktree_path, task_base_branch) = { + // Get worktree path - this works even for completed tasks by scanning worktrees directory + let worktree_path = match self.get_task_worktree_path(task_id).await { + Ok(path) => path, + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to find worktree for PR creation"); + let msg = DaemonMessage::PRCreated { + task_id, + success: false, + message: format!("Task {} not found or has no worktree: {}", task_id, e), + pr_url: None, + pr_number: None, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + // Get base_branch from in-memory tasks if available (for fallback) + let task_base_branch = { let tasks = self.tasks.read().await; - let worktree = tasks.get(&task_id) - .and_then(|t| t.worktree.as_ref()) - .map(|w| w.path.clone()); - let base = tasks.get(&task_id) - .and_then(|t| t.base_branch.clone()); - (worktree, base) + tasks.get(&task_id).and_then(|t| t.base_branch.clone()) }; // Use task's base_branch if the provided one is the default "main" and task has a detected one @@ -3079,69 +3107,63 @@ impl TaskManager { tracing::info!( task_id = %task_id, effective_base_branch = %effective_base_branch, - worktree_exists = worktree_path.is_some(), + worktree_path = %worktree_path.display(), "Creating PR with effective base branch" ); - let (success, message, pr_url, pr_number) = if let Some(path) = worktree_path { - // Push the current branch first - tracing::info!(path = %path.display(), "Pushing branch to origin"); - let push_result = tokio::process::Command::new("git") - .current_dir(&path) - .args(["push", "-u", "origin", "HEAD"]) - .output() - .await; + // Push the current branch first + let push_result = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["push", "-u", "origin", "HEAD"]) + .output() + .await; - match push_result { - Err(e) => { - tracing::error!(error = %e, "Failed to execute git push"); - (false, format!("Failed to push branch: {}", e), None, None) - } - Ok(output) if !output.status.success() => { - let stderr = String::from_utf8_lossy(&output.stderr); - tracing::error!(stderr = %stderr, "git push failed"); - (false, format!("Failed to push branch: {}", stderr), None, None) + let (success, message, pr_url, pr_number) = match push_result { + Err(e) => { + tracing::error!(error = %e, "Failed to execute git push"); + (false, format!("Failed to push branch: {}", e), None, None) + } + Ok(output) if !output.status.success() => { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::error!(stderr = %stderr, "git push failed"); + (false, format!("Failed to push branch: {}", stderr), None, None) + } + Ok(_) => { + tracing::info!("Branch pushed successfully, creating PR"); + // Create PR using gh CLI + let mut pr_cmd = tokio::process::Command::new("gh"); + pr_cmd.current_dir(&worktree_path); + pr_cmd.args(["pr", "create", "--title", &title, "--base", &effective_base_branch]); + + if let Some(ref body_text) = body { + pr_cmd.args(["--body", body_text]); + } else { + pr_cmd.args(["--body", ""]); } - Ok(_) => { - tracing::info!("Branch pushed successfully, creating PR"); - // Create PR using gh CLI - let mut pr_cmd = tokio::process::Command::new("gh"); - pr_cmd.current_dir(&path); - pr_cmd.args(["pr", "create", "--title", &title, "--base", &effective_base_branch]); - - if let Some(ref body_text) = body { - pr_cmd.args(["--body", body_text]); - } else { - pr_cmd.args(["--body", ""]); - } - match pr_cmd.output().await { - Ok(output) if output.status.success() => { - let stdout = String::from_utf8_lossy(&output.stdout); - // gh pr create outputs the PR URL - let url = stdout.lines().last().map(|s| s.trim().to_string()); - // Extract PR number from URL - let number = url.as_ref().and_then(|u| { - u.split('/').last().and_then(|n| n.parse::<i32>().ok()) - }); - tracing::info!(pr_url = ?url, pr_number = ?number, "PR created successfully"); - (true, "Pull request created".to_string(), url, number) - } - Ok(output) => { - let stderr = String::from_utf8_lossy(&output.stderr); - tracing::error!(stderr = %stderr, "gh pr create failed"); - (false, format!("Failed to create PR: {}", stderr), None, None) - } - Err(e) => { - tracing::error!(error = %e, "Failed to execute gh command"); - (false, format!("Failed to run gh: {}", e), None, None) - } + match pr_cmd.output().await { + Ok(output) if output.status.success() => { + let stdout = String::from_utf8_lossy(&output.stdout); + // gh pr create outputs the PR URL + let url = stdout.lines().last().map(|s| s.trim().to_string()); + // Extract PR number from URL + let number = url.as_ref().and_then(|u| { + u.split('/').last().and_then(|n| n.parse::<i32>().ok()) + }); + tracing::info!(pr_url = ?url, pr_number = ?number, "PR created successfully"); + (true, "Pull request created".to_string(), url, number) + } + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::error!(stderr = %stderr, "gh pr create failed"); + (false, format!("Failed to create PR: {}", stderr), None, None) + } + Err(e) => { + tracing::error!(error = %e, "Failed to execute gh command"); + (false, format!("Failed to run gh: {}", e), None, None) } } } - } else { - tracing::error!(task_id = %task_id, "Task not found or has no worktree"); - (false, format!("Task {} not found or has no worktree", task_id), None, None) }; tracing::info!( diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 0aea40e..f7fe49f 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -1734,15 +1734,25 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re success, message, pr_url, - pr_number: _, + pr_number, }) => { tracing::info!( task_id = %task_id, success = success, pr_url = ?pr_url, + pr_number = ?pr_number, "PR created result received" ); + // Broadcast the PR result for waiting handlers + state.broadcast_pr_result(crate::server::state::PrResultNotification { + task_id, + success, + message: message.clone(), + pr_url: pr_url.clone(), + pr_number, + }); + // On successful PR creation, notify supervisor of next steps if success { if let Some(pool) = state.db_pool.as_ref() { diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index a654a05..f0cb69e 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -1476,6 +1476,9 @@ pub async fn create_pr( ).into_response(); }; + // Subscribe to PR results BEFORE sending the command + let mut rx = state.pr_results.subscribe(); + // Send CreatePR command to daemon let cmd = DaemonCommand::CreatePR { task_id: request.task_id, @@ -1492,16 +1495,57 @@ pub async fn create_pr( ).into_response(); } - ( - StatusCode::CREATED, - Json(CreatePRResponse { - task_id: request.task_id, - success: true, - message: "PR creation command sent".to_string(), - pr_url: None, - pr_number: None, - }), - ).into_response() + // Wait for the PR result with a timeout (60 seconds should be plenty for PR creation) + let timeout = tokio::time::Duration::from_secs(60); + let result = tokio::time::timeout(timeout, async { + loop { + match rx.recv().await { + Ok(notification) => { + if notification.task_id == request.task_id { + return Some(notification); + } + // Not our task, keep waiting + } + Err(_) => { + // Channel closed or lagged + return None; + } + } + } + }).await; + + match result { + Ok(Some(notification)) => { + let status = if notification.success { + StatusCode::CREATED + } else { + StatusCode::INTERNAL_SERVER_ERROR + }; + ( + status, + Json(CreatePRResponse { + task_id: request.task_id, + success: notification.success, + message: notification.message, + pr_url: notification.pr_url, + pr_number: notification.pr_number, + }), + ).into_response() + } + Ok(None) | Err(_) => { + // Timeout or channel error - return error status + ( + StatusCode::GATEWAY_TIMEOUT, + Json(CreatePRResponse { + task_id: request.task_id, + success: false, + message: "PR creation timed out waiting for daemon response".to_string(), + pr_url: None, + pr_number: None, + }), + ).into_response() + } + } } /// Get the diff for a task's changes. diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index c579f0f..02a2328 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -118,6 +118,21 @@ pub struct MergeResultNotification { pub conflicts: Option<Vec<String>>, } +/// Notification for PR creation results. +#[derive(Debug, Clone)] +pub struct PrResultNotification { + /// ID of the task for which PR was created + pub task_id: Uuid, + /// Whether the PR creation succeeded + pub success: bool, + /// Message describing the result + pub message: String, + /// PR URL if creation succeeded + pub pr_url: Option<String>, + /// PR number if creation succeeded + pub pr_number: Option<i32>, +} + /// Notification for supervisor questions requiring user feedback. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] @@ -527,6 +542,8 @@ pub struct AppState { pub supervisor_questions: broadcast::Sender<SupervisorQuestionNotification>, /// Broadcast channel for merge result notifications pub merge_results: broadcast::Sender<MergeResultNotification>, + /// Broadcast channel for PR creation result notifications + pub pr_results: broadcast::Sender<PrResultNotification>, /// Pending supervisor questions awaiting user response (keyed by question_id) pub pending_questions: DashMap<Uuid, PendingSupervisorQuestion>, /// Responses to supervisor questions (keyed by question_id) @@ -560,6 +577,7 @@ impl AppState { let (task_completions, _) = broadcast::channel(256); // For supervisor task monitoring let (supervisor_questions, _) = broadcast::channel(256); // For supervisor questions to users let (merge_results, _) = broadcast::channel(256); // For merge operation results + let (pr_results, _) = broadcast::channel(256); // For PR creation results // Initialize JWT verifier from environment (optional) // Requires SUPABASE_URL and either SUPABASE_JWT_PUBLIC_KEY (RS256) or SUPABASE_JWT_SECRET (HS256) @@ -603,6 +621,7 @@ impl AppState { task_completions, supervisor_questions, merge_results, + pr_results, pending_questions: DashMap::new(), question_responses: DashMap::new(), daemon_connections: DashMap::new(), @@ -699,6 +718,13 @@ impl AppState { let _ = self.merge_results.send(notification); } + /// Broadcast a PR creation result notification to all subscribers. + /// + /// Used to notify waiting handlers when a PR creation operation completes. + pub fn broadcast_pr_result(&self, notification: PrResultNotification) { + let _ = self.pr_results.send(notification); + } + /// Add a pending supervisor question and broadcast it. pub fn add_supervisor_question( &self, |
