diff options
| author | soryu <soryu@soryu.co> | 2026-01-11 05:52:14 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-15 00:21:16 +0000 |
| commit | 87044a747b47bd83249d61a45842c7f7b2eae56d (patch) | |
| tree | ef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/src/server/handlers/mesh_daemon.rs | |
| parent | 077820c4167c168072d217a1b01df840463a12a8 (diff) | |
| download | soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip | |
Contract system
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 211 |
1 files changed, 210 insertions, 1 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 644d0bc..178e5e1 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -301,6 +301,17 @@ pub enum DaemonMessage { #[serde(rename = "taskId")] task_id: Uuid, }, + /// Authentication required - OAuth token expired, provides login URL + AuthenticationRequired { + /// Task ID that triggered the auth error (if any) + #[serde(rename = "taskId")] + task_id: Option<Uuid>, + /// OAuth login URL for remote authentication + #[serde(rename = "loginUrl")] + login_url: String, + /// Hostname of the daemon requiring auth + hostname: Option<String>, + }, /// Response to RetryCompletionAction command CompletionActionResult { #[serde(rename = "taskId")] @@ -343,6 +354,21 @@ pub enum DaemonMessage { #[serde(rename = "targetDir")] target_dir: String, }, + /// Response to ReadRepoFile command + RepoFileContent { + /// Request ID from the original command + #[serde(rename = "requestId")] + request_id: Uuid, + /// Path to the file that was read + #[serde(rename = "filePath")] + file_path: String, + /// File content (None if error occurred) + content: Option<String>, + /// Whether the operation succeeded + success: bool, + /// Error message if operation failed + error: Option<String>, + }, } /// Validated daemon authentication result. @@ -509,6 +535,31 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re "Daemon registered" ); + // Register daemon in database + if let Some(ref pool) = state.db_pool { + match repository::register_daemon( + pool, + owner_id, + &connection_id, + Some(&hostname), + Some(&machine_id), + max_concurrent_tasks as i32, + ).await { + Ok(db_daemon) => { + tracing::debug!( + daemon_id = %db_daemon.id, + "Daemon registered in database" + ); + } + Err(e) => { + tracing::error!( + error = %e, + "Failed to register daemon in database" + ); + } + } + } + // Register daemon in state with owner_id state.register_daemon( connection_id.clone(), @@ -718,6 +769,24 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re ], updated_by: "daemon".into(), }); + + // Notify supervisor if this task belongs to a contract + if let Some(contract_id) = updated_task.contract_id { + // Don't notify for supervisor tasks (they don't report to themselves) + if !updated_task.is_supervisor { + if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { + state.notify_supervisor_of_task_completion( + supervisor.id, + supervisor.daemon_id, + updated_task.id, + &updated_task.name, + &updated_task.status, + updated_task.progress_summary.as_deref(), + updated_task.error_message.as_deref(), + ).await; + } + } + } } Ok(None) => { tracing::warn!( @@ -763,6 +832,50 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re ); state.revoke_tool_key(task_id); } + Ok(DaemonMessage::AuthenticationRequired { task_id, login_url, hostname }) => { + tracing::warn!( + task_id = ?task_id, + login_url = %login_url, + hostname = ?hostname, + "Daemon requires authentication - OAuth token expired" + ); + + // Broadcast as task output with auth_required type so UI can display the login link + let content = format!( + "🔐 Authentication required on daemon{}. Click to login: {}", + hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default(), + login_url + ); + + // Broadcast to task subscribers if we have a task_id + if let Some(tid) = task_id { + tracing::info!(task_id = %tid, "Broadcasting auth_required to task subscribers"); + state.broadcast_task_output(TaskOutputNotification { + task_id: tid, + owner_id: Some(owner_id), + message_type: "auth_required".to_string(), + content: "Authentication required".to_string(), // Constant for dedup + tool_name: None, + tool_input: Some(serde_json::json!({ + "loginUrl": login_url, + "hostname": hostname, + "taskId": tid.to_string(), + })), + is_error: Some(true), + cost_usd: None, + duration_ms: None, + is_partial: false, + }); + } else { + tracing::warn!("No task_id for auth_required - cannot broadcast to specific task"); + } + + // Also log the full URL for manual use + tracing::info!( + login_url = %login_url, + "OAuth login URL available - user should open this in browser" + ); + } Ok(DaemonMessage::DaemonDirectories { working_directory, home_directory, worktrees_directory }) => { tracing::info!( daemon_id = %daemon_uuid, @@ -874,6 +987,92 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re is_partial: false, }); } + Ok(DaemonMessage::RepoFileContent { + request_id, + file_path, + content, + success, + error, + }) => { + tracing::info!( + request_id = %request_id, + file_path = %file_path, + success = success, + content_len = content.as_ref().map(|c| c.len()), + error = ?error, + "Repo file content received from daemon" + ); + + // The request_id is the file_id we want to update + if success { + if let (Some(pool), Some(content)) = (&state.db_pool, content) { + // Convert markdown to body elements + let body = crate::llm::markdown_to_body(&content); + + // Update file in database + let update_req = crate::db::models::UpdateFileRequest { + name: None, + description: None, + transcript: None, + summary: None, + body: Some(body), + version: None, + repo_file_path: None, + }; + + match repository::update_file_for_owner(pool, request_id, owner_id, update_req).await { + Ok(Some(_file)) => { + tracing::info!( + file_id = %request_id, + "File synced from repository successfully" + ); + + // Update repo_sync_status to 'synced' and set repo_synced_at + if let Err(e) = sqlx::query( + "UPDATE files SET repo_sync_status = 'synced', repo_synced_at = NOW() WHERE id = $1" + ) + .bind(request_id) + .execute(pool) + .await + { + tracing::warn!( + file_id = %request_id, + error = %e, + "Failed to update repo sync status" + ); + } + + // Broadcast file update notification + state.broadcast_file_update(crate::server::state::FileUpdateNotification { + file_id: request_id, + version: 0, // Will be updated by next fetch + updated_fields: vec!["body".to_string(), "repo_sync_status".to_string()], + updated_by: "daemon".to_string(), + }); + } + Ok(None) => { + tracing::warn!( + file_id = %request_id, + "File not found when syncing from repository" + ); + } + Err(e) => { + tracing::error!( + file_id = %request_id, + error = %e, + "Failed to update file from repository content" + ); + } + } + } + } else { + tracing::warn!( + file_id = %request_id, + error = ?error, + "Daemon failed to read repo file" + ); + } + } Err(e) => { tracing::warn!("Failed to parse daemon message: {}", e); } @@ -913,10 +1112,20 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re // Cleanup on disconnect state.unregister_daemon(&connection_id); - // Clear daemon_id from any tasks that were running on this daemon + // Delete daemon from database and clear tasks if let Some(ref pool) = state.db_pool { let pool = pool.clone(); + let conn_id = connection_id.clone(); tokio::spawn(async move { + // Delete daemon from database + if let Err(e) = repository::delete_daemon_by_connection(&pool, &conn_id).await { + tracing::error!( + connection_id = %conn_id, + error = %e, + "Failed to delete daemon from database" + ); + } + // Find tasks assigned to this daemon that are still active if let Err(e) = clear_daemon_from_tasks(&pool, daemon_uuid).await { tracing::error!( |
