diff options
Diffstat (limited to 'makima/src/server/handlers/mesh.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 247 |
1 files changed, 26 insertions, 221 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index be5387e..6ba4c8b 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -274,35 +274,16 @@ pub async fn create_task( let _ = repository::record_history_event( pool, auth.owner_id, - task.contract_id, Some(task.id), "task", Some("created"), - None, serde_json::json!({ "name": &task.name, - "isSupervisor": task.is_supervisor, }), ).await; - // Notify supervisor of new task creation if task belongs to a contract - if let Some(contract_id) = task.contract_id { - if !task.is_supervisor { - let pool = pool.clone(); - let state_clone = state.clone(); - let task_clone = task.clone(); - tokio::spawn(async move { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { - state_clone.notify_supervisor_of_task_created( - supervisor.id, - supervisor.daemon_id, - task_clone.id, - &task_clone.name, - ).await; - } - }); - } - } + // Supervisor notification on task creation removed alongside + // legacy contracts. (StatusCode::CREATED, Json(task)).into_response() } Err(e) => { @@ -352,26 +333,6 @@ pub async fn update_task( .into_response(); }; - // Check if trying to set a supervisor task to a terminal status - if let Some(ref new_status) = req.status { - let terminal_statuses = ["done", "failed", "merged"]; - if terminal_statuses.contains(&new_status.as_str()) { - // Get the task to check if it's a supervisor - if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await { - if task.is_supervisor { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new( - "SUPERVISOR_CANNOT_COMPLETE", - "Supervisor tasks cannot be marked as done, failed, or merged. They run for the lifetime of the contract.", - )), - ) - .into_response(); - } - } - } - } - // Track which fields are being updated for the notification let mut updated_fields = Vec::new(); if req.name.is_some() { @@ -410,26 +371,9 @@ pub async fn update_task( updated_by: "user".to_string(), }); - // Notify supervisor of status change if task belongs to a contract - if let Some(contract_id) = task.contract_id { - if !task.is_supervisor && updated_fields_clone.contains(&"status".to_string()) { - let pool = pool.clone(); - let state_clone = state.clone(); - let task_clone = task.clone(); - tokio::spawn(async move { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { - state_clone.notify_supervisor_of_task_update( - supervisor.id, - supervisor.daemon_id, - task_clone.id, - &task_clone.name, - &task_clone.status, - &updated_fields_clone, - ).await; - } - }); - } - } + // Supervisor notification on task update removed alongside + // legacy contracts. + let _ = updated_fields_clone; Json(task).into_response() } @@ -657,15 +601,10 @@ pub async fn start_task( .into_response(); } - // Get local_only and auto_merge_local flags from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; + // local_only / auto_merge_local used to come from the parent contract. + // With legacy contracts removed they default to false; the directive + // lifecycle handles its own completion now. + let (local_only, auto_merge_local) = (false, false); // Get list of daemons that have previously failed this task let mut exclude_daemon_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default(); @@ -708,8 +647,7 @@ pub async fn start_task( task_depth = task.depth, subtask_count = subtask_count, is_orchestrator = is_orchestrator, - is_supervisor = task.is_supervisor, - "Starting task with orchestrator/supervisor determination" + "Starting task" ); // IMPORTANT: Update database FIRST to assign daemon_id before sending command @@ -755,8 +693,6 @@ pub async fn start_task( completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -764,13 +700,11 @@ pub async fn start_task( patch_base_sha: None, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; tracing::info!( task_id = %id, - is_supervisor = task.is_supervisor, is_orchestrator = is_orchestrator, daemon_id = %target_daemon_id, "Sending SpawnTask command to daemon" @@ -811,8 +745,6 @@ pub async fn start_task( completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -820,7 +752,6 @@ pub async fn start_task( patch_base_sha: None, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; @@ -1128,11 +1059,10 @@ pub async fn send_message( // Check if task is in a state that can receive messages // Allow "running" and "starting" (to handle race between status update and message send) - // Also allow AUTH_CODE messages and supervisor tasks regardless of status + // Also allow AUTH_CODE messages regardless of status let is_auth_code = req.message.starts_with("AUTH_CODE:"); - let is_supervisor = task.is_supervisor; let can_receive_message = task.status == "running" || task.status == "starting"; - if !can_receive_message && !is_auth_code && !is_supervisor { + if !can_receive_message && !is_auth_code { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( @@ -1147,27 +1077,8 @@ pub async fn send_message( } // Find the daemon running this task - // For supervisors, if no daemon is assigned, find any available daemon for this owner let target_daemon_id = if let Some(daemon_id) = task.daemon_id { daemon_id - } else if is_supervisor { - // Supervisor without daemon - find one - match state.daemon_connections - .iter() - .find(|d| d.value().owner_id == auth.owner_id) - { - Some(entry) => entry.value().id, - None => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new( - "NO_DAEMON", - "No daemon available. Please start a daemon.", - )), - ) - .into_response(); - } - } } else { return ( StatusCode::SERVICE_UNAVAILABLE, @@ -1206,15 +1117,7 @@ pub async fn send_message( }; if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { - // Get local_only and auto_merge_local from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = updated_task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; + let (local_only, auto_merge_local) = (false, false); // Send spawn command to new daemon let spawn_cmd = DaemonCommand::SpawnTask { @@ -1231,8 +1134,6 @@ pub async fn send_message( completion_action: updated_task.completion_action.clone(), continue_from_task_id: updated_task.continue_from_task_id, copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: updated_task.contract_id, - is_supervisor: updated_task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -1240,7 +1141,6 @@ pub async fn send_message( patch_base_sha: None, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: updated_task.directive_id, }; @@ -2433,13 +2333,11 @@ pub async fn commit_worktree( // Task Patches // ============================================================================= -/// Query parameters for listing task patches +/// Query parameters for listing task patches (legacy contract scope +/// removed; query is currently empty). #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] -pub struct ListPatchesQuery { - /// Contract ID to scope the patches - pub contract_id: Uuid, -} +pub struct ListPatchesQuery {} /// Patch summary for API response #[derive(Debug, serde::Serialize, utoipa::ToSchema)] @@ -2453,8 +2351,6 @@ pub struct PatchSummary { pub description: Option<String>, /// Task ID pub task_id: Uuid, - /// Contract ID - pub contract_id: Uuid, /// Number of files in the patch pub files_count: i32, /// Total lines added (estimated from patch size) @@ -2523,14 +2419,8 @@ pub async fn list_task_patches( } }; - // Verify task belongs to the specified contract - if task.contract_id != Some(query.contract_id) { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new("INVALID_CONTRACT", "Task does not belong to the specified contract")), - ) - .into_response(); - } + // Legacy contract verification removed; checkpoint patches are + // accessible to any owner of the task. // Get checkpoint patches for this task let patches = match repository::list_checkpoint_patches(pool, id).await { @@ -2586,7 +2476,6 @@ pub async fn list_task_patches( name, description, task_id: p.task_id, - contract_id: query.contract_id, files_count: p.files_count, lines_added, lines_removed, @@ -3040,12 +2929,10 @@ pub async fn reassign_task( // Create a NEW task with the conversation context let create_req = CreateTaskRequest { - contract_id: task.contract_id, name: format!("{} (resumed)", task.name), description: task.description.clone(), plan: updated_plan.clone(), parent_task_id: task.parent_task_id, - is_supervisor: task.is_supervisor, priority: task.priority, repository_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), @@ -3058,7 +2945,6 @@ pub async fn reassign_task( checkpoint_sha: task.last_checkpoint_sha.clone(), branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; @@ -3126,15 +3012,8 @@ pub async fn reassign_task( } }; - // Get local_only and auto_merge_local from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; + // Legacy contract scope removed; defaults to false. + let (local_only, auto_merge_local) = (false, false); // Send SpawnTask command to daemon for the new task let command = DaemonCommand::SpawnTask { @@ -3151,8 +3030,6 @@ pub async fn reassign_task( completion_action: task.completion_action.clone(), continue_from_task_id: Some(id), // Continue from old task's worktree copy_files: None, - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -3160,7 +3037,6 @@ pub async fn reassign_task( patch_base_sha, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; @@ -3190,56 +3066,10 @@ pub async fn reassign_task( // Don't fail the request, the new task is already running } - // Notify the contract's supervisor about the reassignment (if applicable) - if let Some(contract_id) = task.contract_id { - if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - if let Some(supervisor_task_id) = contract.supervisor_task_id { - // Don't notify if we're reassigning the supervisor itself - if supervisor_task_id != old_task_id { - // Find the supervisor's daemon and send a message - if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await { - if supervisor_task.status == "running" { - if let Some(supervisor_daemon_id) = supervisor_task.daemon_id { - // Find the daemon by its UUID - if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) { - let notification_msg = format!( - "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \ - A new task '{}' (ID: {}) has been created to continue the work. \ - The new task has {} context entries from the previous conversation.\n\n", - task.name, - old_task_id, - final_task.name, - new_task.id, - context_entries - ); - - let notify_cmd = DaemonCommand::SendMessage { - task_id: supervisor_task_id, - message: notification_msg, - }; - - if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await { - tracing::warn!( - supervisor_id = %supervisor_task_id, - error = %e, - "Failed to notify supervisor about task reassignment" - ); - } else { - tracing::info!( - supervisor_id = %supervisor_task_id, - old_task_id = %old_task_id, - new_task_id = %new_task.id, - "Notified supervisor about task reassignment" - ); - } - } - } - } - } - } - } - } - } + // Supervisor reassignment notification removed alongside legacy + // contracts. The directive reconciler picks up reassigned tasks on + // its next tick. + let _ = context_entries; // Broadcast task update for the new task state.broadcast_task_update(TaskUpdateNotification { @@ -3467,15 +3297,8 @@ pub async fn continue_task( }; let is_orchestrator = task.depth == 0 && subtask_count > 0; - // Get local_only and auto_merge_local from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; + // Legacy contract scope removed; defaults to false. + let (local_only, auto_merge_local) = (false, false); // Send SpawnTask command to daemon let command = DaemonCommand::SpawnTask { @@ -3492,8 +3315,6 @@ pub async fn continue_task( completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -3501,7 +3322,6 @@ pub async fn continue_task( patch_base_sha: None, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; @@ -3509,7 +3329,6 @@ pub async fn continue_task( task_id = %id, daemon_id = %target_daemon_id, context_entries = context_entries, - is_supervisor = task.is_supervisor, "Continuing task with conversation context" ); @@ -3820,12 +3639,10 @@ pub async fn fork_task( // Create the new forked task let create_req = CreateTaskRequest { - contract_id: task.contract_id, name: req.new_task_name.clone(), description: task.description.clone(), plan: req.new_task_plan.clone(), parent_task_id: None, // Forked tasks are independent - is_supervisor: false, priority: task.priority, repository_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), @@ -3838,7 +3655,6 @@ pub async fn fork_task( checkpoint_sha: Some(checkpoint.commit_sha.clone()), branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; @@ -3980,12 +3796,10 @@ pub async fn resume_from_checkpoint( }); let create_req = CreateTaskRequest { - contract_id: task.contract_id, name: task_name, description: task.description.clone(), plan: req.plan, parent_task_id: None, - is_supervisor: false, priority: task.priority, repository_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), @@ -3998,7 +3812,6 @@ pub async fn resume_from_checkpoint( checkpoint_sha: Some(checkpoint.commit_sha.clone()), branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; @@ -4318,12 +4131,10 @@ pub async fn branch_task( // Create the branched task (anonymous - no contract_id) let create_req = CreateTaskRequest { - contract_id: None, // Anonymous task name: task_name, description: Some(format!("Branched from task: {}", source_task.name)), plan: req.message, parent_task_id: None, - is_supervisor: false, priority: source_task.priority, repository_url: source_task.repository_url.clone(), base_branch: source_task.base_branch.clone(), @@ -4336,7 +4147,6 @@ pub async fn branch_task( checkpoint_sha: None, branched_from_task_id: Some(source_task_id), conversation_history, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; @@ -4357,11 +4167,9 @@ pub async fn branch_task( let _ = repository::record_history_event( pool, auth.owner_id, - None, // No contract for anonymous tasks Some(task.id), "task", Some("branched"), - None, serde_json::json!({ "name": &task.name, "sourceTaskId": source_task_id, @@ -4425,8 +4233,6 @@ pub async fn branch_task( completion_action: updated_task.completion_action.clone(), continue_from_task_id: updated_task.continue_from_task_id, copy_files: None, - contract_id: None, - is_supervisor: false, autonomous_loop: false, resume_session: message_count > 0, // Resume if we have conversation history conversation_history: updated_task.conversation_state.clone(), @@ -4434,7 +4240,6 @@ pub async fn branch_task( patch_base_sha, local_only: false, // No contract, so not local_only auto_merge_local: false, // No contract, so no auto_merge_local - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, }; |
