summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-15 17:59:37 +0000
committersoryu <soryu@soryu.co>2026-01-15 17:59:37 +0000
commit11c78ade600a2d74b8f033f18045a0c28fac4362 (patch)
tree19a62408769292cefd2f990f9fd8d9fff43becdf
parent3efdab36ca61a6795454668881d5b925abe22bd3 (diff)
downloadsoryu-11c78ade600a2d74b8f033f18045a0c28fac4362.tar.gz
soryu-11c78ade600a2d74b8f033f18045a0c28fac4362.zip
Implement simple git checkpoint command for supervisor
-rw-r--r--makima/src/daemon/task/manager.rs239
-rw-r--r--makima/src/daemon/ws/protocol.rs37
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs124
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs43
-rw-r--r--makima/src/server/state.rs8
5 files changed, 441 insertions, 10 deletions
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index 75c884b..427a9d1 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -1431,6 +1431,13 @@ impl TaskManager {
tracing::info!(task_id = %task_id, "Getting task diff");
self.handle_get_task_diff(task_id).await?;
}
+ DaemonCommand::CreateCheckpoint {
+ task_id,
+ message,
+ } => {
+ tracing::info!(task_id = %task_id, "Creating checkpoint");
+ self.handle_create_checkpoint(task_id, message).await?;
+ }
DaemonCommand::CleanupWorktree {
task_id,
delete_branch,
@@ -2498,6 +2505,238 @@ impl TaskManager {
let _ = self.ws_tx.send(msg).await;
Ok(())
}
+
+ /// Handle CreateCheckpoint command - stage all changes, commit, and get stats.
+ async fn handle_create_checkpoint(
+ &self,
+ task_id: Uuid,
+ message: String,
+ ) -> Result<(), DaemonError> {
+ // Get task's worktree path and branch name
+ let task_info = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id).map(|t| (
+ t.worktree.as_ref().map(|w| w.path.clone()),
+ t.worktree.as_ref().map(|w| w.branch.clone()),
+ ))
+ };
+
+ let (worktree_path, branch_name) = match task_info {
+ Some((Some(path), Some(branch))) => (path, branch),
+ Some((Some(path), None)) => {
+ // Try to get current branch from git
+ let branch = self.get_current_branch(&path).await.unwrap_or_else(|| "unknown".to_string());
+ (path, branch)
+ }
+ _ => {
+ let msg = DaemonMessage::CheckpointCreated {
+ task_id,
+ success: false,
+ commit_sha: None,
+ branch_name: None,
+ checkpoint_number: None,
+ files_changed: None,
+ lines_added: None,
+ lines_removed: None,
+ error: Some(format!("Task {} not found or has no worktree", task_id)),
+ message,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ return Ok(());
+ }
+ };
+
+ // Step 1: Check if there are changes to commit
+ let status_output = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["status", "--porcelain"])
+ .output()
+ .await;
+
+ let has_changes = match &status_output {
+ Ok(output) => !output.stdout.is_empty(),
+ Err(_) => false,
+ };
+
+ if !has_changes {
+ let msg = DaemonMessage::CheckpointCreated {
+ task_id,
+ success: false,
+ commit_sha: None,
+ branch_name: Some(branch_name),
+ checkpoint_number: None,
+ files_changed: None,
+ lines_added: None,
+ lines_removed: None,
+ error: Some("No changes to checkpoint".to_string()),
+ message,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ return Ok(());
+ }
+
+ // Step 2: Stage all changes
+ let add_result = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["add", "-A"])
+ .output()
+ .await;
+
+ if let Err(e) = add_result {
+ let msg = DaemonMessage::CheckpointCreated {
+ task_id,
+ success: false,
+ commit_sha: None,
+ branch_name: Some(branch_name),
+ checkpoint_number: None,
+ files_changed: None,
+ lines_added: None,
+ lines_removed: None,
+ error: Some(format!("Failed to stage changes: {}", e)),
+ message,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ return Ok(());
+ }
+
+ // Step 3: Get diff stats before commit
+ let (lines_added, lines_removed, files_changed) = self.get_staged_diff_stats(&worktree_path).await;
+
+ // Step 4: Create commit
+ let commit_result = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["commit", "-m", &message])
+ .output()
+ .await;
+
+ let commit_sha = match commit_result {
+ Ok(output) if output.status.success() => {
+ // Get the commit SHA
+ let sha_output = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["rev-parse", "HEAD"])
+ .output()
+ .await;
+
+ match sha_output {
+ Ok(o) => Some(String::from_utf8_lossy(&o.stdout).trim().to_string()),
+ Err(_) => None,
+ }
+ }
+ Ok(output) => {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ let msg = DaemonMessage::CheckpointCreated {
+ task_id,
+ success: false,
+ commit_sha: None,
+ branch_name: Some(branch_name),
+ checkpoint_number: None,
+ files_changed: Some(files_changed),
+ lines_added: Some(lines_added),
+ lines_removed: Some(lines_removed),
+ error: Some(format!("Commit failed: {}", stderr)),
+ message,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ return Ok(());
+ }
+ Err(e) => {
+ let msg = DaemonMessage::CheckpointCreated {
+ task_id,
+ success: false,
+ commit_sha: None,
+ branch_name: Some(branch_name),
+ checkpoint_number: None,
+ files_changed: None,
+ lines_added: None,
+ lines_removed: None,
+ error: Some(format!("Failed to execute git commit: {}", e)),
+ message,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ return Ok(());
+ }
+ };
+
+ // Success - send response (checkpoint_number will be assigned by server on DB insert)
+ let msg = DaemonMessage::CheckpointCreated {
+ task_id,
+ success: true,
+ commit_sha,
+ branch_name: Some(branch_name),
+ checkpoint_number: None, // Server will assign from DB
+ files_changed: Some(files_changed),
+ lines_added: Some(lines_added),
+ lines_removed: Some(lines_removed),
+ error: None,
+ message,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
+ /// Get the current branch name from a worktree.
+ async fn get_current_branch(&self, worktree_path: &std::path::PathBuf) -> Option<String> {
+ let output = tokio::process::Command::new("git")
+ .current_dir(worktree_path)
+ .args(["branch", "--show-current"])
+ .output()
+ .await
+ .ok()?;
+
+ if output.status.success() {
+ Some(String::from_utf8_lossy(&output.stdout).trim().to_string())
+ } else {
+ None
+ }
+ }
+
+ /// Get diff stats for staged changes.
+ async fn get_staged_diff_stats(&self, worktree_path: &std::path::PathBuf) -> (i32, i32, serde_json::Value) {
+ // Get numstat for lines added/removed
+ let numstat = tokio::process::Command::new("git")
+ .current_dir(worktree_path)
+ .args(["diff", "--cached", "--numstat"])
+ .output()
+ .await;
+
+ let (mut total_added, mut total_removed) = (0i32, 0i32);
+ if let Ok(output) = numstat {
+ for line in String::from_utf8_lossy(&output.stdout).lines() {
+ let parts: Vec<&str> = line.split_whitespace().collect();
+ if parts.len() >= 2 {
+ if let Ok(added) = parts[0].parse::<i32>() {
+ total_added += added;
+ }
+ if let Ok(removed) = parts[1].parse::<i32>() {
+ total_removed += removed;
+ }
+ }
+ }
+ }
+
+ // Get name-status for file changes
+ let name_status = tokio::process::Command::new("git")
+ .current_dir(worktree_path)
+ .args(["diff", "--cached", "--name-status"])
+ .output()
+ .await;
+
+ let mut files = Vec::new();
+ if let Ok(output) = name_status {
+ for line in String::from_utf8_lossy(&output.stdout).lines() {
+ let parts: Vec<&str> = line.split_whitespace().collect();
+ if parts.len() >= 2 {
+ files.push(serde_json::json!({
+ "action": parts[0],
+ "path": parts[1]
+ }));
+ }
+ }
+ }
+
+ (total_added, total_removed, serde_json::json!(files))
+ }
}
/// Inner state for spawned tasks (cloneable).
diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs
index 714c0f9..339f5a4 100644
--- a/makima/src/daemon/ws/protocol.rs
+++ b/makima/src/daemon/ws/protocol.rs
@@ -251,6 +251,35 @@ pub enum DaemonMessage {
error: Option<String>,
},
+ /// Response to CreateCheckpoint command.
+ CheckpointCreated {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ /// Commit SHA if successful
+ #[serde(rename = "commitSha")]
+ commit_sha: Option<String>,
+ /// Branch name where checkpoint was created
+ #[serde(rename = "branchName")]
+ branch_name: Option<String>,
+ /// Checkpoint number in sequence (assigned by server on DB insert)
+ #[serde(rename = "checkpointNumber")]
+ checkpoint_number: Option<i32>,
+ /// Files changed in this checkpoint: [{path, action}]
+ #[serde(rename = "filesChanged")]
+ files_changed: Option<serde_json::Value>,
+ /// Lines added
+ #[serde(rename = "linesAdded")]
+ lines_added: Option<i32>,
+ /// Lines removed
+ #[serde(rename = "linesRemoved")]
+ lines_removed: Option<i32>,
+ /// Error message if failed
+ error: Option<String>,
+ /// User-provided checkpoint message
+ message: String,
+ },
+
/// Response to CleanupWorktree command.
CleanupWorktreeResult {
#[serde(rename = "taskId")]
@@ -543,6 +572,14 @@ pub enum DaemonCommand {
task_id: Uuid,
},
+ /// Create a checkpoint (stage changes, commit, get stats).
+ CreateCheckpoint {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// Commit message for the checkpoint.
+ message: String,
+ },
+
/// Clean up a task's worktree (used when contract is completed/deleted).
CleanupWorktree {
#[serde(rename = "taskId")]
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 39b12da..0d00f5b 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -381,6 +381,35 @@ pub enum DaemonMessage {
/// Error message if operation failed
error: Option<String>,
},
+ /// Notification that a checkpoint was created
+ CheckpointCreated {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// Whether the operation succeeded
+ success: bool,
+ /// Commit SHA if successful
+ #[serde(rename = "commitSha")]
+ commit_sha: Option<String>,
+ /// Branch name where checkpoint was created
+ #[serde(rename = "branchName")]
+ branch_name: Option<String>,
+ /// Checkpoint number in sequence
+ #[serde(rename = "checkpointNumber")]
+ checkpoint_number: Option<i32>,
+ /// Files changed in this checkpoint
+ #[serde(rename = "filesChanged")]
+ files_changed: Option<serde_json::Value>,
+ /// Lines added
+ #[serde(rename = "linesAdded")]
+ lines_added: Option<i32>,
+ /// Lines removed
+ #[serde(rename = "linesRemoved")]
+ lines_removed: Option<i32>,
+ /// Error message if operation failed
+ error: Option<String>,
+ /// User-provided checkpoint message
+ message: String,
+ },
}
/// Validated daemon authentication result.
@@ -1115,6 +1144,101 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
}
}
+ Ok(DaemonMessage::CheckpointCreated {
+ task_id,
+ success,
+ commit_sha,
+ branch_name,
+ checkpoint_number: _, // We'll get from DB
+ files_changed,
+ lines_added,
+ lines_removed,
+ error,
+ message,
+ }) => {
+ tracing::info!(
+ task_id = %task_id,
+ success = success,
+ commit_sha = ?commit_sha,
+ "Checkpoint created notification received"
+ );
+
+ if success {
+ if let (Some(sha), Some(branch)) = (commit_sha.clone(), branch_name.clone()) {
+ // Store checkpoint in database
+ if let Some(pool) = state.db_pool.as_ref() {
+ match repository::create_task_checkpoint(
+ pool,
+ task_id,
+ &sha,
+ &branch,
+ &message,
+ files_changed.clone(),
+ lines_added,
+ lines_removed,
+ ).await {
+ Ok(checkpoint) => {
+ tracing::info!(
+ task_id = %task_id,
+ checkpoint_id = %checkpoint.id,
+ checkpoint_number = checkpoint.checkpoint_number,
+ "Checkpoint stored in database"
+ );
+
+ // Broadcast success as task output
+ state.broadcast_task_output(TaskOutputNotification {
+ task_id,
+ owner_id: Some(owner_id),
+ message_type: "system".to_string(),
+ content: format!(
+ "✓ Checkpoint #{} created: {} ({})",
+ checkpoint.checkpoint_number,
+ message,
+ &sha[..7.min(sha.len())]
+ ),
+ tool_name: None,
+ tool_input: None,
+ is_error: Some(false),
+ cost_usd: None,
+ duration_ms: None,
+ is_partial: false,
+ });
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to store checkpoint in database");
+ state.broadcast_task_output(TaskOutputNotification {
+ task_id,
+ owner_id: Some(owner_id),
+ message_type: "error".to_string(),
+ content: format!("Checkpoint commit succeeded but DB storage failed: {}", e),
+ tool_name: None,
+ tool_input: None,
+ is_error: Some(true),
+ cost_usd: None,
+ duration_ms: None,
+ is_partial: false,
+ });
+ }
+ }
+ }
+ }
+ } else {
+ // Broadcast failure
+ let error_msg = error.unwrap_or_else(|| "Unknown error".to_string());
+ state.broadcast_task_output(TaskOutputNotification {
+ task_id,
+ owner_id: Some(owner_id),
+ message_type: "error".to_string(),
+ content: format!("✗ Checkpoint failed: {}", error_msg),
+ tool_name: None,
+ tool_input: None,
+ is_error: Some(true),
+ cost_usd: None,
+ duration_ms: None,
+ is_partial: false,
+ });
+ }
+ }
Err(e) => {
tracing::warn!("Failed to parse daemon message: {}", e);
}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 3add89f..278d0f5 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -714,10 +714,12 @@ pub async fn read_worktree_file(
),
request_body = CreateCheckpointRequest,
responses(
- (status = 201, description = "Checkpoint created", body = CheckpointResponse),
+ (status = 202, description = "Checkpoint creation accepted", body = CheckpointResponse),
(status = 401, description = "Unauthorized"),
+ (status = 403, description = "Forbidden - can only create checkpoint for own task"),
(status = 404, description = "Task not found"),
(status = 500, description = "Internal server error"),
+ (status = 503, description = "Task has no assigned daemon"),
),
tag = "Mesh Supervisor"
)]
@@ -749,7 +751,7 @@ pub async fn create_checkpoint(
let pool = state.db_pool.as_ref().unwrap();
- // Get task
+ // Get task and daemon_id
let task = match repository::get_task(pool, task_id).await {
Ok(Some(t)) => t,
Ok(None) => {
@@ -767,16 +769,37 @@ pub async fn create_checkpoint(
}
};
- // TODO: Implement checkpoint creation via daemon command
- // For now, checkpoints should be created by the task itself via git commands
- let _ = (task, request);
+ let Some(daemon_id) = task.daemon_id else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
+ ).into_response();
+ };
+
+ // Send CreateCheckpoint command to daemon
+ let cmd = DaemonCommand::CreateCheckpoint {
+ task_id,
+ message: request.message.clone(),
+ };
+ if let Err(e) = state.send_daemon_command(daemon_id, cmd).await {
+ tracing::error!(error = %e, "Failed to send CreateCheckpoint command");
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("COMMAND_FAILED", "Failed to send command to daemon")),
+ ).into_response();
+ }
+
+ // Return accepted - the checkpoint result will be delivered via WebSocket
+ // and stored in the database by the daemon message handler
(
- StatusCode::NOT_IMPLEMENTED,
- Json(ApiError::new(
- "NOT_IMPLEMENTED",
- "Checkpoint creation via API not yet implemented. Use git commands directly in the task.",
- )),
+ StatusCode::ACCEPTED,
+ Json(CheckpointResponse {
+ task_id,
+ checkpoint_number: 0, // Will be assigned by DB on actual creation
+ commit_sha: "pending".to_string(),
+ message: request.message,
+ }),
).into_response()
}
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index 2a45d88..6a56f21 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -396,6 +396,14 @@ pub enum DaemonCommand {
task_id: Uuid,
},
+ /// Create a git checkpoint (stage changes, commit, record stats)
+ CreateCheckpoint {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// Commit message for the checkpoint
+ message: String,
+ },
+
/// Clean up a task's worktree (used when contract is completed/deleted)
CleanupWorktree {
#[serde(rename = "taskId")]