summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-27 02:38:37 +0000
committersoryu <soryu@soryu.co>2026-01-27 02:38:37 +0000
commitf6d5692eb1e290689df516cec6fe77f07d419783 (patch)
tree7679fd1a6c3dee19849c0b835fa9e661c36ef053
parentf1129adb9799c9c9ca26189220bf9af5970f50fa (diff)
downloadsoryu-f6d5692eb1e290689df516cec6fe77f07d419783.tar.gz
soryu-f6d5692eb1e290689df516cec6fe77f07d419783.zip
Fix worktree info and patches endpoint
-rw-r--r--makima/src/server/handlers/mesh.rs248
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs67
-rw-r--r--makima/src/server/mod.rs1
-rw-r--r--makima/src/server/state.rs22
4 files changed, 321 insertions, 17 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 4b82ed7..0dcab91 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -7,6 +7,8 @@ use axum::{
Json,
};
use base64::Engine;
+use std::time::Duration;
+use tokio::sync::oneshot;
use uuid::Uuid;
use crate::db::models::{
@@ -2015,10 +2017,18 @@ pub async fn get_worktree_info(
.into_response();
};
+ // Create oneshot channel for response
+ let (tx, rx) = oneshot::channel();
+
+ // Store the sender for the daemon message handler to use
+ state.pending_worktree_info.insert(id, tx);
+
// Send GetWorktreeInfo command to daemon
let command = DaemonCommand::GetWorktreeInfo { task_id: id };
if let Err(e) = state.send_daemon_command(daemon_id, command).await {
+ // Clean up pending request on error
+ state.pending_worktree_info.remove(&id);
tracing::error!("Failed to send GetWorktreeInfo command: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
@@ -2027,22 +2037,228 @@ pub async fn get_worktree_info(
.into_response();
}
- // Return placeholder - actual data will be streamed via WebSocket
- // For now, return empty data indicating the request is being processed
- Json(WorktreeInfoResponse {
- task_id: id,
- worktree_path: None,
- exists: false,
- stats: WorktreeStats {
- files_changed: 0,
- insertions: 0,
- deletions: 0,
- },
- files: vec![],
- branch: None,
- head_sha: None,
- })
- .into_response()
+ // Wait for daemon response with timeout
+ match tokio::time::timeout(Duration::from_secs(10), rx).await {
+ Ok(Ok(response)) => {
+ // Convert internal response to API response
+ let files: Vec<WorktreeFile> = response.files
+ .and_then(|f| f.as_array().cloned())
+ .unwrap_or_default()
+ .into_iter()
+ .filter_map(|v| {
+ Some(WorktreeFile {
+ path: v.get("path")?.as_str()?.to_string(),
+ status: v.get("status")?.as_str()?.to_string(),
+ lines_added: v.get("linesAdded")?.as_i64()? as i32,
+ lines_removed: v.get("linesRemoved")?.as_i64()? as i32,
+ })
+ })
+ .collect();
+
+ Json(WorktreeInfoResponse {
+ task_id: id,
+ worktree_path: response.worktree_path,
+ exists: response.exists,
+ stats: WorktreeStats {
+ files_changed: response.files_changed,
+ insertions: response.insertions,
+ deletions: response.deletions,
+ },
+ files,
+ branch: response.branch,
+ head_sha: response.head_sha,
+ })
+ .into_response()
+ }
+ Ok(Err(_)) => {
+ // Channel was dropped (sender side closed)
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DAEMON_DISCONNECTED", "Daemon disconnected before responding")),
+ )
+ .into_response()
+ }
+ Err(_) => {
+ // Timeout - clean up pending request
+ state.pending_worktree_info.remove(&id);
+ (
+ StatusCode::GATEWAY_TIMEOUT,
+ Json(ApiError::new("TIMEOUT", "Daemon did not respond in time")),
+ )
+ .into_response()
+ }
+ }
+}
+
+// =============================================================================
+// Task Patches
+// =============================================================================
+
+/// Query parameters for listing task patches
+#[derive(Debug, serde::Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListPatchesQuery {
+ /// Contract ID to scope the patches
+ pub contract_id: Uuid,
+}
+
+/// Patch summary for API response
+#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct PatchSummary {
+ /// Patch ID
+ pub id: Uuid,
+ /// Patch name (generated from checkpoint message or timestamp)
+ pub name: String,
+ /// Description (checkpoint message)
+ pub description: Option<String>,
+ /// Task ID
+ pub task_id: Uuid,
+ /// Contract ID
+ pub contract_id: Uuid,
+ /// Number of files in the patch
+ pub files_count: i32,
+ /// Total lines added (estimated from patch size)
+ pub lines_added: i32,
+ /// Total lines removed (estimated from patch size)
+ pub lines_removed: i32,
+ /// List of file paths in the patch (if available from checkpoint)
+ pub files: Option<Vec<String>>,
+ /// When the patch was created
+ pub created_at: chrono::DateTime<chrono::Utc>,
+ /// When the patch was last updated
+ pub updated_at: chrono::DateTime<chrono::Utc>,
+}
+
+/// List patches (checkpoint patches) for a task.
+#[utoipa::path(
+ get,
+ path = "/api/v1/mesh/tasks/{id}/patches",
+ params(
+ ("id" = Uuid, Path, description = "Task ID"),
+ ("contractId" = Uuid, Query, description = "Contract ID")
+ ),
+ responses(
+ (status = 200, description = "List of patches", body = Vec<PatchSummary>),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Task not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn list_task_patches(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(id): Path<Uuid>,
+ axum::extract::Query(query): axum::extract::Query<ListPatchesQuery>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the task (scoped by owner)
+ let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ Ok(Some(t)) => t,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get task {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Verify task belongs to the specified contract
+ if task.contract_id != Some(query.contract_id) {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new("INVALID_CONTRACT", "Task does not belong to the specified contract")),
+ )
+ .into_response();
+ }
+
+ // Get checkpoint patches for this task
+ let patches = match repository::list_checkpoint_patches(pool, id).await {
+ Ok(p) => p,
+ Err(e) => {
+ tracing::error!("Failed to list patches for task {}: {}", id, e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Get checkpoints to get file lists and messages
+ let checkpoints = match repository::list_task_checkpoints(pool, id).await {
+ Ok(c) => c,
+ Err(e) => {
+ tracing::warn!("Failed to list checkpoints for task {}: {}", id, e);
+ vec![]
+ }
+ };
+
+ // Create a map of checkpoint_id to checkpoint info
+ let checkpoint_map: std::collections::HashMap<Uuid, _> = checkpoints
+ .into_iter()
+ .map(|c| (c.id, c))
+ .collect();
+
+ // Transform to PatchSummary
+ let summaries: Vec<PatchSummary> = patches
+ .into_iter()
+ .map(|p| {
+ let checkpoint = p.checkpoint_id.and_then(|cid| checkpoint_map.get(&cid));
+ let name = checkpoint
+ .map(|c| c.message.clone())
+ .unwrap_or_else(|| format!("Patch {}", p.created_at.format("%Y-%m-%d %H:%M")));
+ let description = checkpoint.map(|c| c.message.clone());
+ let files = checkpoint.and_then(|c| {
+ c.files_changed.as_ref().and_then(|f| {
+ f.as_array().map(|arr| {
+ arr.iter()
+ .filter_map(|v| v.get("path").and_then(|p| p.as_str()).map(|s| s.to_string()))
+ .collect()
+ })
+ })
+ });
+ let lines_added = checkpoint.and_then(|c| c.lines_added).unwrap_or(0);
+ let lines_removed = checkpoint.and_then(|c| c.lines_removed).unwrap_or(0);
+
+ PatchSummary {
+ id: p.id,
+ name,
+ description,
+ task_id: p.task_id,
+ contract_id: query.contract_id,
+ files_count: p.files_count,
+ lines_added,
+ lines_removed,
+ files,
+ created_at: p.created_at,
+ updated_at: p.created_at, // Use created_at as updated_at
+ }
+ })
+ .collect();
+
+ Json(summaries).into_response()
}
/// Request to check if a target directory exists.
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index f7fe49f..433c787 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -493,6 +493,33 @@ pub enum DaemonMessage {
#[serde(rename = "prNumber")]
pr_number: Option<i32>,
},
+ /// Response to GetWorktreeInfo command
+ WorktreeInfoResult {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ /// Path to the worktree directory
+ #[serde(rename = "worktreePath")]
+ worktree_path: Option<String>,
+ /// Whether the worktree exists
+ exists: bool,
+ /// Number of files changed
+ #[serde(rename = "filesChanged")]
+ files_changed: i32,
+ /// Total lines inserted
+ insertions: i32,
+ /// Total lines deleted
+ deletions: i32,
+ /// Changed files list
+ files: Option<serde_json::Value>,
+ /// Current branch name
+ branch: Option<String>,
+ /// Current HEAD commit SHA
+ #[serde(rename = "headSha")]
+ head_sha: Option<String>,
+ /// Error message if failed
+ error: Option<String>,
+ },
}
/// Validated daemon authentication result.
@@ -1880,6 +1907,46 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
}
}
+ Ok(DaemonMessage::WorktreeInfoResult {
+ task_id,
+ success,
+ worktree_path,
+ exists,
+ files_changed,
+ insertions,
+ deletions,
+ files,
+ branch,
+ head_sha,
+ error,
+ }) => {
+ tracing::debug!(
+ task_id = %task_id,
+ success = success,
+ exists = exists,
+ files_changed = files_changed,
+ "Worktree info result received"
+ );
+
+ // Fulfill pending worktree info request if any
+ if let Some((_, tx)) = state.pending_worktree_info.remove(&task_id) {
+ let response = crate::server::state::WorktreeInfoResponse {
+ task_id,
+ success,
+ worktree_path,
+ exists,
+ files_changed,
+ insertions,
+ deletions,
+ files,
+ branch,
+ head_sha,
+ error,
+ };
+ // Ignore send error - receiver may have timed out
+ let _ = tx.send(response);
+ }
+ }
Err(e) => {
tracing::warn!("Failed to parse daemon message: {}", e);
}
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index 1c4229e..b969650 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -82,6 +82,7 @@ pub fn make_router(state: SharedState) -> Router {
.route("/mesh/tasks/{id}/retry-completion", post(mesh::retry_completion_action))
.route("/mesh/tasks/{id}/clone", post(mesh::clone_worktree))
.route("/mesh/tasks/{id}/worktree-info", get(mesh::get_worktree_info))
+ .route("/mesh/tasks/{id}/patches", get(mesh::list_task_patches))
.route("/mesh/tasks/{id}/check-target", post(mesh::check_target_exists))
.route("/mesh/tasks/{id}/reassign", post(mesh::reassign_task))
.route("/mesh/tasks/{id}/continue", post(mesh::continue_task))
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index b3cf27d..6872d5e 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -3,7 +3,7 @@
use std::sync::Arc;
use dashmap::DashMap;
use sqlx::PgPool;
-use tokio::sync::{broadcast, mpsc, Mutex, OnceCell};
+use tokio::sync::{broadcast, mpsc, oneshot, Mutex, OnceCell};
use uuid::Uuid;
use crate::listen::{DiarizationConfig, ParakeetEOU, ParakeetTDT, Sortformer};
@@ -188,6 +188,23 @@ pub struct SupervisorQuestionResponse {
pub responded_at: chrono::DateTime<chrono::Utc>,
}
+/// Worktree info response from daemon
+#[derive(Debug, Clone, serde::Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorktreeInfoResponse {
+ pub task_id: Uuid,
+ pub success: bool,
+ pub worktree_path: Option<String>,
+ pub exists: bool,
+ pub files_changed: i32,
+ pub insertions: i32,
+ pub deletions: i32,
+ pub files: Option<serde_json::Value>,
+ pub branch: Option<String>,
+ pub head_sha: Option<String>,
+ pub error: Option<String>,
+}
+
/// Command sent from server to daemon.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", rename_all = "camelCase")]
@@ -563,6 +580,8 @@ pub struct AppState {
pub tool_keys: DashMap<String, Uuid>,
/// JWT verifier for Supabase authentication (None if not configured)
pub jwt_verifier: Option<JwtVerifier>,
+ /// Pending worktree info requests awaiting daemon response (keyed by task_id)
+ pub pending_worktree_info: DashMap<Uuid, oneshot::Sender<WorktreeInfoResponse>>,
}
impl AppState {
@@ -636,6 +655,7 @@ impl AppState {
daemon_connections: DashMap::new(),
tool_keys: DashMap::new(),
jwt_verifier,
+ pending_worktree_info: DashMap::new(),
}
}