summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh_daemon.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-11 05:52:14 +0000
committersoryu <soryu@soryu.co>2026-01-15 00:21:16 +0000
commit87044a747b47bd83249d61a45842c7f7b2eae56d (patch)
treeef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/src/server/handlers/mesh_daemon.rs
parent077820c4167c168072d217a1b01df840463a12a8 (diff)
downloadsoryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz
soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip
Contract system
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs211
1 files changed, 210 insertions, 1 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 644d0bc..178e5e1 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -301,6 +301,17 @@ pub enum DaemonMessage {
#[serde(rename = "taskId")]
task_id: Uuid,
},
+ /// Authentication required - OAuth token expired, provides login URL
+ AuthenticationRequired {
+ /// Task ID that triggered the auth error (if any)
+ #[serde(rename = "taskId")]
+ task_id: Option<Uuid>,
+ /// OAuth login URL for remote authentication
+ #[serde(rename = "loginUrl")]
+ login_url: String,
+ /// Hostname of the daemon requiring auth
+ hostname: Option<String>,
+ },
/// Response to RetryCompletionAction command
CompletionActionResult {
#[serde(rename = "taskId")]
@@ -343,6 +354,21 @@ pub enum DaemonMessage {
#[serde(rename = "targetDir")]
target_dir: String,
},
+ /// Response to ReadRepoFile command
+ RepoFileContent {
+ /// Request ID from the original command
+ #[serde(rename = "requestId")]
+ request_id: Uuid,
+ /// Path to the file that was read
+ #[serde(rename = "filePath")]
+ file_path: String,
+ /// File content (None if error occurred)
+ content: Option<String>,
+ /// Whether the operation succeeded
+ success: bool,
+ /// Error message if operation failed
+ error: Option<String>,
+ },
}
/// Validated daemon authentication result.
@@ -509,6 +535,31 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
"Daemon registered"
);
+ // Register daemon in database
+ if let Some(ref pool) = state.db_pool {
+ match repository::register_daemon(
+ pool,
+ owner_id,
+ &connection_id,
+ Some(&hostname),
+ Some(&machine_id),
+ max_concurrent_tasks as i32,
+ ).await {
+ Ok(db_daemon) => {
+ tracing::debug!(
+ daemon_id = %db_daemon.id,
+ "Daemon registered in database"
+ );
+ }
+ Err(e) => {
+ tracing::error!(
+ error = %e,
+ "Failed to register daemon in database"
+ );
+ }
+ }
+ }
+
// Register daemon in state with owner_id
state.register_daemon(
connection_id.clone(),
@@ -718,6 +769,24 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
],
updated_by: "daemon".into(),
});
+
+ // Notify supervisor if this task belongs to a contract
+ if let Some(contract_id) = updated_task.contract_id {
+ // Don't notify for supervisor tasks (they don't report to themselves)
+ if !updated_task.is_supervisor {
+ if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
+ state.notify_supervisor_of_task_completion(
+ supervisor.id,
+ supervisor.daemon_id,
+ updated_task.id,
+ &updated_task.name,
+ &updated_task.status,
+ updated_task.progress_summary.as_deref(),
+ updated_task.error_message.as_deref(),
+ ).await;
+ }
+ }
+ }
}
Ok(None) => {
tracing::warn!(
@@ -763,6 +832,50 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
);
state.revoke_tool_key(task_id);
}
+ Ok(DaemonMessage::AuthenticationRequired { task_id, login_url, hostname }) => {
+ tracing::warn!(
+ task_id = ?task_id,
+ login_url = %login_url,
+ hostname = ?hostname,
+ "Daemon requires authentication - OAuth token expired"
+ );
+
+ // Broadcast as task output with auth_required type so UI can display the login link
+ let content = format!(
+ "🔐 Authentication required on daemon{}. Click to login: {}",
+ hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default(),
+ login_url
+ );
+
+ // Broadcast to task subscribers if we have a task_id
+ if let Some(tid) = task_id {
+ tracing::info!(task_id = %tid, "Broadcasting auth_required to task subscribers");
+ state.broadcast_task_output(TaskOutputNotification {
+ task_id: tid,
+ owner_id: Some(owner_id),
+ message_type: "auth_required".to_string(),
+ content: "Authentication required".to_string(), // Constant for dedup
+ tool_name: None,
+ tool_input: Some(serde_json::json!({
+ "loginUrl": login_url,
+ "hostname": hostname,
+ "taskId": tid.to_string(),
+ })),
+ is_error: Some(true),
+ cost_usd: None,
+ duration_ms: None,
+ is_partial: false,
+ });
+ } else {
+ tracing::warn!("No task_id for auth_required - cannot broadcast to specific task");
+ }
+
+ // Also log the full URL for manual use
+ tracing::info!(
+ login_url = %login_url,
+ "OAuth login URL available - user should open this in browser"
+ );
+ }
Ok(DaemonMessage::DaemonDirectories { working_directory, home_directory, worktrees_directory }) => {
tracing::info!(
daemon_id = %daemon_uuid,
@@ -874,6 +987,92 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
is_partial: false,
});
}
+ Ok(DaemonMessage::RepoFileContent {
+ request_id,
+ file_path,
+ content,
+ success,
+ error,
+ }) => {
+ tracing::info!(
+ request_id = %request_id,
+ file_path = %file_path,
+ success = success,
+ content_len = content.as_ref().map(|c| c.len()),
+ error = ?error,
+ "Repo file content received from daemon"
+ );
+
+ // The request_id is the file_id we want to update
+ if success {
+ if let (Some(pool), Some(content)) = (&state.db_pool, content) {
+ // Convert markdown to body elements
+ let body = crate::llm::markdown_to_body(&content);
+
+ // Update file in database
+ let update_req = crate::db::models::UpdateFileRequest {
+ name: None,
+ description: None,
+ transcript: None,
+ summary: None,
+ body: Some(body),
+ version: None,
+ repo_file_path: None,
+ };
+
+ match repository::update_file_for_owner(pool, request_id, owner_id, update_req).await {
+ Ok(Some(_file)) => {
+ tracing::info!(
+ file_id = %request_id,
+ "File synced from repository successfully"
+ );
+
+ // Update repo_sync_status to 'synced' and set repo_synced_at
+ if let Err(e) = sqlx::query(
+ "UPDATE files SET repo_sync_status = 'synced', repo_synced_at = NOW() WHERE id = $1"
+ )
+ .bind(request_id)
+ .execute(pool)
+ .await
+ {
+ tracing::warn!(
+ file_id = %request_id,
+ error = %e,
+ "Failed to update repo sync status"
+ );
+ }
+
+ // Broadcast file update notification
+ state.broadcast_file_update(crate::server::state::FileUpdateNotification {
+ file_id: request_id,
+ version: 0, // Will be updated by next fetch
+ updated_fields: vec!["body".to_string(), "repo_sync_status".to_string()],
+ updated_by: "daemon".to_string(),
+ });
+ }
+ Ok(None) => {
+ tracing::warn!(
+ file_id = %request_id,
+ "File not found when syncing from repository"
+ );
+ }
+ Err(e) => {
+ tracing::error!(
+ file_id = %request_id,
+ error = %e,
+ "Failed to update file from repository content"
+ );
+ }
+ }
+ }
+ } else {
+ tracing::warn!(
+ file_id = %request_id,
+ error = ?error,
+ "Daemon failed to read repo file"
+ );
+ }
+ }
Err(e) => {
tracing::warn!("Failed to parse daemon message: {}", e);
}
@@ -913,10 +1112,20 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
// Cleanup on disconnect
state.unregister_daemon(&connection_id);
- // Clear daemon_id from any tasks that were running on this daemon
+ // Delete daemon from database and clear tasks
if let Some(ref pool) = state.db_pool {
let pool = pool.clone();
+ let conn_id = connection_id.clone();
tokio::spawn(async move {
+ // Delete daemon from database
+ if let Err(e) = repository::delete_daemon_by_connection(&pool, &conn_id).await {
+ tracing::error!(
+ connection_id = %conn_id,
+ error = %e,
+ "Failed to delete daemon from database"
+ );
+ }
+
// Find tasks assigned to this daemon that are still active
if let Err(e) = clear_daemon_from_tasks(&pool, daemon_uuid).await {
tracing::error!(