summaryrefslogtreecommitdiff
path: root/makima/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server')
-rw-r--r--makima/src/server/handlers/mesh.rs275
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs60
-rw-r--r--makima/src/server/mod.rs2
-rw-r--r--makima/src/server/state.rs43
4 files changed, 380 insertions, 0 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 0e72bdf..1a5b9c1 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -2099,6 +2099,281 @@ pub async fn get_worktree_info(
}
// =============================================================================
+// Task Diff
+// =============================================================================
+
+/// Response for the task diff endpoint.
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskDiffApiResponse {
+ /// Task ID.
+ pub task_id: Uuid,
+ /// Whether the diff was retrieved successfully.
+ pub success: bool,
+ /// The diff content.
+ pub diff: Option<String>,
+ /// Error message if failed.
+ pub error: Option<String>,
+}
+
+/// Get the diff for a task's changes.
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks/{id}/diff",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ responses(
+ (status = 200, description = "Task diff", body = TaskDiffApiResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured or daemon not connected", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn get_task_diff(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task (scoped by owner)
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get daemon running the task
+ let Some(daemon_id) = task.daemon_id else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
+ )
+ .into_response();
+ };
+
+ // Create oneshot channel for response
+ let (tx, rx) = oneshot::channel();
+
+ // Store the sender for the daemon message handler to use
+ state.pending_task_diff.insert(id, tx);
+
+ // Send GetTaskDiff command to daemon
+ let command = DaemonCommand::GetTaskDiff { task_id: id };
+
+ if let Err(e) = state.send_daemon_command(daemon_id, command).await {
+ // Clean up pending request on error
+ state.pending_task_diff.remove(&id);
+ tracing::error!("Failed to send GetTaskDiff command: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ // Wait for daemon response with timeout
+ match tokio::time::timeout(Duration::from_secs(15), rx).await {
+ Ok(Ok(response)) => {
+ Json(TaskDiffApiResponse {
+ task_id: id,
+ success: response.success,
+ diff: response.diff,
+ error: response.error,
+ })
+ .into_response()
+ }
+ Ok(Err(_)) => {
+ // Channel was dropped (sender side closed)
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_DISCONNECTED", "Daemon disconnected before responding")),
+ )
+ .into_response()
+ }
+ Err(_) => {
+ // Timeout - clean up pending request
+ state.pending_task_diff.remove(&id);
+ (
+ StatusCode::GATEWAY_TIMEOUT,
+ Json(ApiError::new("TIMEOUT", "Daemon did not respond in time")),
+ )
+ .into_response()
+ }
+ }
+}
+
+// =============================================================================
+// Worktree Commit
+// =============================================================================
+
+/// Request body for worktree commit.
+#[derive(Debug, serde::Deserialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CommitWorktreeRequest {
+ /// Optional commit message. Defaults to "Worktree commit" if not provided.
+ pub message: Option<String>,
+}
+
+/// Response for the worktree commit endpoint.
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct CommitWorktreeApiResponse {
+ /// Task ID.
+ pub task_id: Uuid,
+ /// Whether the commit was successful.
+ pub success: bool,
+ /// The commit SHA if successful.
+ pub commit_sha: Option<String>,
+ /// Error message if failed.
+ pub error: Option<String>,
+}
+
+/// Commit changes in a task's worktree.
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/worktree-commit",
+ params(
+ ("id" = Uuid, Path, description = "Task ID")
+ ),
+ request_body = CommitWorktreeRequest,
+ responses(
+ (status = 200, description = "Worktree commit result", body = CommitWorktreeApiResponse),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured or daemon not connected", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn commit_worktree(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ Json(body): Json<CommitWorktreeRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task (scoped by owner)
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get daemon running the task
+ let Some(daemon_id) = task.daemon_id else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("NO_DAEMON", "Task has no assigned daemon")),
+ )
+ .into_response();
+ };
+
+ // Create oneshot channel for response
+ let (tx, rx) = oneshot::channel();
+
+ // Store the sender for the daemon message handler to use
+ state.pending_worktree_commit.insert(id, tx);
+
+ // Send CommitWorktree command to daemon
+ let command = DaemonCommand::CommitWorktree {
+ task_id: id,
+ message: body.message,
+ };
+
+ if let Err(e) = state.send_daemon_command(daemon_id, command).await {
+ // Clean up pending request on error
+ state.pending_worktree_commit.remove(&id);
+ tracing::error!("Failed to send CommitWorktree command: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_ERROR", e)),
+ )
+ .into_response();
+ }
+
+ // Wait for daemon response with timeout
+ match tokio::time::timeout(Duration::from_secs(15), rx).await {
+ Ok(Ok(response)) => {
+ Json(CommitWorktreeApiResponse {
+ task_id: id,
+ success: response.success,
+ commit_sha: response.commit_sha,
+ error: response.error,
+ })
+ .into_response()
+ }
+ Ok(Err(_)) => {
+ // Channel was dropped (sender side closed)
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_DISCONNECTED", "Daemon disconnected before responding")),
+ )
+ .into_response()
+ }
+ Err(_) => {
+ // Timeout - clean up pending request
+ state.pending_worktree_commit.remove(&id);
+ (
+ StatusCode::GATEWAY_TIMEOUT,
+ Json(ApiError::new("TIMEOUT", "Daemon did not respond in time")),
+ )
+ .into_response()
+ }
+ }
+}
+
+// =============================================================================
// Task Patches
// =============================================================================
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index d5ef1f9..139db70 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -530,6 +530,14 @@ pub enum DaemonMessage {
#[serde(rename = "prNumber")]
pr_number: Option<i32>,
},
+ /// Response to GetWorktreeDiff command
+ WorktreeDiffResult {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ diff: Option<String>,
+ error: Option<String>,
+ },
/// Response to GetWorktreeInfo command
WorktreeInfoResult {
#[serde(rename = "taskId")]
@@ -557,6 +565,23 @@ pub enum DaemonMessage {
/// Error message if failed
error: Option<String>,
},
+ /// Response to GetTaskDiff command
+ TaskDiff {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ diff: Option<String>,
+ error: Option<String>,
+ },
+ /// Response to CommitWorktree command
+ WorktreeCommitResult {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ #[serde(rename = "commitSha")]
+ commit_sha: Option<String>,
+ error: Option<String>,
+ },
/// Request to merge a task's patch to supervisor's worktree (cross-daemon case).
/// Sent when a task completes on a different daemon than its supervisor.
MergePatchToSupervisor {
@@ -2358,6 +2383,41 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
let _ = tx.send(response);
}
}
+ Ok(DaemonMessage::TaskDiff { task_id, success, diff, error }) => {
+ tracing::debug!(
+ task_id = %task_id,
+ success = success,
+ "Task diff result received"
+ );
+
+ // Fulfill pending task diff request if any
+ if let Some((_, tx)) = state.pending_task_diff.remove(&task_id) {
+ let _ = tx.send(crate::server::state::TaskDiffResult {
+ task_id,
+ success,
+ diff,
+ error,
+ });
+ }
+ }
+ Ok(DaemonMessage::WorktreeCommitResult { task_id, success, commit_sha, error }) => {
+ tracing::debug!(
+ task_id = %task_id,
+ success = success,
+ commit_sha = ?commit_sha,
+ "Worktree commit result received"
+ );
+
+ // Fulfill pending worktree commit request if any
+ if let Some((_, tx)) = state.pending_worktree_commit.remove(&task_id) {
+ let _ = tx.send(crate::server::state::WorktreeCommitResponse {
+ task_id,
+ success,
+ commit_sha,
+ error,
+ });
+ }
+ }
Ok(DaemonMessage::MergePatchToSupervisor {
task_id,
supervisor_task_id,
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index 6321518..b382f04 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -83,6 +83,8 @@ pub fn make_router(state: SharedState) -> Router {
.route("/mesh/tasks/{id}/retry-completion", post(mesh::retry_completion_action))
.route("/mesh/tasks/{id}/clone", post(mesh::clone_worktree))
.route("/mesh/tasks/{id}/worktree-info", get(mesh::get_worktree_info))
+ .route("/mesh/tasks/{id}/diff", get(mesh::get_task_diff))
+ .route("/mesh/tasks/{id}/worktree-commit", post(mesh::commit_worktree))
.route("/mesh/tasks/{id}/patches", get(mesh::list_task_patches))
.route("/mesh/tasks/{id}/patch-data", get(mesh::get_task_patch_data))
.route("/mesh/tasks/{id}/check-target", post(mesh::check_target_exists))
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index 5c5e24f..83ac2e8 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -194,6 +194,16 @@ pub struct SupervisorQuestionResponse {
pub responded_at: chrono::DateTime<chrono::Utc>,
}
+/// Worktree diff response from daemon
+#[derive(Debug, Clone, serde::Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorktreeDiffResponse {
+ pub task_id: Uuid,
+ pub success: bool,
+ pub diff: String,
+ pub error: Option<String>,
+}
+
/// Worktree info response from daemon
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
@@ -211,6 +221,26 @@ pub struct WorktreeInfoResponse {
pub error: Option<String>,
}
+/// Task diff result from daemon
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TaskDiffResult {
+ pub task_id: Uuid,
+ pub success: bool,
+ pub diff: Option<String>,
+ pub error: Option<String>,
+}
+
+/// Worktree commit response from daemon
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorktreeCommitResponse {
+ pub task_id: Uuid,
+ pub success: bool,
+ pub commit_sha: Option<String>,
+ pub error: Option<String>,
+}
+
/// Command sent from server to daemon.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
@@ -491,6 +521,13 @@ pub enum DaemonCommand {
task_id: Uuid,
},
+ /// Commit changes in a task worktree
+ CommitWorktree {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ message: Option<String>,
+ },
+
/// Create a git checkpoint (stage changes, commit, record stats)
CreateCheckpoint {
#[serde(rename = "taskId")]
@@ -636,6 +673,10 @@ pub struct AppState {
pub jwt_verifier: Option<JwtVerifier>,
/// Pending worktree info requests awaiting daemon response (keyed by task_id)
pub pending_worktree_info: DashMap<Uuid, oneshot::Sender<WorktreeInfoResponse>>,
+ /// Pending task diff requests awaiting daemon response (keyed by task_id)
+ pub pending_task_diff: DashMap<Uuid, oneshot::Sender<TaskDiffResult>>,
+ /// Pending worktree commit requests awaiting daemon response (keyed by task_id)
+ pub pending_worktree_commit: DashMap<Uuid, oneshot::Sender<WorktreeCommitResponse>>,
/// Lazily-loaded TTS engine (initialized on first Speak connection)
pub tts_engine: OnceCell<Box<dyn TtsEngine>>,
/// Daemon reauth status storage (keyed by (daemon_id, request_id))
@@ -717,6 +758,8 @@ impl AppState {
tool_keys: DashMap::new(),
jwt_verifier,
pending_worktree_info: DashMap::new(),
+ pending_task_diff: DashMap::new(),
+ pending_worktree_commit: DashMap::new(),
tts_engine: OnceCell::new(),
daemon_reauth_status: DashMap::new(),
}