diff options
| author | soryu <soryu@soryu.co> | 2026-01-27 02:38:37 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-27 02:38:37 +0000 |
| commit | f6d5692eb1e290689df516cec6fe77f07d419783 (patch) | |
| tree | 7679fd1a6c3dee19849c0b835fa9e661c36ef053 | |
| parent | f1129adb9799c9c9ca26189220bf9af5970f50fa (diff) | |
| download | soryu-f6d5692eb1e290689df516cec6fe77f07d419783.tar.gz soryu-f6d5692eb1e290689df516cec6fe77f07d419783.zip | |
Fix worktree info and patches endpoint
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 248 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 67 | ||||
| -rw-r--r-- | makima/src/server/mod.rs | 1 | ||||
| -rw-r--r-- | makima/src/server/state.rs | 22 |
4 files changed, 321 insertions, 17 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 4b82ed7..0dcab91 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -7,6 +7,8 @@ use axum::{ Json, }; use base64::Engine; +use std::time::Duration; +use tokio::sync::oneshot; use uuid::Uuid; use crate::db::models::{ @@ -2015,10 +2017,18 @@ pub async fn get_worktree_info( .into_response(); }; + // Create oneshot channel for response + let (tx, rx) = oneshot::channel(); + + // Store the sender for the daemon message handler to use + state.pending_worktree_info.insert(id, tx); + // Send GetWorktreeInfo command to daemon let command = DaemonCommand::GetWorktreeInfo { task_id: id }; if let Err(e) = state.send_daemon_command(daemon_id, command).await { + // Clean up pending request on error + state.pending_worktree_info.remove(&id); tracing::error!("Failed to send GetWorktreeInfo command: {}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, @@ -2027,22 +2037,228 @@ pub async fn get_worktree_info( .into_response(); } - // Return placeholder - actual data will be streamed via WebSocket - // For now, return empty data indicating the request is being processed - Json(WorktreeInfoResponse { - task_id: id, - worktree_path: None, - exists: false, - stats: WorktreeStats { - files_changed: 0, - insertions: 0, - deletions: 0, - }, - files: vec![], - branch: None, - head_sha: None, - }) - .into_response() + // Wait for daemon response with timeout + match tokio::time::timeout(Duration::from_secs(10), rx).await { + Ok(Ok(response)) => { + // Convert internal response to API response + let files: Vec<WorktreeFile> = response.files + .and_then(|f| f.as_array().cloned()) + .unwrap_or_default() + .into_iter() + .filter_map(|v| { + Some(WorktreeFile { + path: v.get("path")?.as_str()?.to_string(), + status: v.get("status")?.as_str()?.to_string(), + lines_added: v.get("linesAdded")?.as_i64()? as i32, + lines_removed: v.get("linesRemoved")?.as_i64()? as i32, + }) + }) + .collect(); + + Json(WorktreeInfoResponse { + task_id: id, + worktree_path: response.worktree_path, + exists: response.exists, + stats: WorktreeStats { + files_changed: response.files_changed, + insertions: response.insertions, + deletions: response.deletions, + }, + files, + branch: response.branch, + head_sha: response.head_sha, + }) + .into_response() + } + Ok(Err(_)) => { + // Channel was dropped (sender side closed) + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_DISCONNECTED", "Daemon disconnected before responding")), + ) + .into_response() + } + Err(_) => { + // Timeout - clean up pending request + state.pending_worktree_info.remove(&id); + ( + StatusCode::GATEWAY_TIMEOUT, + Json(ApiError::new("TIMEOUT", "Daemon did not respond in time")), + ) + .into_response() + } + } +} + +// ============================================================================= +// Task Patches +// ============================================================================= + +/// Query parameters for listing task patches +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ListPatchesQuery { + /// Contract ID to scope the patches + pub contract_id: Uuid, +} + +/// Patch summary for API response +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct PatchSummary { + /// Patch ID + pub id: Uuid, + /// Patch name (generated from checkpoint message or timestamp) + pub name: String, + /// Description (checkpoint message) + pub description: Option<String>, + /// Task ID + pub task_id: Uuid, + /// Contract ID + pub contract_id: Uuid, + /// Number of files in the patch + pub files_count: i32, + /// Total lines added (estimated from patch size) + pub lines_added: i32, + /// Total lines removed (estimated from patch size) + pub lines_removed: i32, + /// List of file paths in the patch (if available from checkpoint) + pub files: Option<Vec<String>>, + /// When the patch was created + pub created_at: chrono::DateTime<chrono::Utc>, + /// When the patch was last updated + pub updated_at: chrono::DateTime<chrono::Utc>, +} + +/// List patches (checkpoint patches) for a task. +#[utoipa::path( + get, + path = "/api/v1/mesh/tasks/{id}/patches", + params( + ("id" = Uuid, Path, description = "Task ID"), + ("contractId" = Uuid, Query, description = "Contract ID") + ), + responses( + (status = 200, description = "List of patches", body = Vec<PatchSummary>), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Task not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn list_task_patches( + State(state): State<SharedState>, + Authenticated(auth): Authenticated, + Path(id): Path<Uuid>, + axum::extract::Query(query): axum::extract::Query<ListPatchesQuery>, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Get the task (scoped by owner) + let task = match repository::get_task_for_owner(pool, id, auth.owner_id).await { + Ok(Some(t)) => t, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Task not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get task {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Verify task belongs to the specified contract + if task.contract_id != Some(query.contract_id) { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new("INVALID_CONTRACT", "Task does not belong to the specified contract")), + ) + .into_response(); + } + + // Get checkpoint patches for this task + let patches = match repository::list_checkpoint_patches(pool, id).await { + Ok(p) => p, + Err(e) => { + tracing::error!("Failed to list patches for task {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + }; + + // Get checkpoints to get file lists and messages + let checkpoints = match repository::list_task_checkpoints(pool, id).await { + Ok(c) => c, + Err(e) => { + tracing::warn!("Failed to list checkpoints for task {}: {}", id, e); + vec![] + } + }; + + // Create a map of checkpoint_id to checkpoint info + let checkpoint_map: std::collections::HashMap<Uuid, _> = checkpoints + .into_iter() + .map(|c| (c.id, c)) + .collect(); + + // Transform to PatchSummary + let summaries: Vec<PatchSummary> = patches + .into_iter() + .map(|p| { + let checkpoint = p.checkpoint_id.and_then(|cid| checkpoint_map.get(&cid)); + let name = checkpoint + .map(|c| c.message.clone()) + .unwrap_or_else(|| format!("Patch {}", p.created_at.format("%Y-%m-%d %H:%M"))); + let description = checkpoint.map(|c| c.message.clone()); + let files = checkpoint.and_then(|c| { + c.files_changed.as_ref().and_then(|f| { + f.as_array().map(|arr| { + arr.iter() + .filter_map(|v| v.get("path").and_then(|p| p.as_str()).map(|s| s.to_string())) + .collect() + }) + }) + }); + let lines_added = checkpoint.and_then(|c| c.lines_added).unwrap_or(0); + let lines_removed = checkpoint.and_then(|c| c.lines_removed).unwrap_or(0); + + PatchSummary { + id: p.id, + name, + description, + task_id: p.task_id, + contract_id: query.contract_id, + files_count: p.files_count, + lines_added, + lines_removed, + files, + created_at: p.created_at, + updated_at: p.created_at, // Use created_at as updated_at + } + }) + .collect(); + + Json(summaries).into_response() } /// Request to check if a target directory exists. diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index f7fe49f..433c787 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -493,6 +493,33 @@ pub enum DaemonMessage { #[serde(rename = "prNumber")] pr_number: Option<i32>, }, + /// Response to GetWorktreeInfo command + WorktreeInfoResult { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + /// Path to the worktree directory + #[serde(rename = "worktreePath")] + worktree_path: Option<String>, + /// Whether the worktree exists + exists: bool, + /// Number of files changed + #[serde(rename = "filesChanged")] + files_changed: i32, + /// Total lines inserted + insertions: i32, + /// Total lines deleted + deletions: i32, + /// Changed files list + files: Option<serde_json::Value>, + /// Current branch name + branch: Option<String>, + /// Current HEAD commit SHA + #[serde(rename = "headSha")] + head_sha: Option<String>, + /// Error message if failed + error: Option<String>, + }, } /// Validated daemon authentication result. @@ -1880,6 +1907,46 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } + Ok(DaemonMessage::WorktreeInfoResult { + task_id, + success, + worktree_path, + exists, + files_changed, + insertions, + deletions, + files, + branch, + head_sha, + error, + }) => { + tracing::debug!( + task_id = %task_id, + success = success, + exists = exists, + files_changed = files_changed, + "Worktree info result received" + ); + + // Fulfill pending worktree info request if any + if let Some((_, tx)) = state.pending_worktree_info.remove(&task_id) { + let response = crate::server::state::WorktreeInfoResponse { + task_id, + success, + worktree_path, + exists, + files_changed, + insertions, + deletions, + files, + branch, + head_sha, + error, + }; + // Ignore send error - receiver may have timed out + let _ = tx.send(response); + } + } Err(e) => { tracing::warn!("Failed to parse daemon message: {}", e); } diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 1c4229e..b969650 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -82,6 +82,7 @@ pub fn make_router(state: SharedState) -> Router { .route("/mesh/tasks/{id}/retry-completion", post(mesh::retry_completion_action)) .route("/mesh/tasks/{id}/clone", post(mesh::clone_worktree)) .route("/mesh/tasks/{id}/worktree-info", get(mesh::get_worktree_info)) + .route("/mesh/tasks/{id}/patches", get(mesh::list_task_patches)) .route("/mesh/tasks/{id}/check-target", post(mesh::check_target_exists)) .route("/mesh/tasks/{id}/reassign", post(mesh::reassign_task)) .route("/mesh/tasks/{id}/continue", post(mesh::continue_task)) diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index b3cf27d..6872d5e 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use dashmap::DashMap; use sqlx::PgPool; -use tokio::sync::{broadcast, mpsc, Mutex, OnceCell}; +use tokio::sync::{broadcast, mpsc, oneshot, Mutex, OnceCell}; use uuid::Uuid; use crate::listen::{DiarizationConfig, ParakeetEOU, ParakeetTDT, Sortformer}; @@ -188,6 +188,23 @@ pub struct SupervisorQuestionResponse { pub responded_at: chrono::DateTime<chrono::Utc>, } +/// Worktree info response from daemon +#[derive(Debug, Clone, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct WorktreeInfoResponse { + pub task_id: Uuid, + pub success: bool, + pub worktree_path: Option<String>, + pub exists: bool, + pub files_changed: i32, + pub insertions: i32, + pub deletions: i32, + pub files: Option<serde_json::Value>, + pub branch: Option<String>, + pub head_sha: Option<String>, + pub error: Option<String>, +} + /// Command sent from server to daemon. #[derive(Debug, Clone, serde::Serialize)] #[serde(tag = "type", rename_all = "camelCase")] @@ -563,6 +580,8 @@ pub struct AppState { pub tool_keys: DashMap<String, Uuid>, /// JWT verifier for Supabase authentication (None if not configured) pub jwt_verifier: Option<JwtVerifier>, + /// Pending worktree info requests awaiting daemon response (keyed by task_id) + pub pending_worktree_info: DashMap<Uuid, oneshot::Sender<WorktreeInfoResponse>>, } impl AppState { @@ -636,6 +655,7 @@ impl AppState { daemon_connections: DashMap::new(), tool_keys: DashMap::new(), jwt_verifier, + pending_worktree_info: DashMap::new(), } } |
