From 78cb861412850889424ae7d5ae5cd952a2b90295 Mon Sep 17 00:00:00 2001 From: soryu Date: Mon, 2 Mar 2026 15:18:31 +0000 Subject: feat: move daemon reauth to daemons page, add contract-backed directive steps, rename Mesh to Exec (#84) * feat: soryu-co/soryu - makima: Rename Mesh to Exec in navigation * WIP: heartbeat checkpoint * WIP: heartbeat checkpoint * WIP: heartbeat checkpoint * feat: soryu-co/soryu - makima: Add contract-backed steps to directive flow * WIP: heartbeat checkpoint --- makima/src/daemon/task/manager.rs | 95 +++++++--- makima/src/daemon/ws/protocol.rs | 26 +++ makima/src/db/models.rs | 9 + makima/src/db/repository.rs | 63 ++++++- makima/src/orchestration/directive.rs | 243 ++++++++++++++++++++++++- makima/src/server/handlers/mesh.rs | 284 +++++++++++++++++++++++++++++- makima/src/server/handlers/mesh_daemon.rs | 31 ++++ makima/src/server/mod.rs | 3 + makima/src/server/state.rs | 60 +++++++ 9 files changed, 785 insertions(+), 29 deletions(-) (limited to 'makima/src') diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index b382507..addcd71 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -1886,6 +1886,66 @@ impl TaskManager { tracing::info!("Daemon restart: exiting process with code 42 (restart requested)"); std::process::exit(42); } + DaemonCommand::TriggerReauth { request_id } => { + tracing::info!(request_id = %request_id, "Received reauth trigger command from server"); + let claude_command = self.process_manager.claude_command().to_string(); + let ws_tx = self.ws_tx.clone(); + + // Spawn in a task so it doesn't block command handling + tokio::spawn(async move { + match get_oauth_login_url(&claude_command).await { + Some(login_url) => { + tracing::info!(request_id = %request_id, login_url = %login_url, "Got OAuth login URL for reauth"); + let msg = DaemonMessage::ReauthStatus { + request_id, + status: "url_ready".to_string(), + login_url: Some(login_url), + error: None, + }; + let _ = ws_tx.send(msg).await; + } + None => { + tracing::error!(request_id = %request_id, "Failed to get OAuth login URL for reauth"); + let msg = DaemonMessage::ReauthStatus { + request_id, + status: "failed".to_string(), + login_url: None, + error: Some("Failed to get OAuth login URL from setup-token".to_string()), + }; + let _ = ws_tx.send(msg).await; + } + } + }); + } + DaemonCommand::SubmitAuthCode { request_id, code } => { + tracing::info!(request_id = %request_id, "Received auth code submission from server"); + let ws_tx = self.ws_tx.clone(); + + if send_auth_code(&code) { + tracing::info!(request_id = %request_id, "Auth code forwarded to setup-token for reauth"); + // Wait a short time then report completion + // (the setup-token process takes a moment to complete) + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + let msg = DaemonMessage::ReauthStatus { + request_id, + status: "completed".to_string(), + login_url: None, + error: None, + }; + let _ = ws_tx.send(msg).await; + }); + } else { + tracing::warn!(request_id = %request_id, "No pending auth flow to receive code for reauth"); + let msg = DaemonMessage::ReauthStatus { + request_id, + status: "failed".to_string(), + login_url: None, + error: Some("No pending auth flow to receive the code. Try triggering reauth again.".to_string()), + }; + let _ = self.ws_tx.send(msg).await; + } + } DaemonCommand::ApplyPatchToWorktree { target_task_id, source_task_id, @@ -5260,32 +5320,19 @@ impl TaskManagerInner { break; } - // Detect OAuth token expiration and trigger remote login flow + // Detect OAuth token expiration - log warning and let the task fail normally. + // Users can reauthorize via the Daemons page instead. if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check, json_type_for_auth_check.as_deref(), is_stdout_for_auth_check) { auth_error_handled = true; - tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow"); - - // Spawn claude setup-token to get login URL - if let Some(login_url) = get_oauth_login_url(&claude_command).await { - tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL"); - let auth_msg = DaemonMessage::AuthenticationRequired { - task_id: Some(task_id), - login_url, - hostname: daemon_hostname.clone(), - }; - if ws_tx.send(auth_msg).await.is_err() { - tracing::warn!(task_id = %task_id, "Failed to send auth required message"); - } - } else { - tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token"); - let fallback_msg = DaemonMessage::task_output( - task_id, - format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n", - daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()), - false, - ); - let _ = ws_tx.send(fallback_msg).await; - } + tracing::warn!(task_id = %task_id, "OAuth authentication error detected - task will fail. Reauthorize via Daemons page."); + + let error_msg = DaemonMessage::task_output( + task_id, + format!("⚠ Authentication expired on daemon{}. Go to Daemons page to reauthorize, then retry this task.\n", + daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()), + false, + ); + let _ = ws_tx.send(error_msg).await; } } None => { diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs index 834e139..bed5ffd 100644 --- a/makima/src/daemon/ws/protocol.rs +++ b/makima/src/daemon/ws/protocol.rs @@ -129,6 +129,19 @@ pub enum DaemonMessage { hostname: Option, }, + /// Reauth status update (response to TriggerReauth/SubmitAuthCode commands). + ReauthStatus { + #[serde(rename = "requestId")] + request_id: Uuid, + /// Status: "url_ready", "completed", "failed" + status: String, + /// OAuth login URL (present when status is "url_ready") + #[serde(rename = "loginUrl")] + login_url: Option, + /// Error message (present when status is "failed") + error: Option, + }, + // ========================================================================= // Merge Response Messages (sent by daemon after processing merge commands) // ========================================================================= @@ -787,6 +800,19 @@ pub enum DaemonCommand { /// Restart the daemon process. RestartDaemon, + /// Trigger OAuth re-authentication on this daemon. + TriggerReauth { + #[serde(rename = "requestId")] + request_id: Uuid, + }, + + /// Submit auth code for pending reauth. + SubmitAuthCode { + #[serde(rename = "requestId")] + request_id: Uuid, + code: String, + }, + /// Apply a patch to a task's worktree (for cross-daemon merge). /// Sent by server when routing MergePatchToSupervisor to the supervisor's daemon. ApplyPatchToWorktree { diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 6b77563..6292e7b 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -2746,6 +2746,11 @@ pub struct DirectiveStep { /// Status: pending, ready, running, completed, failed, skipped pub status: String, pub task_id: Option, + /// Optional contract ID for contract-backed execution. + pub contract_id: Option, + /// Optional contract type (e.g. "simple", "specification", "execute"). + /// When set, the orchestrator creates a contract instead of a standalone task. + pub contract_type: Option, pub order_index: i32, pub generation: i32, pub started_at: Option>, @@ -2871,6 +2876,10 @@ pub struct CreateDirectiveStepRequest { /// Optional order ID to auto-link this step to an order. #[serde(default)] pub order_id: Option, + /// Optional: create a contract for this step instead of a standalone task. + /// Valid values: "simple", "specification", "execute" + #[serde(default)] + pub contract_type: Option, } /// Request to update a directive step. diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index 8d7a70c..1af22f6 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -5402,10 +5402,11 @@ pub async fn create_directive_step( ) -> Result { let generation = req.generation.unwrap_or(1); let order_id = req.order_id; + let contract_type = req.contract_type.clone(); let step = sqlx::query_as::<_, DirectiveStep>( r#" - INSERT INTO directive_steps (directive_id, name, description, task_plan, depends_on, order_index, generation) - VALUES ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO directive_steps (directive_id, name, description, task_plan, depends_on, order_index, generation, contract_type) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING * "#, ) @@ -5416,6 +5417,7 @@ pub async fn create_directive_step( .bind(&req.depends_on) .bind(req.order_index) .bind(generation) + .bind(&contract_type) .fetch_one(pool) .await?; @@ -5727,6 +5729,8 @@ pub struct StepForDispatch { pub order_index: i32, pub generation: i32, pub depends_on: Vec, + /// Optional contract type — when set, orchestrator creates a contract instead of a task. + pub contract_type: Option, // Directive fields pub owner_id: Uuid, pub directive_title: String, @@ -5751,6 +5755,7 @@ pub async fn get_ready_steps_for_dispatch( ds.order_index, ds.generation, ds.depends_on, + ds.contract_type, d.owner_id, d.title AS directive_title, d.repository_url, @@ -5760,6 +5765,7 @@ pub async fn get_ready_steps_for_dispatch( JOIN directives d ON d.id = ds.directive_id WHERE ds.status = 'ready' AND ds.task_id IS NULL + AND ds.contract_id IS NULL AND d.status = 'active' ORDER BY ds.order_index "#, @@ -5831,6 +5837,39 @@ pub async fn get_running_steps_with_tasks( JOIN tasks t ON t.id = ds.task_id WHERE ds.status = 'running' AND ds.task_id IS NOT NULL + AND ds.contract_id IS NULL + "#, + ) + .fetch_all(pool) + .await +} + +/// A running step backed by a contract, joined with the contract's current status. +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct RunningStepWithContract { + pub step_id: Uuid, + pub directive_id: Uuid, + pub contract_id: Uuid, + pub contract_status: String, + pub contract_phase: String, +} + +/// Get running steps that are backed by contracts (for contract-based monitoring). +pub async fn get_running_steps_with_contracts( + pool: &PgPool, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, RunningStepWithContract>( + r#" + SELECT + ds.id AS step_id, + ds.directive_id, + ds.contract_id AS "contract_id!", + c.status AS contract_status, + c.phase AS contract_phase + FROM directive_steps ds + JOIN contracts c ON c.id = ds.contract_id + WHERE ds.status = 'running' + AND ds.contract_id IS NOT NULL "#, ) .fetch_all(pool) @@ -5995,6 +6034,26 @@ pub async fn link_task_to_step( Ok(()) } +/// Link a contract to a directive step. +pub async fn link_contract_to_step( + pool: &PgPool, + step_id: Uuid, + contract_id: Uuid, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE directive_steps + SET contract_id = $1 + WHERE id = $2 + "#, + ) + .bind(contract_id) + .bind(step_id) + .execute(pool) + .await?; + Ok(()) +} + /// Set a step to 'running' status (after its task has been dispatched). pub async fn set_step_running( pool: &PgPool, diff --git a/makima/src/orchestration/directive.rs b/makima/src/orchestration/directive.rs index 98690bb..155cfad 100644 --- a/makima/src/orchestration/directive.rs +++ b/makima/src/orchestration/directive.rs @@ -11,7 +11,7 @@ use uuid::Uuid; use base64::Engine; -use crate::db::models::{CreateTaskRequest, UpdateTaskRequest}; +use crate::db::models::{CreateContractRequest, CreateTaskRequest, UpdateContractRequest, UpdateTaskRequest}; use crate::db::repository; use crate::server::state::{DaemonCommand, SharedState}; @@ -86,6 +86,42 @@ impl DirectiveOrchestrator { let steps = repository::get_ready_steps_for_dispatch(&self.pool).await?; for step in steps { + // If the step has a contract_type, create a contract instead of a standalone task + if step.contract_type.is_some() { + tracing::info!( + step_id = %step.step_id, + directive_id = %step.directive_id, + step_name = %step.step_name, + contract_type = ?step.contract_type, + "Spawning contract for contract-backed step" + ); + + match self + .spawn_step_contract( + step.step_id, + step.directive_id, + step.owner_id, + &step.step_name, + step.step_description.as_deref(), + step.task_plan.as_deref(), + step.contract_type.as_deref().unwrap_or("simple"), + step.repository_url.as_deref(), + step.base_branch.as_deref(), + ) + .await + { + Ok(()) => {} + Err(e) => { + tracing::warn!( + step_id = %step.step_id, + error = %e, + "Failed to spawn contract for step" + ); + } + } + continue; + } + tracing::info!( step_id = %step.step_id, directive_id = %step.directive_id, @@ -218,7 +254,70 @@ impl DirectiveOrchestrator { /// Phase 3: Monitor running steps and orchestrator tasks. async fn phase_monitoring(&self) -> Result<(), anyhow::Error> { - // Check running steps + // Check contract-backed running steps first + let contract_steps = repository::get_running_steps_with_contracts(&self.pool).await?; + + for step in contract_steps { + if let Err(e) = async { + match step.contract_status.as_str() { + "completed" | "archived" => { + tracing::info!( + step_id = %step.step_id, + directive_id = %step.directive_id, + contract_id = %step.contract_id, + contract_status = %step.contract_status, + "Contract-backed step contract completed — updating step to completed" + ); + let update = crate::db::models::UpdateDirectiveStepRequest { + status: Some("completed".to_string()), + ..Default::default() + }; + repository::update_directive_step(&self.pool, step.step_id, update).await?; + + // Mark linked orders as done + if let Ok(linked_orders) = repository::get_orders_by_step_id(&self.pool, step.step_id).await { + for order in linked_orders { + if order.status != "done" && order.status != "archived" { + let order_update = crate::db::models::UpdateOrderRequest { + status: Some("done".to_string()), + ..Default::default() + }; + let _ = repository::update_order(&self.pool, order.owner_id, order.id, order_update).await; + } + } + } + + repository::advance_directive_ready_steps(&self.pool, step.directive_id) + .await?; + repository::check_directive_idle(&self.pool, step.directive_id).await?; + } + "active" => { + // Contract still active — check if the supervisor has failed + // by looking at whether there are any failed tasks with no active tasks remaining + tracing::debug!( + step_id = %step.step_id, + contract_id = %step.contract_id, + contract_phase = %step.contract_phase, + "Contract-backed step still active — monitoring" + ); + } + _ => { + // Unknown status — log and skip + tracing::debug!( + step_id = %step.step_id, + contract_id = %step.contract_id, + contract_status = %step.contract_status, + "Contract-backed step in unexpected status" + ); + } + } + Ok::<(), anyhow::Error>(()) + }.await { + tracing::warn!(step_id = %step.step_id, error = %e, "Error processing contract-backed step — continuing"); + } + } + + // Check task-backed running steps (excludes contract-backed steps) let running = repository::get_running_steps_with_tasks(&self.pool).await?; for step in running { @@ -505,6 +604,142 @@ impl DirectiveOrchestrator { Ok(()) } + /// Spawn a contract for a contract-backed step. + /// Creates a contract, adds the directive's repository to it, links it to the step, + /// creates a supervisor task, and marks the step as running. + async fn spawn_step_contract( + &self, + step_id: Uuid, + directive_id: Uuid, + owner_id: Uuid, + step_name: &str, + step_description: Option<&str>, + task_plan: Option<&str>, + contract_type: &str, + repo_url: Option<&str>, + base_branch: Option<&str>, + ) -> Result<(), anyhow::Error> { + // Build contract description from step info + let description = match (step_description, task_plan) { + (Some(desc), Some(plan)) => Some(format!("{}\n\n{}", desc, plan)), + (Some(desc), None) => Some(desc.to_string()), + (None, Some(plan)) => Some(plan.to_string()), + (None, None) => None, + }; + + // Create the contract + let contract_req = CreateContractRequest { + name: step_name.to_string(), + description, + contract_type: Some(contract_type.to_string()), + template_id: None, + initial_phase: None, + autonomous_loop: Some(true), + phase_guard: None, + local_only: None, + auto_merge_local: None, + }; + + let contract = repository::create_contract_for_owner(&self.pool, owner_id, contract_req).await?; + + tracing::info!( + step_id = %step_id, + contract_id = %contract.id, + contract_type = %contract.contract_type, + "Created contract for directive step" + ); + + // Link the contract to the step + repository::link_contract_to_step(&self.pool, step_id, contract.id).await?; + + // Add the directive's repository to the contract (if available) + if let Some(url) = repo_url { + if let Err(e) = repository::add_remote_repository( + &self.pool, + contract.id, + step_name, + url, + true, // is_primary + ) + .await + { + tracing::warn!( + contract_id = %contract.id, + error = %e, + "Failed to add repository to contract — continuing without it" + ); + } + } + + // Create supervisor task for the contract (following the pattern from contract handlers) + let supervisor_name = format!("{} Supervisor", step_name); + let supervisor_plan = format!( + "You are the supervisor for contract '{}'. Your goal is to drive this contract to completion.\n\n{}", + step_name, + contract.description.as_deref().unwrap_or("No description provided.") + ); + + let supervisor_req = CreateTaskRequest { + name: supervisor_name.clone(), + description: None, + plan: supervisor_plan.clone(), + repository_url: repo_url.map(|s| s.to_string()), + base_branch: base_branch.map(|s| s.to_string()), + target_branch: None, + parent_task_id: None, + contract_id: Some(contract.id), + target_repo_path: None, + completion_action: None, + continue_from_task_id: None, + copy_files: None, + is_supervisor: true, + checkpoint_sha: None, + priority: 0, + merge_mode: None, + branched_from_task_id: None, + conversation_history: None, + supervisor_worktree_task_id: None, + directive_id: Some(directive_id), + directive_step_id: Some(step_id), + }; + + let supervisor_task = repository::create_task_for_owner(&self.pool, owner_id, supervisor_req).await?; + + tracing::info!( + contract_id = %contract.id, + supervisor_task_id = %supervisor_task.id, + "Created supervisor task for contract-backed step" + ); + + // Link supervisor task to contract + let update_req = UpdateContractRequest { + supervisor_task_id: Some(supervisor_task.id), + version: Some(contract.version), + ..Default::default() + }; + if let Err(e) = repository::update_contract_for_owner(&self.pool, contract.id, owner_id, update_req).await { + tracing::warn!( + contract_id = %contract.id, + error = %e, + "Failed to link supervisor task to contract" + ); + } + + // Try to dispatch the supervisor task to a daemon + if self + .try_dispatch_task(supervisor_task.id, owner_id, &supervisor_task.name, &supervisor_task.plan, supervisor_task.version) + .await + { + repository::set_step_running(&self.pool, step_id).await?; + } else { + // Even if dispatch fails, mark step as running since contract is created. + // The supervisor task will be retried by the pending task retry logic. + repository::set_step_running(&self.pool, step_id).await?; + } + + Ok(()) + } + /// Try to dispatch a task to an available daemon. Returns true if dispatched. async fn try_dispatch_task( &self, @@ -1337,6 +1572,10 @@ For each step, define: - orderIndex: Execution phase number. Steps only start after ALL steps with a lower orderIndex complete. Steps with the same orderIndex run in parallel. Use ascending values (0, 1, 2, ...) to create sequential phases. Use dependsOn for fine-grained control within the same phase. +- contractType (OPTIONAL): For large, complex work items, set this to create a full contract instead of a + standalone task. Valid values: "simple" (Plan → Execute), "specification" (Research → Specify → Plan → Execute → Review), + "execute" (Execute only). Only use this for steps that truly need multi-phase orchestration. + Most steps should NOT use this — standalone tasks are the default and preferred for typical work. Submit steps: makima directive add-step "Step Name" --description "..." --task-plan "..." diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index c840676..0e72bdf 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -20,7 +20,7 @@ use crate::db::models::{ use crate::db::repository::{self, RepositoryError}; use crate::server::auth::Authenticated; use crate::server::messages::ApiError; -use crate::server::state::{DaemonCommand, SharedState, TaskUpdateNotification}; +use crate::server::state::{DaemonCommand, DaemonReauthStatus, SharedState, TaskUpdateNotification}; // ============================================================================= // Authentication Types @@ -4283,3 +4283,285 @@ pub async fn restart_daemon( }) .into_response() } + +// ============================================================================= +// Daemon Reauthorization +// ============================================================================= + +/// Response from the trigger reauth endpoint. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct TriggerReauthResponse { + /// Whether the reauth command was sent successfully. + pub success: bool, + /// The daemon ID that received the reauth command. + #[serde(rename = "daemonId")] + pub daemon_id: Uuid, + /// Unique request ID for tracking this reauth flow. + #[serde(rename = "requestId")] + pub request_id: Uuid, +} + +/// Request body for submitting an auth code. +#[derive(Debug, serde::Deserialize, utoipa::ToSchema)] +pub struct SubmitAuthCodeRequest { + /// The auth code obtained from the OAuth login flow. + pub code: String, + /// The request ID from the trigger reauth response. + #[serde(rename = "requestId")] + pub request_id: Uuid, +} + +/// Response from the submit auth code endpoint. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct SubmitAuthCodeResponse { + /// Whether the auth code was sent to the daemon successfully. + pub success: bool, +} + +/// Response from the reauth status polling endpoint. +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct ReauthStatusResponse { + /// Current status of the reauth flow: "pending", "url_ready", "completed", "failed" + pub status: String, + /// OAuth login URL (present when status is "url_ready") + #[serde(rename = "loginUrl", skip_serializing_if = "Option::is_none")] + pub login_url: Option, + /// Error message (present when status is "failed") + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +/// Trigger OAuth re-authentication on a daemon. +/// +/// Sends a reauth command to the specified daemon, which will spawn `claude setup-token` +/// and return the OAuth login URL via a status update. +#[utoipa::path( + post, + path = "/api/v1/mesh/daemons/{id}/reauth", + params( + ("id" = Uuid, Path, description = "Daemon ID") + ), + responses( + (status = 200, description = "Reauth command sent", body = TriggerReauthResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Daemon not found or not connected", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn trigger_daemon_reauth( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify the daemon exists and belongs to this owner + match repository::get_daemon_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Daemon not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get daemon {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + // Check if daemon is connected + if !state.is_daemon_connected(id) { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new( + "DAEMON_NOT_CONNECTED", + "Daemon is not currently connected", + )), + ) + .into_response(); + } + + // Generate a unique request ID for this reauth flow + let request_id = Uuid::new_v4(); + + // Initialize the status as "pending" + state.set_daemon_reauth_status(id, request_id, "pending".to_string(), None, None); + + // Send reauth command to daemon + let command = DaemonCommand::TriggerReauth { request_id }; + if let Err(e) = state.send_daemon_command(id, command).await { + tracing::error!("Failed to send reauth command to daemon {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + tracing::info!( + daemon_id = %id, + request_id = %request_id, + owner_id = %auth.owner_id, + "Reauth command sent to daemon" + ); + + Json(TriggerReauthResponse { + success: true, + daemon_id: id, + request_id, + }) + .into_response() +} + +/// Submit an OAuth auth code to a daemon's pending reauth flow. +/// +/// Sends the auth code to the daemon, which will forward it to the `claude setup-token` process. +#[utoipa::path( + post, + path = "/api/v1/mesh/daemons/{id}/reauth/code", + params( + ("id" = Uuid, Path, description = "Daemon ID") + ), + request_body = SubmitAuthCodeRequest, + responses( + (status = 200, description = "Auth code submitted", body = SubmitAuthCodeResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Daemon not found or not connected", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + (status = 500, description = "Internal server error", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn submit_daemon_auth_code( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(body): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Verify the daemon exists and belongs to this owner + match repository::get_daemon_for_owner(pool, id, auth.owner_id).await { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Daemon not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get daemon {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", e.to_string())), + ) + .into_response(); + } + } + + // Check if daemon is connected + if !state.is_daemon_connected(id) { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new( + "DAEMON_NOT_CONNECTED", + "Daemon is not currently connected", + )), + ) + .into_response(); + } + + // Send auth code command to daemon + let command = DaemonCommand::SubmitAuthCode { + request_id: body.request_id, + code: body.code, + }; + if let Err(e) = state.send_daemon_command(id, command).await { + tracing::error!("Failed to send auth code to daemon {}: {}", id, e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DAEMON_ERROR", e)), + ) + .into_response(); + } + + tracing::info!( + daemon_id = %id, + request_id = %body.request_id, + owner_id = %auth.owner_id, + "Auth code submitted to daemon" + ); + + Json(SubmitAuthCodeResponse { success: true }).into_response() +} + +/// Get the status of a daemon reauth request. +/// +/// Used by the frontend to poll for reauth status updates. +#[utoipa::path( + get, + path = "/api/v1/mesh/daemons/{id}/reauth/{request_id}/status", + params( + ("id" = Uuid, Path, description = "Daemon ID"), + ("request_id" = Uuid, Path, description = "Reauth request ID") + ), + responses( + (status = 200, description = "Reauth status", body = ReauthStatusResponse), + (status = 401, description = "Unauthorized", body = ApiError), + (status = 404, description = "Reauth request not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security( + ("bearer_auth" = []), + ("api_key" = []) + ), + tag = "Mesh" +)] +pub async fn get_daemon_reauth_status( + State(state): State, + Authenticated(_auth): Authenticated, + Path((id, request_id)): Path<(Uuid, Uuid)>, +) -> impl IntoResponse { + match state.get_daemon_reauth_status(id, request_id) { + Some(status) => Json(ReauthStatusResponse { + status: status.status, + login_url: status.login_url, + error: status.error, + }) + .into_response(), + None => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Reauth request not found")), + ) + .into_response(), + } +} diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 743a1ca..30439a4 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -350,6 +350,18 @@ pub enum DaemonMessage { /// Hostname of the daemon requiring auth hostname: Option, }, + /// Reauth status update (response to TriggerReauth/SubmitAuthCode commands) + ReauthStatus { + #[serde(rename = "requestId")] + request_id: Uuid, + /// Status: "url_ready", "completed", "failed" + status: String, + /// OAuth login URL (present when status is "url_ready") + #[serde(rename = "loginUrl")] + login_url: Option, + /// Error message (present when status is "failed") + error: Option, + }, /// Response to RetryCompletionAction command CompletionActionResult { #[serde(rename = "taskId")] @@ -1622,6 +1634,25 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "OAuth login URL available - user should open this in browser" ); } + Ok(DaemonMessage::ReauthStatus { request_id, status, login_url, error }) => { + tracing::info!( + daemon_id = %daemon_uuid, + request_id = %request_id, + status = %status, + login_url = ?login_url, + error = ?error, + "Daemon reauth status update" + ); + + // Store the reauth status for polling by the frontend + state.set_daemon_reauth_status( + daemon_uuid, + request_id, + status.clone(), + login_url.clone(), + error.clone(), + ); + } Ok(DaemonMessage::DaemonDirectories { working_directory, home_directory, worktrees_directory }) => { tracing::info!( daemon_id = %daemon_uuid, diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index e0f8e7d..b84b90e 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -98,6 +98,9 @@ pub fn make_router(state: SharedState) -> Router { .route("/mesh/daemons/directories", get(mesh::get_daemon_directories)) .route("/mesh/daemons/{id}", get(mesh::get_daemon)) .route("/mesh/daemons/{id}/restart", post(mesh::restart_daemon)) + .route("/mesh/daemons/{id}/reauth", post(mesh::trigger_daemon_reauth)) + .route("/mesh/daemons/{id}/reauth/code", post(mesh::submit_daemon_auth_code)) + .route("/mesh/daemons/{id}/reauth/{request_id}/status", get(mesh::get_daemon_reauth_status)) // Merge endpoints for orchestrators .route("/mesh/tasks/{id}/branches", get(mesh_merge::list_branches)) .route("/mesh/tasks/{id}/merge/start", post(mesh_merge::merge_start)) diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 15fec6b..5c5e24f 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -521,6 +521,19 @@ pub enum DaemonCommand { /// Restart the daemon process RestartDaemon, + /// Trigger OAuth re-authentication on this daemon + TriggerReauth { + #[serde(rename = "requestId")] + request_id: Uuid, + }, + + /// Submit auth code for pending reauth + SubmitAuthCode { + #[serde(rename = "requestId")] + request_id: Uuid, + code: String, + }, + /// Apply a patch to a task's worktree (for cross-daemon merge). /// Sent by server when routing MergePatchToSupervisor to the supervisor's daemon. ApplyPatchToWorktree { @@ -562,6 +575,15 @@ pub struct DaemonConnectionInfo { pub worktrees_directory: Option, } +/// Status of a daemon reauth request (stored in state for polling). +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DaemonReauthStatus { + pub request_id: Uuid, + pub status: String, + pub login_url: Option, + pub error: Option, +} + /// Configuration paths for ML models (used for lazy loading). #[derive(Clone)] pub struct ModelConfig { @@ -616,6 +638,8 @@ pub struct AppState { pub pending_worktree_info: DashMap>, /// Lazily-loaded TTS engine (initialized on first Speak connection) pub tts_engine: OnceCell>, + /// Daemon reauth status storage (keyed by (daemon_id, request_id)) + pub daemon_reauth_status: DashMap<(Uuid, Uuid), DaemonReauthStatus>, } impl AppState { @@ -694,6 +718,7 @@ impl AppState { jwt_verifier, pending_worktree_info: DashMap::new(), tts_engine: OnceCell::new(), + daemon_reauth_status: DashMap::new(), } } @@ -1200,6 +1225,41 @@ impl AppState { tracing::info!(task_id = %task_id, "Revoked tool key"); } + // ========================================================================= + // Daemon Reauth Status + // ========================================================================= + + /// Store a daemon reauth status update (from daemon's ReauthStatus message). + pub fn set_daemon_reauth_status( + &self, + daemon_id: Uuid, + request_id: Uuid, + status: String, + login_url: Option, + error: Option, + ) { + self.daemon_reauth_status.insert( + (daemon_id, request_id), + DaemonReauthStatus { + request_id, + status, + login_url, + error, + }, + ); + } + + /// Get a daemon reauth status (for frontend polling). + pub fn get_daemon_reauth_status( + &self, + daemon_id: Uuid, + request_id: Uuid, + ) -> Option { + self.daemon_reauth_status + .get(&(daemon_id, request_id)) + .map(|entry| entry.value().clone()) + } + // ========================================================================= // Supervisor Notifications // ========================================================================= -- cgit v1.2.3