diff options
| author | soryu <soryu@soryu.co> | 2026-05-18 01:21:30 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-05-18 01:21:30 +0100 |
| commit | f240675da99bc7705e473b8f70a2628812aa4c10 (patch) | |
| tree | 3ee2d24b431ccb8cd1a3013c86b34a5782a3e224 /makima/src/server/handlers/mesh.rs | |
| parent | 0d996cf7590e3e52f424859c7d6f0e68640f119e (diff) | |
| download | soryu-master.tar.gz soryu-master.zip | |
The contracts table, supervisor task type, and all their backing
machinery have been inert for several PRs. The directives system reads
its own active contract body for spec text, and PR #135 removed the
last LLM surface that spawned supervisors.
This PR wipes the dead surface in one shot — the user authorised a DB
wipe, so the migration drops every legacy table with CASCADE rather
than carrying forward stub rows. Net change: −12k LOC across handlers,
repository, state, models, the TUI, and the listen module.
What's gone:
- contracts, contract_chat_*, contract_events, contract_repositories,
contract_type_templates tables.
- supervisor_states, supervisor_heartbeats tables.
- mesh_chat_conversations, mesh_chat_messages tables.
- tasks.contract_id/is_supervisor/supervisor_task_id/supervisor_worktree_task_id columns.
- directive_steps.contract_id/contract_type columns.
- files.contract_id/contract_phase columns.
- history_events.contract_id/phase columns.
- The Contract/Supervisor/MeshChat handler + model + repository
surface, plus the daemon TUI views that read them.
- The standalone listen.rs websocket handler (orphaned with the LLM).
What stays:
- mesh_supervisor handler: trimmed to just the questions + orders
backchannel used by `makima directive ask` / `create-order` (kept
the URL prefix for CLI client compat).
- directive_documents (the user-facing "contracts" surface).
- pending_questions in-memory state for the directive Ask flow.
cargo check, cargo test --lib (68 passed), tsc, and vite build all
clean.
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Diffstat (limited to 'makima/src/server/handlers/mesh.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh.rs | 247 |
1 files changed, 26 insertions, 221 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs index be5387e..6ba4c8b 100644 --- a/makima/src/server/handlers/mesh.rs +++ b/makima/src/server/handlers/mesh.rs @@ -274,35 +274,16 @@ pub async fn create_task( let _ = repository::record_history_event( pool, auth.owner_id, - task.contract_id, Some(task.id), "task", Some("created"), - None, serde_json::json!({ "name": &task.name, - "isSupervisor": task.is_supervisor, }), ).await; - // Notify supervisor of new task creation if task belongs to a contract - if let Some(contract_id) = task.contract_id { - if !task.is_supervisor { - let pool = pool.clone(); - let state_clone = state.clone(); - let task_clone = task.clone(); - tokio::spawn(async move { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { - state_clone.notify_supervisor_of_task_created( - supervisor.id, - supervisor.daemon_id, - task_clone.id, - &task_clone.name, - ).await; - } - }); - } - } + // Supervisor notification on task creation removed alongside + // legacy contracts. (StatusCode::CREATED, Json(task)).into_response() } Err(e) => { @@ -352,26 +333,6 @@ pub async fn update_task( .into_response(); }; - // Check if trying to set a supervisor task to a terminal status - if let Some(ref new_status) = req.status { - let terminal_statuses = ["done", "failed", "merged"]; - if terminal_statuses.contains(&new_status.as_str()) { - // Get the task to check if it's a supervisor - if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await { - if task.is_supervisor { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new( - "SUPERVISOR_CANNOT_COMPLETE", - "Supervisor tasks cannot be marked as done, failed, or merged. They run for the lifetime of the contract.", - )), - ) - .into_response(); - } - } - } - } - // Track which fields are being updated for the notification let mut updated_fields = Vec::new(); if req.name.is_some() { @@ -410,26 +371,9 @@ pub async fn update_task( updated_by: "user".to_string(), }); - // Notify supervisor of status change if task belongs to a contract - if let Some(contract_id) = task.contract_id { - if !task.is_supervisor && updated_fields_clone.contains(&"status".to_string()) { - let pool = pool.clone(); - let state_clone = state.clone(); - let task_clone = task.clone(); - tokio::spawn(async move { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { - state_clone.notify_supervisor_of_task_update( - supervisor.id, - supervisor.daemon_id, - task_clone.id, - &task_clone.name, - &task_clone.status, - &updated_fields_clone, - ).await; - } - }); - } - } + // Supervisor notification on task update removed alongside + // legacy contracts. + let _ = updated_fields_clone; Json(task).into_response() } @@ -657,15 +601,10 @@ pub async fn start_task( .into_response(); } - // Get local_only and auto_merge_local flags from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; + // local_only / auto_merge_local used to come from the parent contract. + // With legacy contracts removed they default to false; the directive + // lifecycle handles its own completion now. + let (local_only, auto_merge_local) = (false, false); // Get list of daemons that have previously failed this task let mut exclude_daemon_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default(); @@ -708,8 +647,7 @@ pub async fn start_task( task_depth = task.depth, subtask_count = subtask_count, is_orchestrator = is_orchestrator, - is_supervisor = task.is_supervisor, - "Starting task with orchestrator/supervisor determination" + "Starting task" ); // IMPORTANT: Update database FIRST to assign daemon_id before sending command @@ -755,8 +693,6 @@ pub async fn start_task( completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -764,13 +700,11 @@ pub async fn start_task( patch_base_sha: None, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; tracing::info!( task_id = %id, - is_supervisor = task.is_supervisor, is_orchestrator = is_orchestrator, daemon_id = %target_daemon_id, "Sending SpawnTask command to daemon" @@ -811,8 +745,6 @@ pub async fn start_task( completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -820,7 +752,6 @@ pub async fn start_task( patch_base_sha: None, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; @@ -1128,11 +1059,10 @@ pub async fn send_message( // Check if task is in a state that can receive messages // Allow "running" and "starting" (to handle race between status update and message send) - // Also allow AUTH_CODE messages and supervisor tasks regardless of status + // Also allow AUTH_CODE messages regardless of status let is_auth_code = req.message.starts_with("AUTH_CODE:"); - let is_supervisor = task.is_supervisor; let can_receive_message = task.status == "running" || task.status == "starting"; - if !can_receive_message && !is_auth_code && !is_supervisor { + if !can_receive_message && !is_auth_code { return ( StatusCode::BAD_REQUEST, Json(ApiError::new( @@ -1147,27 +1077,8 @@ pub async fn send_message( } // Find the daemon running this task - // For supervisors, if no daemon is assigned, find any available daemon for this owner let target_daemon_id = if let Some(daemon_id) = task.daemon_id { daemon_id - } else if is_supervisor { - // Supervisor without daemon - find one - match state.daemon_connections - .iter() - .find(|d| d.value().owner_id == auth.owner_id) - { - Some(entry) => entry.value().id, - None => { - return ( - StatusCode::SERVICE_UNAVAILABLE, - Json(ApiError::new( - "NO_DAEMON", - "No daemon available. Please start a daemon.", - )), - ) - .into_response(); - } - } } else { return ( StatusCode::SERVICE_UNAVAILABLE, @@ -1206,15 +1117,7 @@ pub async fn send_message( }; if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await { - // Get local_only and auto_merge_local from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = updated_task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; + let (local_only, auto_merge_local) = (false, false); // Send spawn command to new daemon let spawn_cmd = DaemonCommand::SpawnTask { @@ -1231,8 +1134,6 @@ pub async fn send_message( completion_action: updated_task.completion_action.clone(), continue_from_task_id: updated_task.continue_from_task_id, copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: updated_task.contract_id, - is_supervisor: updated_task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -1240,7 +1141,6 @@ pub async fn send_message( patch_base_sha: None, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: updated_task.directive_id, }; @@ -2433,13 +2333,11 @@ pub async fn commit_worktree( // Task Patches // ============================================================================= -/// Query parameters for listing task patches +/// Query parameters for listing task patches (legacy contract scope +/// removed; query is currently empty). #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] -pub struct ListPatchesQuery { - /// Contract ID to scope the patches - pub contract_id: Uuid, -} +pub struct ListPatchesQuery {} /// Patch summary for API response #[derive(Debug, serde::Serialize, utoipa::ToSchema)] @@ -2453,8 +2351,6 @@ pub struct PatchSummary { pub description: Option<String>, /// Task ID pub task_id: Uuid, - /// Contract ID - pub contract_id: Uuid, /// Number of files in the patch pub files_count: i32, /// Total lines added (estimated from patch size) @@ -2523,14 +2419,8 @@ pub async fn list_task_patches( } }; - // Verify task belongs to the specified contract - if task.contract_id != Some(query.contract_id) { - return ( - StatusCode::BAD_REQUEST, - Json(ApiError::new("INVALID_CONTRACT", "Task does not belong to the specified contract")), - ) - .into_response(); - } + // Legacy contract verification removed; checkpoint patches are + // accessible to any owner of the task. // Get checkpoint patches for this task let patches = match repository::list_checkpoint_patches(pool, id).await { @@ -2586,7 +2476,6 @@ pub async fn list_task_patches( name, description, task_id: p.task_id, - contract_id: query.contract_id, files_count: p.files_count, lines_added, lines_removed, @@ -3040,12 +2929,10 @@ pub async fn reassign_task( // Create a NEW task with the conversation context let create_req = CreateTaskRequest { - contract_id: task.contract_id, name: format!("{} (resumed)", task.name), description: task.description.clone(), plan: updated_plan.clone(), parent_task_id: task.parent_task_id, - is_supervisor: task.is_supervisor, priority: task.priority, repository_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), @@ -3058,7 +2945,6 @@ pub async fn reassign_task( checkpoint_sha: task.last_checkpoint_sha.clone(), branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; @@ -3126,15 +3012,8 @@ pub async fn reassign_task( } }; - // Get local_only and auto_merge_local from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; + // Legacy contract scope removed; defaults to false. + let (local_only, auto_merge_local) = (false, false); // Send SpawnTask command to daemon for the new task let command = DaemonCommand::SpawnTask { @@ -3151,8 +3030,6 @@ pub async fn reassign_task( completion_action: task.completion_action.clone(), continue_from_task_id: Some(id), // Continue from old task's worktree copy_files: None, - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -3160,7 +3037,6 @@ pub async fn reassign_task( patch_base_sha, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; @@ -3190,56 +3066,10 @@ pub async fn reassign_task( // Don't fail the request, the new task is already running } - // Notify the contract's supervisor about the reassignment (if applicable) - if let Some(contract_id) = task.contract_id { - if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - if let Some(supervisor_task_id) = contract.supervisor_task_id { - // Don't notify if we're reassigning the supervisor itself - if supervisor_task_id != old_task_id { - // Find the supervisor's daemon and send a message - if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await { - if supervisor_task.status == "running" { - if let Some(supervisor_daemon_id) = supervisor_task.daemon_id { - // Find the daemon by its UUID - if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) { - let notification_msg = format!( - "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \ - A new task '{}' (ID: {}) has been created to continue the work. \ - The new task has {} context entries from the previous conversation.\n\n", - task.name, - old_task_id, - final_task.name, - new_task.id, - context_entries - ); - - let notify_cmd = DaemonCommand::SendMessage { - task_id: supervisor_task_id, - message: notification_msg, - }; - - if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await { - tracing::warn!( - supervisor_id = %supervisor_task_id, - error = %e, - "Failed to notify supervisor about task reassignment" - ); - } else { - tracing::info!( - supervisor_id = %supervisor_task_id, - old_task_id = %old_task_id, - new_task_id = %new_task.id, - "Notified supervisor about task reassignment" - ); - } - } - } - } - } - } - } - } - } + // Supervisor reassignment notification removed alongside legacy + // contracts. The directive reconciler picks up reassigned tasks on + // its next tick. + let _ = context_entries; // Broadcast task update for the new task state.broadcast_task_update(TaskUpdateNotification { @@ -3467,15 +3297,8 @@ pub async fn continue_task( }; let is_orchestrator = task.depth == 0 && subtask_count > 0; - // Get local_only and auto_merge_local from contract if task has one - let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id { - match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await { - Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local), - _ => (false, false), - } - } else { - (false, false) - }; + // Legacy contract scope removed; defaults to false. + let (local_only, auto_merge_local) = (false, false); // Send SpawnTask command to daemon let command = DaemonCommand::SpawnTask { @@ -3492,8 +3315,6 @@ pub async fn continue_task( completion_action: task.completion_action.clone(), continue_from_task_id: task.continue_from_task_id, copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), - contract_id: task.contract_id, - is_supervisor: task.is_supervisor, autonomous_loop: false, resume_session: false, conversation_history: None, @@ -3501,7 +3322,6 @@ pub async fn continue_task( patch_base_sha: None, local_only, auto_merge_local, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: task.directive_id, }; @@ -3509,7 +3329,6 @@ pub async fn continue_task( task_id = %id, daemon_id = %target_daemon_id, context_entries = context_entries, - is_supervisor = task.is_supervisor, "Continuing task with conversation context" ); @@ -3820,12 +3639,10 @@ pub async fn fork_task( // Create the new forked task let create_req = CreateTaskRequest { - contract_id: task.contract_id, name: req.new_task_name.clone(), description: task.description.clone(), plan: req.new_task_plan.clone(), parent_task_id: None, // Forked tasks are independent - is_supervisor: false, priority: task.priority, repository_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), @@ -3838,7 +3655,6 @@ pub async fn fork_task( checkpoint_sha: Some(checkpoint.commit_sha.clone()), branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; @@ -3980,12 +3796,10 @@ pub async fn resume_from_checkpoint( }); let create_req = CreateTaskRequest { - contract_id: task.contract_id, name: task_name, description: task.description.clone(), plan: req.plan, parent_task_id: None, - is_supervisor: false, priority: task.priority, repository_url: task.repository_url.clone(), base_branch: task.base_branch.clone(), @@ -3998,7 +3812,6 @@ pub async fn resume_from_checkpoint( checkpoint_sha: Some(checkpoint.commit_sha.clone()), branched_from_task_id: None, conversation_history: None, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; @@ -4318,12 +4131,10 @@ pub async fn branch_task( // Create the branched task (anonymous - no contract_id) let create_req = CreateTaskRequest { - contract_id: None, // Anonymous task name: task_name, description: Some(format!("Branched from task: {}", source_task.name)), plan: req.message, parent_task_id: None, - is_supervisor: false, priority: source_task.priority, repository_url: source_task.repository_url.clone(), base_branch: source_task.base_branch.clone(), @@ -4336,7 +4147,6 @@ pub async fn branch_task( checkpoint_sha: None, branched_from_task_id: Some(source_task_id), conversation_history, - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, directive_step_id: None, }; @@ -4357,11 +4167,9 @@ pub async fn branch_task( let _ = repository::record_history_event( pool, auth.owner_id, - None, // No contract for anonymous tasks Some(task.id), "task", Some("branched"), - None, serde_json::json!({ "name": &task.name, "sourceTaskId": source_task_id, @@ -4425,8 +4233,6 @@ pub async fn branch_task( completion_action: updated_task.completion_action.clone(), continue_from_task_id: updated_task.continue_from_task_id, copy_files: None, - contract_id: None, - is_supervisor: false, autonomous_loop: false, resume_session: message_count > 0, // Resume if we have conversation history conversation_history: updated_task.conversation_state.clone(), @@ -4434,7 +4240,6 @@ pub async fn branch_task( patch_base_sha, local_only: false, // No contract, so not local_only auto_merge_local: false, // No contract, so no auto_merge_local - supervisor_worktree_task_id: None, // Not spawned by supervisor directive_id: None, }; |
