summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-27 01:14:17 +0000
committersoryu <soryu@soryu.co>2026-01-27 01:14:17 +0000
commit448c5a8cc0c6e39909a90999a009565fa4b1c639 (patch)
tree29df2496035a2292320c83811112160245c734b2
parentf6b4d06a0158fb7803a2d7a861cf891cb3b202b4 (diff)
downloadsoryu-makima/task-task-59202404-59202404.tar.gz
soryu-makima/task-task-59202404-59202404.zip
[WIP] Heartbeat checkpoint - 2026-01-27 01:14:17 UTCmakima/task-task-59202404-59202404
-rw-r--r--makima/src/daemon/task/manager.rs340
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs12
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs64
-rw-r--r--makima/src/server/state.rs26
4 files changed, 272 insertions, 170 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index b162f33..9dd4506 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -2922,121 +2922,137 @@ impl TaskManager {
target_branch: Option<String>,
squash: bool,
) -> Result<(), DaemonError> {
- // Get task info
- let task_info = {
+ // Get worktree path - this works even for completed tasks by scanning worktrees directory
+ let worktree_path = match self.get_task_worktree_path(task_id).await {
+ Ok(path) => path,
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to find worktree for merge");
+ let msg = DaemonMessage::MergeToTargetResult {
+ task_id,
+ success: false,
+ message: format!("Task {} not found or has no worktree: {}", task_id, e),
+ commit_sha: None,
+ conflicts: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ return Ok(());
+ }
+ };
+
+ // Get base_branch from in-memory tasks if available (for fallback target branch)
+ let base_branch = {
let tasks = self.tasks.read().await;
- tasks.get(&task_id).map(|t| (
- t.worktree.as_ref().map(|w| w.path.clone()),
- t.base_branch.clone(),
- ))
+ tasks.get(&task_id).and_then(|t| t.base_branch.clone())
};
- let (success, message, commit_sha, conflicts) = match task_info {
- Some((Some(worktree_path), base)) => {
- let target = target_branch.unwrap_or_else(|| base.unwrap_or_else(|| "main".to_string()));
+ let target = target_branch.unwrap_or_else(|| base_branch.unwrap_or_else(|| "main".to_string()));
- // First, stage and commit any uncommitted changes
- let add_result = tokio::process::Command::new("git")
- .current_dir(&worktree_path)
- .args(["add", "-A"])
- .output()
- .await;
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ target_branch = %target,
+ squash = squash,
+ "Starting merge operation"
+ );
- if let Err(e) = add_result {
- (false, format!("Failed to stage changes: {}", e), None, None)
- } else {
- // Commit if there are staged changes
- let commit_result = tokio::process::Command::new("git")
- .current_dir(&worktree_path)
- .args(["commit", "-m", "Task completion checkpoint", "--allow-empty"])
- .output()
- .await;
-
- if let Err(e) = commit_result {
- tracing::warn!(task_id = %task_id, error = %e, "Commit failed (may be empty)");
+ // First, stage and commit any uncommitted changes
+ let add_result = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["add", "-A"])
+ .output()
+ .await;
+
+ let (success, message, commit_sha, conflicts) = if let Err(e) = add_result {
+ (false, format!("Failed to stage changes: {}", e), None, None)
+ } else {
+ // Commit if there are staged changes
+ let commit_result = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["commit", "-m", "Task completion checkpoint", "--allow-empty"])
+ .output()
+ .await;
+
+ if let Err(e) = commit_result {
+ tracing::warn!(task_id = %task_id, error = %e, "Commit failed (may be empty)");
+ }
+
+ // Get current branch name
+ let branch_output = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["rev-parse", "--abbrev-ref", "HEAD"])
+ .output()
+ .await;
+
+ let source_branch = branch_output
+ .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
+ .unwrap_or_else(|_| "unknown".to_string());
+
+ // Checkout target branch
+ let checkout = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["checkout", &target])
+ .output()
+ .await;
+
+ match checkout {
+ Ok(output) if output.status.success() => {
+ // Merge the source branch
+ let mut merge_cmd = tokio::process::Command::new("git");
+ merge_cmd.current_dir(&worktree_path);
+ merge_cmd.arg("merge");
+ if squash {
+ merge_cmd.arg("--squash");
}
+ merge_cmd.arg(&source_branch);
+ merge_cmd.arg("-m").arg(format!("Merge task {} into {}", task_id, target));
- // Get current branch name
- let branch_output = tokio::process::Command::new("git")
- .current_dir(&worktree_path)
- .args(["rev-parse", "--abbrev-ref", "HEAD"])
- .output()
- .await;
-
- let source_branch = branch_output
- .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
- .unwrap_or_else(|_| "unknown".to_string());
-
- // Checkout target branch
- let checkout = tokio::process::Command::new("git")
- .current_dir(&worktree_path)
- .args(["checkout", &target])
- .output()
- .await;
-
- match checkout {
+ match merge_cmd.output().await {
Ok(output) if output.status.success() => {
- // Merge the source branch
- let mut merge_cmd = tokio::process::Command::new("git");
- merge_cmd.current_dir(&worktree_path);
- merge_cmd.arg("merge");
+ // Get the commit SHA
+ let sha_output = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["rev-parse", "HEAD"])
+ .output()
+ .await;
+
+ let sha = sha_output
+ .ok()
+ .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string());
+
if squash {
- merge_cmd.arg("--squash");
+ // For squash merge, we need to commit
+ let _ = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["commit", "-m", &format!("Squashed merge of task {}", task_id)])
+ .output()
+ .await;
}
- merge_cmd.arg(&source_branch);
- merge_cmd.arg("-m").arg(format!("Merge task {} into {}", task_id, target));
-
- match merge_cmd.output().await {
- Ok(output) if output.status.success() => {
- // Get the commit SHA
- let sha_output = tokio::process::Command::new("git")
- .current_dir(&worktree_path)
- .args(["rev-parse", "HEAD"])
- .output()
- .await;
-
- let sha = sha_output
- .ok()
- .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string());
-
- if squash {
- // For squash merge, we need to commit
- let _ = tokio::process::Command::new("git")
- .current_dir(&worktree_path)
- .args(["commit", "-m", &format!("Squashed merge of task {}", task_id)])
- .output()
- .await;
- }
- (true, format!("Merged {} into {}", source_branch, target), sha, None)
- }
- Ok(output) => {
- let stderr = String::from_utf8_lossy(&output.stderr);
- // Check for merge conflicts
- if stderr.contains("CONFLICT") {
- let conflict_files = stderr
- .lines()
- .filter(|l| l.contains("CONFLICT"))
- .map(|l| l.to_string())
- .collect::<Vec<_>>();
- (false, "Merge conflicts detected".to_string(), None, Some(conflict_files))
- } else {
- (false, format!("Merge failed: {}", stderr), None, None)
- }
- }
- Err(e) => (false, format!("Failed to merge: {}", e), None, None),
- }
+ (true, format!("Merged {} into {}", source_branch, target), sha, None)
}
Ok(output) => {
let stderr = String::from_utf8_lossy(&output.stderr);
- (false, format!("Failed to checkout target branch: {}", stderr), None, None)
+ // Check for merge conflicts
+ if stderr.contains("CONFLICT") {
+ let conflict_files = stderr
+ .lines()
+ .filter(|l| l.contains("CONFLICT"))
+ .map(|l| l.to_string())
+ .collect::<Vec<_>>();
+ (false, "Merge conflicts detected".to_string(), None, Some(conflict_files))
+ } else {
+ (false, format!("Merge failed: {}", stderr), None, None)
+ }
}
- Err(e) => (false, format!("Failed to checkout: {}", e), None, None),
+ Err(e) => (false, format!("Failed to merge: {}", e), None, None),
}
}
+ Ok(output) => {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ (false, format!("Failed to checkout target branch: {}", stderr), None, None)
+ }
+ Err(e) => (false, format!("Failed to checkout: {}", e), None, None),
}
- Some((None, _)) => (false, format!("Task {} has no worktree", task_id), None, None),
- None => (false, format!("Task {} not found", task_id), None, None),
};
let msg = DaemonMessage::MergeToTargetResult {
@@ -3058,15 +3074,27 @@ impl TaskManager {
body: Option<String>,
base_branch: String,
) -> Result<(), DaemonError> {
- // Get task's worktree path and base branch
- let (worktree_path, task_base_branch) = {
+ // Get worktree path - this works even for completed tasks by scanning worktrees directory
+ let worktree_path = match self.get_task_worktree_path(task_id).await {
+ Ok(path) => path,
+ Err(e) => {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to find worktree for PR creation");
+ let msg = DaemonMessage::PRCreated {
+ task_id,
+ success: false,
+ message: format!("Task {} not found or has no worktree: {}", task_id, e),
+ pr_url: None,
+ pr_number: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ return Ok(());
+ }
+ };
+
+ // Get base_branch from in-memory tasks if available (for fallback)
+ let task_base_branch = {
let tasks = self.tasks.read().await;
- let worktree = tasks.get(&task_id)
- .and_then(|t| t.worktree.as_ref())
- .map(|w| w.path.clone());
- let base = tasks.get(&task_id)
- .and_then(|t| t.base_branch.clone());
- (worktree, base)
+ tasks.get(&task_id).and_then(|t| t.base_branch.clone())
};
// Use task's base_branch if the provided one is the default "main" and task has a detected one
@@ -3079,69 +3107,63 @@ impl TaskManager {
tracing::info!(
task_id = %task_id,
effective_base_branch = %effective_base_branch,
- worktree_exists = worktree_path.is_some(),
+ worktree_path = %worktree_path.display(),
"Creating PR with effective base branch"
);
- let (success, message, pr_url, pr_number) = if let Some(path) = worktree_path {
- // Push the current branch first
- tracing::info!(path = %path.display(), "Pushing branch to origin");
- let push_result = tokio::process::Command::new("git")
- .current_dir(&path)
- .args(["push", "-u", "origin", "HEAD"])
- .output()
- .await;
+ // Push the current branch first
+ let push_result = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["push", "-u", "origin", "HEAD"])
+ .output()
+ .await;
- match push_result {
- Err(e) => {
- tracing::error!(error = %e, "Failed to execute git push");
- (false, format!("Failed to push branch: {}", e), None, None)
- }
- Ok(output) if !output.status.success() => {
- let stderr = String::from_utf8_lossy(&output.stderr);
- tracing::error!(stderr = %stderr, "git push failed");
- (false, format!("Failed to push branch: {}", stderr), None, None)
+ let (success, message, pr_url, pr_number) = match push_result {
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to execute git push");
+ (false, format!("Failed to push branch: {}", e), None, None)
+ }
+ Ok(output) if !output.status.success() => {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ tracing::error!(stderr = %stderr, "git push failed");
+ (false, format!("Failed to push branch: {}", stderr), None, None)
+ }
+ Ok(_) => {
+ tracing::info!("Branch pushed successfully, creating PR");
+ // Create PR using gh CLI
+ let mut pr_cmd = tokio::process::Command::new("gh");
+ pr_cmd.current_dir(&worktree_path);
+ pr_cmd.args(["pr", "create", "--title", &title, "--base", &effective_base_branch]);
+
+ if let Some(ref body_text) = body {
+ pr_cmd.args(["--body", body_text]);
+ } else {
+ pr_cmd.args(["--body", ""]);
}
- Ok(_) => {
- tracing::info!("Branch pushed successfully, creating PR");
- // Create PR using gh CLI
- let mut pr_cmd = tokio::process::Command::new("gh");
- pr_cmd.current_dir(&path);
- pr_cmd.args(["pr", "create", "--title", &title, "--base", &effective_base_branch]);
-
- if let Some(ref body_text) = body {
- pr_cmd.args(["--body", body_text]);
- } else {
- pr_cmd.args(["--body", ""]);
- }
- match pr_cmd.output().await {
- Ok(output) if output.status.success() => {
- let stdout = String::from_utf8_lossy(&output.stdout);
- // gh pr create outputs the PR URL
- let url = stdout.lines().last().map(|s| s.trim().to_string());
- // Extract PR number from URL
- let number = url.as_ref().and_then(|u| {
- u.split('/').last().and_then(|n| n.parse::<i32>().ok())
- });
- tracing::info!(pr_url = ?url, pr_number = ?number, "PR created successfully");
- (true, "Pull request created".to_string(), url, number)
- }
- Ok(output) => {
- let stderr = String::from_utf8_lossy(&output.stderr);
- tracing::error!(stderr = %stderr, "gh pr create failed");
- (false, format!("Failed to create PR: {}", stderr), None, None)
- }
- Err(e) => {
- tracing::error!(error = %e, "Failed to execute gh command");
- (false, format!("Failed to run gh: {}", e), None, None)
- }
+ match pr_cmd.output().await {
+ Ok(output) if output.status.success() => {
+ let stdout = String::from_utf8_lossy(&output.stdout);
+ // gh pr create outputs the PR URL
+ let url = stdout.lines().last().map(|s| s.trim().to_string());
+ // Extract PR number from URL
+ let number = url.as_ref().and_then(|u| {
+ u.split('/').last().and_then(|n| n.parse::<i32>().ok())
+ });
+ tracing::info!(pr_url = ?url, pr_number = ?number, "PR created successfully");
+ (true, "Pull request created".to_string(), url, number)
+ }
+ Ok(output) => {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ tracing::error!(stderr = %stderr, "gh pr create failed");
+ (false, format!("Failed to create PR: {}", stderr), None, None)
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to execute gh command");
+ (false, format!("Failed to run gh: {}", e), None, None)
}
}
}
- } else {
- tracing::error!(task_id = %task_id, "Task not found or has no worktree");
- (false, format!("Task {} not found or has no worktree", task_id), None, None)
};
tracing::info!(
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 0aea40e..f7fe49f 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -1734,15 +1734,25 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
success,
message,
pr_url,
- pr_number: _,
+ pr_number,
}) => {
tracing::info!(
task_id = %task_id,
success = success,
pr_url = ?pr_url,
+ pr_number = ?pr_number,
"PR created result received"
);
+ // Broadcast the PR result for waiting handlers
+ state.broadcast_pr_result(crate::server::state::PrResultNotification {
+ task_id,
+ success,
+ message: message.clone(),
+ pr_url: pr_url.clone(),
+ pr_number,
+ });
+
// On successful PR creation, notify supervisor of next steps
if success {
if let Some(pool) = state.db_pool.as_ref() {
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index a654a05..f0cb69e 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -1476,6 +1476,9 @@ pub async fn create_pr(
).into_response();
};
+ // Subscribe to PR results BEFORE sending the command
+ let mut rx = state.pr_results.subscribe();
+
// Send CreatePR command to daemon
let cmd = DaemonCommand::CreatePR {
task_id: request.task_id,
@@ -1492,16 +1495,57 @@ pub async fn create_pr(
).into_response();
}
- (
- StatusCode::CREATED,
- Json(CreatePRResponse {
- task_id: request.task_id,
- success: true,
- message: "PR creation command sent".to_string(),
- pr_url: None,
- pr_number: None,
- }),
- ).into_response()
+ // Wait for the PR result with a timeout (60 seconds should be plenty for PR creation)
+ let timeout = tokio::time::Duration::from_secs(60);
+ let result = tokio::time::timeout(timeout, async {
+ loop {
+ match rx.recv().await {
+ Ok(notification) => {
+ if notification.task_id == request.task_id {
+ return Some(notification);
+ }
+ // Not our task, keep waiting
+ }
+ Err(_) => {
+ // Channel closed or lagged
+ return None;
+ }
+ }
+ }
+ }).await;
+
+ match result {
+ Ok(Some(notification)) => {
+ let status = if notification.success {
+ StatusCode::CREATED
+ } else {
+ StatusCode::INTERNAL_SERVER_ERROR
+ };
+ (
+ status,
+ Json(CreatePRResponse {
+ task_id: request.task_id,
+ success: notification.success,
+ message: notification.message,
+ pr_url: notification.pr_url,
+ pr_number: notification.pr_number,
+ }),
+ ).into_response()
+ }
+ Ok(None) | Err(_) => {
+ // Timeout or channel error - return error status
+ (
+ StatusCode::GATEWAY_TIMEOUT,
+ Json(CreatePRResponse {
+ task_id: request.task_id,
+ success: false,
+ message: "PR creation timed out waiting for daemon response".to_string(),
+ pr_url: None,
+ pr_number: None,
+ }),
+ ).into_response()
+ }
+ }
}
/// Get the diff for a task's changes.
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index c579f0f..02a2328 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -118,6 +118,21 @@ pub struct MergeResultNotification {
pub conflicts: Option<Vec<String>>,
}
+/// Notification for PR creation results.
+#[derive(Debug, Clone)]
+pub struct PrResultNotification {
+ /// ID of the task for which PR was created
+ pub task_id: Uuid,
+ /// Whether the PR creation succeeded
+ pub success: bool,
+ /// Message describing the result
+ pub message: String,
+ /// PR URL if creation succeeded
+ pub pr_url: Option<String>,
+ /// PR number if creation succeeded
+ pub pr_number: Option<i32>,
+}
+
/// Notification for supervisor questions requiring user feedback.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -527,6 +542,8 @@ pub struct AppState {
pub supervisor_questions: broadcast::Sender<SupervisorQuestionNotification>,
/// Broadcast channel for merge result notifications
pub merge_results: broadcast::Sender<MergeResultNotification>,
+ /// Broadcast channel for PR creation result notifications
+ pub pr_results: broadcast::Sender<PrResultNotification>,
/// Pending supervisor questions awaiting user response (keyed by question_id)
pub pending_questions: DashMap<Uuid, PendingSupervisorQuestion>,
/// Responses to supervisor questions (keyed by question_id)
@@ -560,6 +577,7 @@ impl AppState {
let (task_completions, _) = broadcast::channel(256); // For supervisor task monitoring
let (supervisor_questions, _) = broadcast::channel(256); // For supervisor questions to users
let (merge_results, _) = broadcast::channel(256); // For merge operation results
+ let (pr_results, _) = broadcast::channel(256); // For PR creation results
// Initialize JWT verifier from environment (optional)
// Requires SUPABASE_URL and either SUPABASE_JWT_PUBLIC_KEY (RS256) or SUPABASE_JWT_SECRET (HS256)
@@ -603,6 +621,7 @@ impl AppState {
task_completions,
supervisor_questions,
merge_results,
+ pr_results,
pending_questions: DashMap::new(),
question_responses: DashMap::new(),
daemon_connections: DashMap::new(),
@@ -699,6 +718,13 @@ impl AppState {
let _ = self.merge_results.send(notification);
}
+ /// Broadcast a PR creation result notification to all subscribers.
+ ///
+ /// Used to notify waiting handlers when a PR creation operation completes.
+ pub fn broadcast_pr_result(&self, notification: PrResultNotification) {
+ let _ = self.pr_results.send(notification);
+ }
+
/// Add a pending supervisor question and broadcast it.
pub fn add_supervisor_question(
&self,