diff options
| author | soryu <soryu@soryu.co> | 2026-01-15 17:59:37 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-15 17:59:37 +0000 |
| commit | 11c78ade600a2d74b8f033f18045a0c28fac4362 (patch) | |
| tree | 19a62408769292cefd2f990f9fd8d9fff43becdf | |
| parent | 3efdab36ca61a6795454668881d5b925abe22bd3 (diff) | |
| download | soryu-11c78ade600a2d74b8f033f18045a0c28fac4362.tar.gz soryu-11c78ade600a2d74b8f033f18045a0c28fac4362.zip | |
Implement simple git checkpoint command for supervisor
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 239 | ||||
| -rw-r--r-- | makima/src/daemon/ws/protocol.rs | 37 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 124 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 43 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 8 |
5 files changed, 441 insertions, 10 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index 75c884b..427a9d1 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -1431,6 +1431,13 @@ impl TaskManager { tracing::info!(task_id = %task_id, "Getting task diff"); self.handle_get_task_diff(task_id).await?; } + DaemonCommand::CreateCheckpoint { + task_id, + message, + } => { + tracing::info!(task_id = %task_id, "Creating checkpoint"); + self.handle_create_checkpoint(task_id, message).await?; + } DaemonCommand::CleanupWorktree { task_id, delete_branch, @@ -2498,6 +2505,238 @@ impl TaskManager { let _ = self.ws_tx.send(msg).await; Ok(()) } + + /// Handle CreateCheckpoint command - stage all changes, commit, and get stats. + async fn handle_create_checkpoint( + &self, + task_id: Uuid, + message: String, + ) -> Result<(), DaemonError> { + // Get task's worktree path and branch name + let task_info = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).map(|t| ( + t.worktree.as_ref().map(|w| w.path.clone()), + t.worktree.as_ref().map(|w| w.branch.clone()), + )) + }; + + let (worktree_path, branch_name) = match task_info { + Some((Some(path), Some(branch))) => (path, branch), + Some((Some(path), None)) => { + // Try to get current branch from git + let branch = self.get_current_branch(&path).await.unwrap_or_else(|| "unknown".to_string()); + (path, branch) + } + _ => { + let msg = DaemonMessage::CheckpointCreated { + task_id, + success: false, + commit_sha: None, + branch_name: None, + checkpoint_number: None, + files_changed: None, + lines_added: None, + lines_removed: None, + error: Some(format!("Task {} not found or has no worktree", task_id)), + message, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + // Step 1: Check if there are changes to commit + let status_output = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["status", "--porcelain"]) + .output() + .await; + + let has_changes = match &status_output { + Ok(output) => !output.stdout.is_empty(), + Err(_) => false, + }; + + if !has_changes { + let msg = DaemonMessage::CheckpointCreated { + task_id, + success: false, + commit_sha: None, + branch_name: Some(branch_name), + checkpoint_number: None, + files_changed: None, + lines_added: None, + lines_removed: None, + error: Some("No changes to checkpoint".to_string()), + message, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + + // Step 2: Stage all changes + let add_result = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["add", "-A"]) + .output() + .await; + + if let Err(e) = add_result { + let msg = DaemonMessage::CheckpointCreated { + task_id, + success: false, + commit_sha: None, + branch_name: Some(branch_name), + checkpoint_number: None, + files_changed: None, + lines_added: None, + lines_removed: None, + error: Some(format!("Failed to stage changes: {}", e)), + message, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + + // Step 3: Get diff stats before commit + let (lines_added, lines_removed, files_changed) = self.get_staged_diff_stats(&worktree_path).await; + + // Step 4: Create commit + let commit_result = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["commit", "-m", &message]) + .output() + .await; + + let commit_sha = match commit_result { + Ok(output) if output.status.success() => { + // Get the commit SHA + let sha_output = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["rev-parse", "HEAD"]) + .output() + .await; + + match sha_output { + Ok(o) => Some(String::from_utf8_lossy(&o.stdout).trim().to_string()), + Err(_) => None, + } + } + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + let msg = DaemonMessage::CheckpointCreated { + task_id, + success: false, + commit_sha: None, + branch_name: Some(branch_name), + checkpoint_number: None, + files_changed: Some(files_changed), + lines_added: Some(lines_added), + lines_removed: Some(lines_removed), + error: Some(format!("Commit failed: {}", stderr)), + message, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + Err(e) => { + let msg = DaemonMessage::CheckpointCreated { + task_id, + success: false, + commit_sha: None, + branch_name: Some(branch_name), + checkpoint_number: None, + files_changed: None, + lines_added: None, + lines_removed: None, + error: Some(format!("Failed to execute git commit: {}", e)), + message, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + // Success - send response (checkpoint_number will be assigned by server on DB insert) + let msg = DaemonMessage::CheckpointCreated { + task_id, + success: true, + commit_sha, + branch_name: Some(branch_name), + checkpoint_number: None, // Server will assign from DB + files_changed: Some(files_changed), + lines_added: Some(lines_added), + lines_removed: Some(lines_removed), + error: None, + message, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Get the current branch name from a worktree. + async fn get_current_branch(&self, worktree_path: &std::path::PathBuf) -> Option<String> { + let output = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["branch", "--show-current"]) + .output() + .await + .ok()?; + + if output.status.success() { + Some(String::from_utf8_lossy(&output.stdout).trim().to_string()) + } else { + None + } + } + + /// Get diff stats for staged changes. + async fn get_staged_diff_stats(&self, worktree_path: &std::path::PathBuf) -> (i32, i32, serde_json::Value) { + // Get numstat for lines added/removed + let numstat = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["diff", "--cached", "--numstat"]) + .output() + .await; + + let (mut total_added, mut total_removed) = (0i32, 0i32); + if let Ok(output) = numstat { + for line in String::from_utf8_lossy(&output.stdout).lines() { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() >= 2 { + if let Ok(added) = parts[0].parse::<i32>() { + total_added += added; + } + if let Ok(removed) = parts[1].parse::<i32>() { + total_removed += removed; + } + } + } + } + + // Get name-status for file changes + let name_status = tokio::process::Command::new("git") + .current_dir(worktree_path) + .args(["diff", "--cached", "--name-status"]) + .output() + .await; + + let mut files = Vec::new(); + if let Ok(output) = name_status { + for line in String::from_utf8_lossy(&output.stdout).lines() { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() >= 2 { + files.push(serde_json::json!({ + "action": parts[0], + "path": parts[1] + })); + } + } + } + + (total_added, total_removed, serde_json::json!(files)) + } } /// Inner state for spawned tasks (cloneable). diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs index 714c0f9..339f5a4 100644 --- a/makima/src/daemon/ws/protocol.rs +++ b/makima/src/daemon/ws/protocol.rs @@ -251,6 +251,35 @@ pub enum DaemonMessage { error: Option<String>, }, + /// Response to CreateCheckpoint command. + CheckpointCreated { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + /// Commit SHA if successful + #[serde(rename = "commitSha")] + commit_sha: Option<String>, + /// Branch name where checkpoint was created + #[serde(rename = "branchName")] + branch_name: Option<String>, + /// Checkpoint number in sequence (assigned by server on DB insert) + #[serde(rename = "checkpointNumber")] + checkpoint_number: Option<i32>, + /// Files changed in this checkpoint: [{path, action}] + #[serde(rename = "filesChanged")] + files_changed: Option<serde_json::Value>, + /// Lines added + #[serde(rename = "linesAdded")] + lines_added: Option<i32>, + /// Lines removed + #[serde(rename = "linesRemoved")] + lines_removed: Option<i32>, + /// Error message if failed + error: Option<String>, + /// User-provided checkpoint message + message: String, + }, + /// Response to CleanupWorktree command. CleanupWorktreeResult { #[serde(rename = "taskId")] @@ -543,6 +572,14 @@ pub enum DaemonCommand { task_id: Uuid, }, + /// Create a checkpoint (stage changes, commit, get stats). + CreateCheckpoint { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Commit message for the checkpoint. + message: String, + }, + /// Clean up a task's worktree (used when contract is completed/deleted). CleanupWorktree { #[serde(rename = "taskId")] diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 39b12da..0d00f5b 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -381,6 +381,35 @@ pub enum DaemonMessage { /// Error message if operation failed error: Option<String>, }, + /// Notification that a checkpoint was created + CheckpointCreated { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Whether the operation succeeded + success: bool, + /// Commit SHA if successful + #[serde(rename = "commitSha")] + commit_sha: Option<String>, + /// Branch name where checkpoint was created + #[serde(rename = "branchName")] + branch_name: Option<String>, + /// Checkpoint number in sequence + #[serde(rename = "checkpointNumber")] + checkpoint_number: Option<i32>, + /// Files changed in this checkpoint + #[serde(rename = "filesChanged")] + files_changed: Option<serde_json::Value>, + /// Lines added + #[serde(rename = "linesAdded")] + lines_added: Option<i32>, + /// Lines removed + #[serde(rename = "linesRemoved")] + lines_removed: Option<i32>, + /// Error message if operation failed + error: Option<String>, + /// User-provided checkpoint message + message: String, + }, } /// Validated daemon authentication result. @@ -1115,6 +1144,101 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } + Ok(DaemonMessage::CheckpointCreated { + task_id, + success, + commit_sha, + branch_name, + checkpoint_number: _, // We'll get from DB + files_changed, + lines_added, + lines_removed, + error, + message, + }) => { + tracing::info!( + task_id = %task_id, + success = success, + commit_sha = ?commit_sha, + "Checkpoint created notification received" + ); + + if success { + if let (Some(sha), Some(branch)) = (commit_sha.clone(), branch_name.clone()) { + // Store checkpoint in database + if let Some(pool) = state.db_pool.as_ref() { + match repository::create_task_checkpoint( + pool, + task_id, + &sha, + &branch, + &message, + files_changed.clone(), + lines_added, + lines_removed, + ).await { + Ok(checkpoint) => { + tracing::info!( + task_id = %task_id, + checkpoint_id = %checkpoint.id, + checkpoint_number = checkpoint.checkpoint_number, + "Checkpoint stored in database" + ); + + // Broadcast success as task output + state.broadcast_task_output(TaskOutputNotification { + task_id, + owner_id: Some(owner_id), + message_type: "system".to_string(), + content: format!( + "✓ Checkpoint #{} created: {} ({})", + checkpoint.checkpoint_number, + message, + &sha[..7.min(sha.len())] + ), + tool_name: None, + tool_input: None, + is_error: Some(false), + cost_usd: None, + duration_ms: None, + is_partial: false, + }); + } + Err(e) => { + tracing::error!(error = %e, "Failed to store checkpoint in database"); + state.broadcast_task_output(TaskOutputNotification { + task_id, + owner_id: Some(owner_id), + message_type: "error".to_string(), + content: format!("Checkpoint commit succeeded but DB storage failed: {}", e), + tool_name: None, + tool_input: None, + is_error: Some(true), + cost_usd: None, + duration_ms: None, + is_partial: false, + }); + } + } + } + } + } else { + // Broadcast failure + let error_msg = error.unwrap_or_else(|| "Unknown error".to_string()); + state.broadcast_task_output(TaskOutputNotification { + task_id, + owner_id: Some(owner_id), + message_type: "error".to_string(), + content: format!("✗ Checkpoint failed: {}", error_msg), + tool_name: None, + tool_input: None, + is_error: Some(true), + cost_usd: None, + duration_ms: None, + is_partial: false, + }); + } + } Err(e) => { tracing::warn!("Failed to parse daemon message: {}", e); } diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 3add89f..278d0f5 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -714,10 +714,12 @@ pub async fn read_worktree_file( ), request_body = CreateCheckpointRequest, responses( - (status = 201, description = "Checkpoint created", body = CheckpointResponse), + (status = 202, description = "Checkpoint creation accepted", body = CheckpointResponse), (status = 401, description = "Unauthorized"), + (status = 403, description = "Forbidden - can only create checkpoint for own task"), (status = 404, description = "Task not found"), (status = 500, description = "Internal server error"), + (status = 503, description = "Task has no assigned daemon"), ), tag = "Mesh Supervisor" )] @@ -749,7 +751,7 @@ pub async fn create_checkpoint( let pool = state.db_pool.as_ref().unwrap(); - // Get task + // Get task and daemon_id let task = match repository::get_task(pool, task_id).await { Ok(Some(t)) => t, Ok(None) => { @@ -767,16 +769,37 @@ pub async fn create_checkpoint( } }; - // TODO: Implement checkpoint creation via daemon command - // For now, checkpoints should be created by the task itself via git commands - let _ = (task, request); + let Some(daemon_id) = task.daemon_id else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")), + ).into_response(); + }; + + // Send CreateCheckpoint command to daemon + let cmd = DaemonCommand::CreateCheckpoint { + task_id, + message: request.message.clone(), + }; + if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { + tracing::error!(error = %e, "Failed to send CreateCheckpoint command"); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")), + ).into_response(); + } + + // Return accepted - the checkpoint result will be delivered via WebSocket + // and stored in the database by the daemon message handler ( - StatusCode::NOT_IMPLEMENTED, - Json(ApiError::new( - "NOT_IMPLEMENTED", - "Checkpoint creation via API not yet implemented. Use git commands directly in the task.", - )), + StatusCode::ACCEPTED, + Json(CheckpointResponse { + task_id, + checkpoint_number: 0, // Will be assigned by DB on actual creation + commit_sha: "pending".to_string(), + message: request.message, + }), ).into_response() } diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 2a45d88..6a56f21 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -396,6 +396,14 @@ pub enum DaemonCommand { task_id: Uuid, }, + /// Create a git checkpoint (stage changes, commit, record stats) + CreateCheckpoint { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Commit message for the checkpoint + message: String, + }, + /// Clean up a task's worktree (used when contract is completed/deleted) CleanupWorktree { #[serde(rename = "taskId")] |
