diff options
| author | soryu <soryu@soryu.co> | 2026-05-18 01:21:30 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-05-18 01:21:30 +0100 |
| commit | f240675da99bc7705e473b8f70a2628812aa4c10 (patch) | |
| tree | 3ee2d24b431ccb8cd1a3013c86b34a5782a3e224 /makima/src/server/handlers/mesh_daemon.rs | |
| parent | 0d996cf7590e3e52f424859c7d6f0e68640f119e (diff) | |
| download | soryu-f240675da99bc7705e473b8f70a2628812aa4c10.tar.gz soryu-f240675da99bc7705e473b8f70a2628812aa4c10.zip | |
The contracts table, supervisor task type, and all their backing
machinery have been inert for several PRs. The directives system reads
its own active contract body for spec text, and PR #135 removed the
last LLM surface that spawned supervisors.
This PR wipes the dead surface in one shot — the user authorised a DB
wipe, so the migration drops every legacy table with CASCADE rather
than carrying forward stub rows. Net change: −12k LOC across handlers,
repository, state, models, the TUI, and the listen module.
What's gone:
- contracts, contract_chat_*, contract_events, contract_repositories,
contract_type_templates tables.
- supervisor_states, supervisor_heartbeats tables.
- mesh_chat_conversations, mesh_chat_messages tables.
- tasks.contract_id/is_supervisor/supervisor_task_id/supervisor_worktree_task_id columns.
- directive_steps.contract_id/contract_type columns.
- files.contract_id/contract_phase columns.
- history_events.contract_id/phase columns.
- The Contract/Supervisor/MeshChat handler + model + repository
surface, plus the daemon TUI views that read them.
- The standalone listen.rs websocket handler (orphaned with the LLM).
What stays:
- mesh_supervisor handler: trimmed to just the questions + orders
backchannel used by `makima directive ask` / `create-order` (kept
the URL prefix for CLI client compat).
- directive_documents (the user-facing "contracts" surface).
- pending_questions in-memory state for the directive Ask flow.
cargo check, cargo test --lib (68 passed), tsc, and vite build all
clean.
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
| -rw-r--r-- | makima/src/server/handlers/mesh_daemon.rs | 419 |
1 files changed, 4 insertions, 415 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs index 19d2166..9900385 100644 --- a/makima/src/server/handlers/mesh_daemon.rs +++ b/makima/src/server/handlers/mesh_daemon.rs @@ -262,27 +262,6 @@ pub enum DaemonMessage { #[serde(rename = "activeTasks")] active_tasks: Vec<Uuid>, }, - /// Enhanced supervisor heartbeat with detailed state - SupervisorHeartbeat { - #[serde(rename = "taskId")] - task_id: Uuid, - #[serde(rename = "contractId")] - contract_id: Uuid, - /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted - state: String, - /// Current contract phase - phase: String, - /// Description of current activity - #[serde(rename = "currentActivity")] - current_activity: Option<String>, - /// Progress percentage (0-100) - progress: u8, - /// Task IDs the supervisor is waiting on - #[serde(rename = "pendingTaskIds")] - pending_task_ids: Vec<Uuid>, - /// Timestamp of this heartbeat - timestamp: DateTime<Utc>, - }, /// Task output streaming (stdout/stderr from Claude Code) TaskOutput { #[serde(rename = "taskId")] @@ -618,96 +597,6 @@ struct DaemonAuthResult { /// Automatically create a PR when all non-supervisor tasks for a contract are done. /// Only applies to remote-repo contracts in the "execute" phase. /// Fires as a best-effort operation — errors are logged but not propagated. -async fn auto_create_pr_if_ready( - pool: &sqlx::PgPool, - state: &SharedState, - contract_id: Uuid, - owner_id: Uuid, -) { - // 1. Load contract — must be remote (not local_only) and in execute phase - let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await { - Ok(Some(c)) => c, - _ => return, - }; - if contract.local_only || contract.phase != "execute" { - return; - } - - // 2. Load non-supervisor tasks — all must be done - let tasks = match repository::list_tasks_by_contract(pool, contract_id, owner_id).await { - Ok(t) => t, - _ => return, - }; - let non_supervisor_tasks: Vec<_> = tasks.iter().filter(|t| !t.is_supervisor).collect(); - if non_supervisor_tasks.is_empty() || !non_supervisor_tasks.iter().all(|t| t.status == "done") { - return; - } - - // 3. Check pull-request deliverable not already complete - let completed_deliverables = contract.get_completed_deliverables(&contract.phase); - if completed_deliverables.contains(&"pull-request".to_string()) { - return; - } - - // 4. Check at least one repository has a remote URL - let repos = match repository::list_contract_repositories(pool, contract_id).await { - Ok(r) => r, - _ => return, - }; - if !repos.iter().any(|r| r.repository_url.is_some()) { - return; - } - - // 5. Load supervisor task - let supervisor = match repository::get_contract_supervisor_task(pool, contract_id).await { - Ok(Some(s)) => s, - _ => return, - }; - - // Need supervisor's daemon_id to send command - let daemon_id = match supervisor.daemon_id { - Some(id) => id, - None => return, - }; - - // 6. Construct branch name - let sanitized_name: String = supervisor - .name - .chars() - .map(|c| if c.is_alphanumeric() || c == '-' || c == '_' { c } else { '-' }) - .collect::<String>() - .to_lowercase(); - let short_id = &supervisor.id.to_string()[..8]; - let branch = format!("makima/{}-{}", sanitized_name, short_id); - - // 7. Send CreatePR command to supervisor's daemon - let command = DaemonCommand::CreatePR { - task_id: supervisor.id, - title: contract.name.clone(), - body: contract.description.clone(), - base_branch: supervisor.base_branch.clone(), - branch, - }; - - match state.send_daemon_command(daemon_id, command).await { - Ok(()) => { - tracing::info!( - contract_id = %contract_id, - supervisor_id = %supervisor.id, - "Auto-PR: sent CreatePR command to supervisor daemon" - ); - } - Err(e) => { - tracing::warn!( - contract_id = %contract_id, - error = %e, - "Auto-PR: failed to send CreatePR command" - ); - } - } -} - -/// Validate an API key and return (user_id, owner_id). async fn validate_daemon_api_key(pool: &sqlx::PgPool, key: &str) -> Result<DaemonAuthResult, String> { let key_hash = hash_api_key(key); @@ -983,83 +872,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); } } - Ok(DaemonMessage::SupervisorHeartbeat { - task_id, - contract_id, - state: supervisor_state, - phase, - current_activity, - progress, - pending_task_ids, - timestamp: _, - }) => { - tracing::debug!( - task_id = %task_id, - contract_id = %contract_id, - state = %supervisor_state, - phase = %phase, - progress = progress, - "Supervisor heartbeat received" - ); - - // Store heartbeat in database and update supervisor state (Task 3.3) - if let Some(ref pool) = state.db_pool { - let pool = pool.clone(); - let pending_ids = pending_task_ids.clone(); - let activity = current_activity.clone(); - let state_str = supervisor_state.clone(); - let phase_str = phase.clone(); - tokio::spawn(async move { - // Store the heartbeat record - if let Err(e) = repository::create_supervisor_heartbeat( - &pool, - task_id, - contract_id, - &state_str, - &phase_str, - activity.as_deref(), - progress as i32, - &pending_ids, - ).await { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to store supervisor heartbeat" - ); - } - - // Update supervisor_states table (lightweight heartbeat state update - Task 3.3) - if let Err(e) = repository::update_supervisor_heartbeat_state( - &pool, - contract_id, - &state_str, - activity.as_deref(), - progress as i32, - &pending_ids, - ).await { - tracing::debug!( - contract_id = %contract_id, - error = %e, - "Failed to update supervisor state from heartbeat (may not exist yet)" - ); - } - - // Also update the daemon heartbeat - if let Ok(Some(task)) = repository::get_task(&pool, task_id).await { - if let Some(daemon_id) = task.daemon_id { - if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await { - tracing::warn!( - daemon_id = %daemon_id, - error = %e, - "Failed to update daemon heartbeat from supervisor" - ); - } - } - } - }); - } - } Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => { // Parse the output line and broadcast structured data if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) { @@ -1120,136 +932,16 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re updated_by: "daemon".into(), }); - // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4) - if updated_task.is_supervisor && new_status_owned == "running" { - if let Some(contract_id) = updated_task.contract_id { - // Check if supervisor state already exists (restoration scenario) - match repository::get_supervisor_state(&pool, contract_id).await { - Ok(Some(existing_state)) => { - // State exists - this is a restoration - tracing::info!( - task_id = %task_id, - contract_id = %contract_id, - existing_state = %existing_state.state, - restoration_count = existing_state.restoration_count, - "Supervisor starting with existing state - restoration in progress" - ); - - // Mark as restored (increments restoration_count) - match repository::mark_supervisor_restored( - &pool, - contract_id, - "daemon_restart", - ).await { - Ok(restored_state) => { - tracing::info!( - task_id = %task_id, - contract_id = %contract_id, - restoration_count = restored_state.restoration_count, - "Supervisor restoration marked" - ); - - // Check for pending questions to re-deliver - if let Ok(questions) = serde_json::from_value::<Vec<crate::db::models::PendingQuestion>>( - restored_state.pending_questions.clone() - ) { - if !questions.is_empty() { - tracing::info!( - contract_id = %contract_id, - question_count = questions.len(), - "Pending questions found for re-delivery" - ); - // Questions will be re-delivered by the supervisor when it restores - } - } - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to mark supervisor as restored" - ); - } - } - } - Ok(None) => { - // No existing state - fresh start - // Get contract to get its phase - match repository::get_contract_for_owner( - &pool, - contract_id, - updated_task.owner_id, - ).await { - Ok(Some(contract)) => { - match repository::upsert_supervisor_state( - &pool, - contract_id, - task_id, - serde_json::json!([]), // Empty conversation - &[], // No pending tasks - &contract.phase, - ).await { - Ok(_) => { - tracing::info!( - task_id = %task_id, - contract_id = %contract_id, - phase = %contract.phase, - "Initialized fresh supervisor state" - ); - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to initialize supervisor state" - ); - } - } - } - Ok(None) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - "Contract not found when initializing supervisor state" - ); - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to get contract for supervisor state" - ); - } - } - } - Err(e) => { - tracing::warn!( - task_id = %task_id, - contract_id = %contract_id, - error = %e, - "Failed to check existing supervisor state" - ); - } - } - } - } - // Record history event when task starts running if new_status_owned == "running" { let _ = repository::record_history_event( &pool, updated_task.owner_id, - updated_task.contract_id, Some(task_id), "task", Some("started"), - None, serde_json::json!({ "name": &updated_task.name, - "isSupervisor": updated_task.is_supervisor, }), ).await; } @@ -1329,51 +1021,19 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re updated_by: "daemon".into(), }); - // Notify supervisor if this task belongs to a contract - if let Some(contract_id) = updated_task.contract_id { - // Don't notify for supervisor tasks (they don't report to themselves) - if !updated_task.is_supervisor { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await { - // action_directive used to come from - // compute_action_directive (now removed alongside the - // LLM module). Passing None preserves the existing - // supervisor protocol; the auto-PR path below still - // fires when every task is done. - state.notify_supervisor_of_task_completion( - supervisor.id, - supervisor.daemon_id, - updated_task.id, - &updated_task.name, - &updated_task.status, - updated_task.progress_summary.as_deref(), - updated_task.error_message.as_deref(), - None, - ).await; - } - } - } - - // Auto-create PR if all tasks are done and repo is remote - if updated_task.status == "done" { - if let Some(contract_id) = updated_task.contract_id { - let pool_c = pool.clone(); - let state_c = state.clone(); - tokio::spawn(async move { - auto_create_pr_if_ready(&pool_c, &state_c, contract_id, owner_id).await; - }); - } - } + // Supervisor notification + auto-PR removed alongside + // legacy contracts. Directive completion is handled + // by the directive reconciler. + let _ = owner_id; // Record history event for task completion let subtype = if updated_task.status == "done" { "completed" } else { "failed" }; let _ = repository::record_history_event( &pool, updated_task.owner_id, - updated_task.contract_id, Some(task_id), "task", Some(subtype), - None, serde_json::json!({ "name": &updated_task.name, "status": &updated_task.status, @@ -1962,16 +1622,13 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re }); // Record history event for checkpoint - // Get task to get contract_id if let Ok(Some(task)) = repository::get_task(pool, task_id).await { let _ = repository::record_history_event( pool, task.owner_id, - task.contract_id, Some(task_id), "checkpoint", Some("created"), - None, serde_json::json!({ "checkpointNumber": checkpoint.checkpoint_number, "commitSha": &sha, @@ -2103,28 +1760,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re conflicts: conflicts.clone(), }); - // On successful merge, notify supervisor to check if all merges complete - if success { - if let Some(pool) = state.db_pool.as_ref() { - if let Ok(Some(task)) = repository::get_task(pool, task_id).await { - if let Some(contract_id) = task.contract_id { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await { - let prompt = format!( - "[INFO] Merge completed: {}\n\ - Check if all tasks are merged with `makima supervisor tasks`.\n\ - If ready, create PR with `makima supervisor pr`.", - message - ); - let _ = state.notify_supervisor( - supervisor.id, - supervisor.daemon_id, - &prompt, - ).await; - } - } - } - } - } } Ok(DaemonMessage::PRCreated { task_id, @@ -2150,52 +1785,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re pr_number, }); - // Notify supervisor of PR result (both success and failure) - if let Some(pool) = state.db_pool.as_ref() { - if let Ok(Some(task)) = repository::get_task(pool, task_id).await { - if let Some(contract_id) = task.contract_id { - if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await { - let prompt = if success { - // Get contract to determine next action - let next_action = if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, task.owner_id).await { - match (contract.contract_type.as_str(), contract.phase.as_str()) { - ("simple", "execute") => { - "Mark contract complete with `makima supervisor complete`".to_string() - } - ("specification", "execute") => { - "Advance to review phase with `makima supervisor advance-phase review`".to_string() - } - _ => "Check contract status with `makima supervisor status`".to_string() - } - } else { - "Check contract status with `makima supervisor status`".to_string() - }; - - format!( - "[ACTION REQUIRED] PR created successfully!\n\ - PR: {}\n\n\ - Next step: {}", - pr_url.as_deref().unwrap_or(&message), - next_action - ) - } else { - format!( - "[ERROR] PR creation failed for task {}:\n\ - {}\n\n\ - Please fix the issue and retry with `makima supervisor pr`.", - task_id, - message - ) - }; - let _ = state.notify_supervisor( - supervisor.id, - supervisor.daemon_id, - &prompt, - ).await; - } - } - } - } } Ok(DaemonMessage::GitConfigInherited { success, |
