summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh_daemon.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers/mesh_daemon.rs')
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs419
1 files changed, 4 insertions, 415 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 19d2166..9900385 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -262,27 +262,6 @@ pub enum DaemonMessage {
#[serde(rename = "activeTasks")]
active_tasks: Vec<Uuid>,
},
- /// Enhanced supervisor heartbeat with detailed state
- SupervisorHeartbeat {
- #[serde(rename = "taskId")]
- task_id: Uuid,
- #[serde(rename = "contractId")]
- contract_id: Uuid,
- /// Supervisor state: initializing, idle, working, waiting_for_user, waiting_for_tasks, blocked, completed, failed, interrupted
- state: String,
- /// Current contract phase
- phase: String,
- /// Description of current activity
- #[serde(rename = "currentActivity")]
- current_activity: Option<String>,
- /// Progress percentage (0-100)
- progress: u8,
- /// Task IDs the supervisor is waiting on
- #[serde(rename = "pendingTaskIds")]
- pending_task_ids: Vec<Uuid>,
- /// Timestamp of this heartbeat
- timestamp: DateTime<Utc>,
- },
/// Task output streaming (stdout/stderr from Claude Code)
TaskOutput {
#[serde(rename = "taskId")]
@@ -618,96 +597,6 @@ struct DaemonAuthResult {
/// Automatically create a PR when all non-supervisor tasks for a contract are done.
/// Only applies to remote-repo contracts in the "execute" phase.
/// Fires as a best-effort operation — errors are logged but not propagated.
-async fn auto_create_pr_if_ready(
- pool: &sqlx::PgPool,
- state: &SharedState,
- contract_id: Uuid,
- owner_id: Uuid,
-) {
- // 1. Load contract — must be remote (not local_only) and in execute phase
- let contract = match repository::get_contract_for_owner(pool, contract_id, owner_id).await {
- Ok(Some(c)) => c,
- _ => return,
- };
- if contract.local_only || contract.phase != "execute" {
- return;
- }
-
- // 2. Load non-supervisor tasks — all must be done
- let tasks = match repository::list_tasks_by_contract(pool, contract_id, owner_id).await {
- Ok(t) => t,
- _ => return,
- };
- let non_supervisor_tasks: Vec<_> = tasks.iter().filter(|t| !t.is_supervisor).collect();
- if non_supervisor_tasks.is_empty() || !non_supervisor_tasks.iter().all(|t| t.status == "done") {
- return;
- }
-
- // 3. Check pull-request deliverable not already complete
- let completed_deliverables = contract.get_completed_deliverables(&contract.phase);
- if completed_deliverables.contains(&"pull-request".to_string()) {
- return;
- }
-
- // 4. Check at least one repository has a remote URL
- let repos = match repository::list_contract_repositories(pool, contract_id).await {
- Ok(r) => r,
- _ => return,
- };
- if !repos.iter().any(|r| r.repository_url.is_some()) {
- return;
- }
-
- // 5. Load supervisor task
- let supervisor = match repository::get_contract_supervisor_task(pool, contract_id).await {
- Ok(Some(s)) => s,
- _ => return,
- };
-
- // Need supervisor's daemon_id to send command
- let daemon_id = match supervisor.daemon_id {
- Some(id) => id,
- None => return,
- };
-
- // 6. Construct branch name
- let sanitized_name: String = supervisor
- .name
- .chars()
- .map(|c| if c.is_alphanumeric() || c == '-' || c == '_' { c } else { '-' })
- .collect::<String>()
- .to_lowercase();
- let short_id = &supervisor.id.to_string()[..8];
- let branch = format!("makima/{}-{}", sanitized_name, short_id);
-
- // 7. Send CreatePR command to supervisor's daemon
- let command = DaemonCommand::CreatePR {
- task_id: supervisor.id,
- title: contract.name.clone(),
- body: contract.description.clone(),
- base_branch: supervisor.base_branch.clone(),
- branch,
- };
-
- match state.send_daemon_command(daemon_id, command).await {
- Ok(()) => {
- tracing::info!(
- contract_id = %contract_id,
- supervisor_id = %supervisor.id,
- "Auto-PR: sent CreatePR command to supervisor daemon"
- );
- }
- Err(e) => {
- tracing::warn!(
- contract_id = %contract_id,
- error = %e,
- "Auto-PR: failed to send CreatePR command"
- );
- }
- }
-}
-
-/// Validate an API key and return (user_id, owner_id).
async fn validate_daemon_api_key(pool: &sqlx::PgPool, key: &str) -> Result<DaemonAuthResult, String> {
let key_hash = hash_api_key(key);
@@ -983,83 +872,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
}
}
- Ok(DaemonMessage::SupervisorHeartbeat {
- task_id,
- contract_id,
- state: supervisor_state,
- phase,
- current_activity,
- progress,
- pending_task_ids,
- timestamp: _,
- }) => {
- tracing::debug!(
- task_id = %task_id,
- contract_id = %contract_id,
- state = %supervisor_state,
- phase = %phase,
- progress = progress,
- "Supervisor heartbeat received"
- );
-
- // Store heartbeat in database and update supervisor state (Task 3.3)
- if let Some(ref pool) = state.db_pool {
- let pool = pool.clone();
- let pending_ids = pending_task_ids.clone();
- let activity = current_activity.clone();
- let state_str = supervisor_state.clone();
- let phase_str = phase.clone();
- tokio::spawn(async move {
- // Store the heartbeat record
- if let Err(e) = repository::create_supervisor_heartbeat(
- &pool,
- task_id,
- contract_id,
- &state_str,
- &phase_str,
- activity.as_deref(),
- progress as i32,
- &pending_ids,
- ).await {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- error = %e,
- "Failed to store supervisor heartbeat"
- );
- }
-
- // Update supervisor_states table (lightweight heartbeat state update - Task 3.3)
- if let Err(e) = repository::update_supervisor_heartbeat_state(
- &pool,
- contract_id,
- &state_str,
- activity.as_deref(),
- progress as i32,
- &pending_ids,
- ).await {
- tracing::debug!(
- contract_id = %contract_id,
- error = %e,
- "Failed to update supervisor state from heartbeat (may not exist yet)"
- );
- }
-
- // Also update the daemon heartbeat
- if let Ok(Some(task)) = repository::get_task(&pool, task_id).await {
- if let Some(daemon_id) = task.daemon_id {
- if let Err(e) = repository::update_daemon_heartbeat(&pool, daemon_id).await {
- tracing::warn!(
- daemon_id = %daemon_id,
- error = %e,
- "Failed to update daemon heartbeat from supervisor"
- );
- }
- }
- }
- });
- }
- }
Ok(DaemonMessage::TaskOutput { task_id, output, is_partial }) => {
// Parse the output line and broadcast structured data
if let Some(notification) = parse_claude_output(task_id, owner_id, &output, is_partial) {
@@ -1120,136 +932,16 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
updated_by: "daemon".into(),
});
- // Initialize or restore supervisor_state when supervisor task starts running (Task 3.4)
- if updated_task.is_supervisor && new_status_owned == "running" {
- if let Some(contract_id) = updated_task.contract_id {
- // Check if supervisor state already exists (restoration scenario)
- match repository::get_supervisor_state(&pool, contract_id).await {
- Ok(Some(existing_state)) => {
- // State exists - this is a restoration
- tracing::info!(
- task_id = %task_id,
- contract_id = %contract_id,
- existing_state = %existing_state.state,
- restoration_count = existing_state.restoration_count,
- "Supervisor starting with existing state - restoration in progress"
- );
-
- // Mark as restored (increments restoration_count)
- match repository::mark_supervisor_restored(
- &pool,
- contract_id,
- "daemon_restart",
- ).await {
- Ok(restored_state) => {
- tracing::info!(
- task_id = %task_id,
- contract_id = %contract_id,
- restoration_count = restored_state.restoration_count,
- "Supervisor restoration marked"
- );
-
- // Check for pending questions to re-deliver
- if let Ok(questions) = serde_json::from_value::<Vec<crate::db::models::PendingQuestion>>(
- restored_state.pending_questions.clone()
- ) {
- if !questions.is_empty() {
- tracing::info!(
- contract_id = %contract_id,
- question_count = questions.len(),
- "Pending questions found for re-delivery"
- );
- // Questions will be re-delivered by the supervisor when it restores
- }
- }
- }
- Err(e) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- error = %e,
- "Failed to mark supervisor as restored"
- );
- }
- }
- }
- Ok(None) => {
- // No existing state - fresh start
- // Get contract to get its phase
- match repository::get_contract_for_owner(
- &pool,
- contract_id,
- updated_task.owner_id,
- ).await {
- Ok(Some(contract)) => {
- match repository::upsert_supervisor_state(
- &pool,
- contract_id,
- task_id,
- serde_json::json!([]), // Empty conversation
- &[], // No pending tasks
- &contract.phase,
- ).await {
- Ok(_) => {
- tracing::info!(
- task_id = %task_id,
- contract_id = %contract_id,
- phase = %contract.phase,
- "Initialized fresh supervisor state"
- );
- }
- Err(e) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- error = %e,
- "Failed to initialize supervisor state"
- );
- }
- }
- }
- Ok(None) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- "Contract not found when initializing supervisor state"
- );
- }
- Err(e) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- error = %e,
- "Failed to get contract for supervisor state"
- );
- }
- }
- }
- Err(e) => {
- tracing::warn!(
- task_id = %task_id,
- contract_id = %contract_id,
- error = %e,
- "Failed to check existing supervisor state"
- );
- }
- }
- }
- }
-
// Record history event when task starts running
if new_status_owned == "running" {
let _ = repository::record_history_event(
&pool,
updated_task.owner_id,
- updated_task.contract_id,
Some(task_id),
"task",
Some("started"),
- None,
serde_json::json!({
"name": &updated_task.name,
- "isSupervisor": updated_task.is_supervisor,
}),
).await;
}
@@ -1329,51 +1021,19 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
updated_by: "daemon".into(),
});
- // Notify supervisor if this task belongs to a contract
- if let Some(contract_id) = updated_task.contract_id {
- // Don't notify for supervisor tasks (they don't report to themselves)
- if !updated_task.is_supervisor {
- if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
- // action_directive used to come from
- // compute_action_directive (now removed alongside the
- // LLM module). Passing None preserves the existing
- // supervisor protocol; the auto-PR path below still
- // fires when every task is done.
- state.notify_supervisor_of_task_completion(
- supervisor.id,
- supervisor.daemon_id,
- updated_task.id,
- &updated_task.name,
- &updated_task.status,
- updated_task.progress_summary.as_deref(),
- updated_task.error_message.as_deref(),
- None,
- ).await;
- }
- }
- }
-
- // Auto-create PR if all tasks are done and repo is remote
- if updated_task.status == "done" {
- if let Some(contract_id) = updated_task.contract_id {
- let pool_c = pool.clone();
- let state_c = state.clone();
- tokio::spawn(async move {
- auto_create_pr_if_ready(&pool_c, &state_c, contract_id, owner_id).await;
- });
- }
- }
+ // Supervisor notification + auto-PR removed alongside
+ // legacy contracts. Directive completion is handled
+ // by the directive reconciler.
+ let _ = owner_id;
// Record history event for task completion
let subtype = if updated_task.status == "done" { "completed" } else { "failed" };
let _ = repository::record_history_event(
&pool,
updated_task.owner_id,
- updated_task.contract_id,
Some(task_id),
"task",
Some(subtype),
- None,
serde_json::json!({
"name": &updated_task.name,
"status": &updated_task.status,
@@ -1962,16 +1622,13 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
// Record history event for checkpoint
- // Get task to get contract_id
if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
let _ = repository::record_history_event(
pool,
task.owner_id,
- task.contract_id,
Some(task_id),
"checkpoint",
Some("created"),
- None,
serde_json::json!({
"checkpointNumber": checkpoint.checkpoint_number,
"commitSha": &sha,
@@ -2103,28 +1760,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
conflicts: conflicts.clone(),
});
- // On successful merge, notify supervisor to check if all merges complete
- if success {
- if let Some(pool) = state.db_pool.as_ref() {
- if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
- if let Some(contract_id) = task.contract_id {
- if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await {
- let prompt = format!(
- "[INFO] Merge completed: {}\n\
- Check if all tasks are merged with `makima supervisor tasks`.\n\
- If ready, create PR with `makima supervisor pr`.",
- message
- );
- let _ = state.notify_supervisor(
- supervisor.id,
- supervisor.daemon_id,
- &prompt,
- ).await;
- }
- }
- }
- }
- }
}
Ok(DaemonMessage::PRCreated {
task_id,
@@ -2150,52 +1785,6 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
pr_number,
});
- // Notify supervisor of PR result (both success and failure)
- if let Some(pool) = state.db_pool.as_ref() {
- if let Ok(Some(task)) = repository::get_task(pool, task_id).await {
- if let Some(contract_id) = task.contract_id {
- if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(pool, contract_id).await {
- let prompt = if success {
- // Get contract to determine next action
- let next_action = if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, task.owner_id).await {
- match (contract.contract_type.as_str(), contract.phase.as_str()) {
- ("simple", "execute") => {
- "Mark contract complete with `makima supervisor complete`".to_string()
- }
- ("specification", "execute") => {
- "Advance to review phase with `makima supervisor advance-phase review`".to_string()
- }
- _ => "Check contract status with `makima supervisor status`".to_string()
- }
- } else {
- "Check contract status with `makima supervisor status`".to_string()
- };
-
- format!(
- "[ACTION REQUIRED] PR created successfully!\n\
- PR: {}\n\n\
- Next step: {}",
- pr_url.as_deref().unwrap_or(&message),
- next_action
- )
- } else {
- format!(
- "[ERROR] PR creation failed for task {}:\n\
- {}\n\n\
- Please fix the issue and retry with `makima supervisor pr`.",
- task_id,
- message
- )
- };
- let _ = state.notify_supervisor(
- supervisor.id,
- supervisor.daemon_id,
- &prompt,
- ).await;
- }
- }
- }
- }
}
Ok(DaemonMessage::GitConfigInherited {
success,