From 87044a747b47bd83249d61a45842c7f7b2eae56d Mon Sep 17 00:00:00 2001 From: soryu Date: Sun, 11 Jan 2026 05:52:14 +0000 Subject: Contract system --- makima/src/server/handlers/mesh.rs | 105 +++++++++++++++++++++++++++++++++++-- 1 file changed, 101 insertions(+), 4 deletions(-) (limited to 'makima/src/server/handlers/mesh.rs') diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index 760740c..2d90a04 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -214,7 +214,27 @@ pub async fn create_task( }; match repository::create_task_for_owner(pool, auth.owner_id, req).await { - Ok(task) => (StatusCode::CREATED, Json(task)).into_response(), + Ok(task) => { + // Notify supervisor of new task creation if task belongs to a contract + if let Some(contract_id) = task.contract_id { + if !task.is_supervisor { + let pool = pool.clone(); + let state_clone = state.clone(); + let task_clone = task.clone(); + tokio::spawn(async move { + if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { + state_clone.notify_supervisor_of_task_created( + supervisor.id, + supervisor.daemon_id, + task_clone.id, + &task_clone.name, + ).await; + } + }); + } + } + (StatusCode::CREATED, Json(task)).into_response() + } Err(e) => { tracing::error!("Failed to create task: {}", e); ( @@ -262,6 +282,26 @@ pub async fn update_task( .into_response(); }; + // Check if trying to set a supervisor task to a terminal status + if let Some(ref new_status) = req.status { + let terminal_statuses = ["done", "failed", "merged"]; + if terminal_statuses.contains(&new_status.as_str()) { + // Get the task to check if it's a supervisor + if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await { + if task.is_supervisor { + return ( + StatusCode::BAD_REQUEST, + Json(ApiError::new( + "SUPERVISOR_CANNOT_COMPLETE", + "Supervisor tasks cannot be marked as done, failed, or merged. They run for the lifetime of the contract.", + )), + ) + .into_response(); + } + } + } + } + // Track which fields are being updated for the notification let mut updated_fields = Vec::new(); if req.name.is_some() { @@ -288,6 +328,8 @@ pub async fn update_task( match repository::update_task_for_owner(pool, id, auth.owner_id, req).await { Ok(Some(task)) => { + let updated_fields_clone = updated_fields.clone(); + // Broadcast task update notification state.broadcast_task_update(TaskUpdateNotification { task_id: task.id, @@ -297,6 +339,28 @@ pub async fn update_task( updated_fields, updated_by: "user".to_string(), }); + + // Notify supervisor of status change if task belongs to a contract + if let Some(contract_id) = task.contract_id { + if !task.is_supervisor && updated_fields_clone.contains(&"status".to_string()) { + let pool = pool.clone(); + let state_clone = state.clone(); + let task_clone = task.clone(); + tokio::spawn(async move { + if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { + state_clone.notify_supervisor_of_task_update( + supervisor.id, + supervisor.daemon_id, + task_clone.id, + &task_clone.name, + &task_clone.status, + &updated_fields_clone, + ).await; + } + }); + } + } + Json(task).into_response() } Ok(None) => ( @@ -556,7 +620,8 @@ pub async fn start_task( task_depth = task.depth, subtask_count = subtask_count, is_orchestrator = is_orchestrator, - "Starting task with orchestrator determination" + is_supervisor = task.is_supervisor, + "Starting task with orchestrator/supervisor determination" ); // IMPORTANT: Update database FIRST to assign daemon_id before sending command @@ -602,8 +667,18 @@ pub async fn start_task( completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: task.contract_id, + is_supervisor: task.is_supervisor, }; + tracing::info!( + task_id = %id, + is_supervisor = task.is_supervisor, + is_orchestrator = is_orchestrator, + daemon_id = %target_daemon_id, + "Sending SpawnTask command to daemon" + ); + if let Err(e) = state.send_daemon_command(target_daemon_id, command).await { tracing::error!("Failed to send SpawnTask command: {}", e); // Rollback: clear daemon_id and reset status since command failed @@ -884,8 +959,11 @@ pub async fn send_message( } }; - // Check if task is running - if task.status != "running" { + // Check if task is running (except for AUTH_CODE messages and supervisor tasks) + // Supervisor tasks can receive messages even when not running - daemon will respawn Claude + let is_auth_code = req.message.starts_with("AUTH_CODE:"); + let is_supervisor = task.is_supervisor; + if task.status != "running" && !is_auth_code && !is_supervisor { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( @@ -900,8 +978,27 @@ pub async fn send_message( } // Find the daemon running this task + // For supervisors, if no daemon is assigned, find any available daemon for this owner let target_daemon_id = if let Some(daemon_id) = task.daemon_id { daemon_id + } else if is_supervisor { + // Supervisor without daemon - find one + match state.daemon_connections + .iter() + .find(|d| d.value().owner_id == auth.owner_id) + { + Some(entry) => entry.value().id, + None => { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new( + "NO_DAEMON", + "No daemon available. Please start a daemon.", + )), + ) + .into_response(); + } + } } else { return ( StatusCode::SERVICE_UNAVAILABLE, -- cgit v1.2.3