summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers/mesh.rs')
-rw-r--r--makima/src/server/handlers/mesh.rs105
1 files changed, 101 insertions, 4 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 760740c..2d90a04 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -214,7 +214,27 @@ pub async fn create_task(
};
match repository::create_task_for_owner(pool, auth.owner_id, req).await {
- Ok(task) => (StatusCode::CREATED, Json(task)).into_response(),
+ Ok(task) => {
+ // Notify supervisor of new task creation if task belongs to a contract
+ if let Some(contract_id) = task.contract_id {
+ if !task.is_supervisor {
+ let pool = pool.clone();
+ let state_clone = state.clone();
+ let task_clone = task.clone();
+ tokio::spawn(async move {
+ if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
+ state_clone.notify_supervisor_of_task_created(
+ supervisor.id,
+ supervisor.daemon_id,
+ task_clone.id,
+ &task_clone.name,
+ ).await;
+ }
+ });
+ }
+ }
+ (StatusCode::CREATED, Json(task)).into_response()
+ }
Err(e) => {
tracing::error!("Failed to create task: {}", e);
(
@@ -262,6 +282,26 @@ pub async fn update_task(
.into_response();
};
+ // Check if trying to set a supervisor task to a terminal status
+ if let Some(ref new_status) = req.status {
+ let terminal_statuses = ["done", "failed", "merged"];
+ if terminal_statuses.contains(&new_status.as_str()) {
+ // Get the task to check if it's a supervisor
+ if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await {
+ if task.is_supervisor {
+ return (
+ StatusCode::BAD_REQUEST,
+ Json(ApiError::new(
+ "SUPERVISOR_CANNOT_COMPLETE",
+ "Supervisor tasks cannot be marked as done, failed, or merged. They run for the lifetime of the contract.",
+ )),
+ )
+ .into_response();
+ }
+ }
+ }
+ }
+
// Track which fields are being updated for the notification
let mut updated_fields = Vec::new();
if req.name.is_some() {
@@ -288,6 +328,8 @@ pub async fn update_task(
match repository::update_task_for_owner(pool, id, auth.owner_id, req).await {
Ok(Some(task)) => {
+ let updated_fields_clone = updated_fields.clone();
+
// Broadcast task update notification
state.broadcast_task_update(TaskUpdateNotification {
task_id: task.id,
@@ -297,6 +339,28 @@ pub async fn update_task(
updated_fields,
updated_by: "user".to_string(),
});
+
+ // Notify supervisor of status change if task belongs to a contract
+ if let Some(contract_id) = task.contract_id {
+ if !task.is_supervisor && updated_fields_clone.contains(&"status".to_string()) {
+ let pool = pool.clone();
+ let state_clone = state.clone();
+ let task_clone = task.clone();
+ tokio::spawn(async move {
+ if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
+ state_clone.notify_supervisor_of_task_update(
+ supervisor.id,
+ supervisor.daemon_id,
+ task_clone.id,
+ &task_clone.name,
+ &task_clone.status,
+ &updated_fields_clone,
+ ).await;
+ }
+ });
+ }
+ }
+
Json(task).into_response()
}
Ok(None) => (
@@ -556,7 +620,8 @@ pub async fn start_task(
task_depth = task.depth,
subtask_count = subtask_count,
is_orchestrator = is_orchestrator,
- "Starting task with orchestrator determination"
+ is_supervisor = task.is_supervisor,
+ "Starting task with orchestrator/supervisor determination"
);
// IMPORTANT: Update database FIRST to assign daemon_id before sending command
@@ -602,8 +667,18 @@ pub async fn start_task(
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: task.contract_id,
+ is_supervisor: task.is_supervisor,
};
+ tracing::info!(
+ task_id = %id,
+ is_supervisor = task.is_supervisor,
+ is_orchestrator = is_orchestrator,
+ daemon_id = %target_daemon_id,
+ "Sending SpawnTask command to daemon"
+ );
+
if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
tracing::error!("Failed to send SpawnTask command: {}", e);
// Rollback: clear daemon_id and reset status since command failed
@@ -884,8 +959,11 @@ pub async fn send_message(
}
};
- // Check if task is running
- if task.status != "running" {
+ // Check if task is running (except for AUTH_CODE messages and supervisor tasks)
+ // Supervisor tasks can receive messages even when not running - daemon will respawn Claude
+ let is_auth_code = req.message.starts_with("AUTH_CODE:");
+ let is_supervisor = task.is_supervisor;
+ if task.status != "running" && !is_auth_code && !is_supervisor {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
@@ -900,8 +978,27 @@ pub async fn send_message(
}
// Find the daemon running this task
+ // For supervisors, if no daemon is assigned, find any available daemon for this owner
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
daemon_id
+ } else if is_supervisor {
+ // Supervisor without daemon - find one
+ match state.daemon_connections
+ .iter()
+ .find(|d| d.value().owner_id == auth.owner_id)
+ {
+ Some(entry) => entry.value().id,
+ None => {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new(
+ "NO_DAEMON",
+ "No daemon available. Please start a daemon.",
+ )),
+ )
+ .into_response();
+ }
+ }
} else {
return (
StatusCode::SERVICE_UNAVAILABLE,