summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh.rs
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server/handlers/mesh.rs')
-rw-r--r--makima/src/server/handlers/mesh.rs247
1 files changed, 26 insertions, 221 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index be5387e..6ba4c8b 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -274,35 +274,16 @@ pub async fn create_task(
let _ = repository::record_history_event(
pool,
auth.owner_id,
- task.contract_id,
Some(task.id),
"task",
Some("created"),
- None,
serde_json::json!({
"name": &task.name,
- "isSupervisor": task.is_supervisor,
}),
).await;
- // Notify supervisor of new task creation if task belongs to a contract
- if let Some(contract_id) = task.contract_id {
- if !task.is_supervisor {
- let pool = pool.clone();
- let state_clone = state.clone();
- let task_clone = task.clone();
- tokio::spawn(async move {
- if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
- state_clone.notify_supervisor_of_task_created(
- supervisor.id,
- supervisor.daemon_id,
- task_clone.id,
- &task_clone.name,
- ).await;
- }
- });
- }
- }
+ // Supervisor notification on task creation removed alongside
+ // legacy contracts.
(StatusCode::CREATED, Json(task)).into_response()
}
Err(e) => {
@@ -352,26 +333,6 @@ pub async fn update_task(
.into_response();
};
- // Check if trying to set a supervisor task to a terminal status
- if let Some(ref new_status) = req.status {
- let terminal_statuses = ["done", "failed", "merged"];
- if terminal_statuses.contains(&new_status.as_str()) {
- // Get the task to check if it's a supervisor
- if let Ok(Some(task)) = repository::get_task_for_owner(pool, id, auth.owner_id).await {
- if task.is_supervisor {
- return (
- StatusCode::BAD_REQUEST,
- Json(ApiError::new(
- "SUPERVISOR_CANNOT_COMPLETE",
- "Supervisor tasks cannot be marked as done, failed, or merged. They run for the lifetime of the contract.",
- )),
- )
- .into_response();
- }
- }
- }
- }
-
// Track which fields are being updated for the notification
let mut updated_fields = Vec::new();
if req.name.is_some() {
@@ -410,26 +371,9 @@ pub async fn update_task(
updated_by: "user".to_string(),
});
- // Notify supervisor of status change if task belongs to a contract
- if let Some(contract_id) = task.contract_id {
- if !task.is_supervisor && updated_fields_clone.contains(&"status".to_string()) {
- let pool = pool.clone();
- let state_clone = state.clone();
- let task_clone = task.clone();
- tokio::spawn(async move {
- if let Ok(Some(supervisor)) = repository::get_contract_supervisor_task(&pool, contract_id).await {
- state_clone.notify_supervisor_of_task_update(
- supervisor.id,
- supervisor.daemon_id,
- task_clone.id,
- &task_clone.name,
- &task_clone.status,
- &updated_fields_clone,
- ).await;
- }
- });
- }
- }
+ // Supervisor notification on task update removed alongside
+ // legacy contracts.
+ let _ = updated_fields_clone;
Json(task).into_response()
}
@@ -657,15 +601,10 @@ pub async fn start_task(
.into_response();
}
- // Get local_only and auto_merge_local flags from contract if task has one
- let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id {
- match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
- Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
- _ => (false, false),
- }
- } else {
- (false, false)
- };
+ // local_only / auto_merge_local used to come from the parent contract.
+ // With legacy contracts removed they default to false; the directive
+ // lifecycle handles its own completion now.
+ let (local_only, auto_merge_local) = (false, false);
// Get list of daemons that have previously failed this task
let mut exclude_daemon_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default();
@@ -708,8 +647,7 @@ pub async fn start_task(
task_depth = task.depth,
subtask_count = subtask_count,
is_orchestrator = is_orchestrator,
- is_supervisor = task.is_supervisor,
- "Starting task with orchestrator/supervisor determination"
+ "Starting task"
);
// IMPORTANT: Update database FIRST to assign daemon_id before sending command
@@ -755,8 +693,6 @@ pub async fn start_task(
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: task.contract_id,
- is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
@@ -764,13 +700,11 @@ pub async fn start_task(
patch_base_sha: None,
local_only,
auto_merge_local,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
tracing::info!(
task_id = %id,
- is_supervisor = task.is_supervisor,
is_orchestrator = is_orchestrator,
daemon_id = %target_daemon_id,
"Sending SpawnTask command to daemon"
@@ -811,8 +745,6 @@ pub async fn start_task(
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: task.contract_id,
- is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
@@ -820,7 +752,6 @@ pub async fn start_task(
patch_base_sha: None,
local_only,
auto_merge_local,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
@@ -1128,11 +1059,10 @@ pub async fn send_message(
// Check if task is in a state that can receive messages
// Allow "running" and "starting" (to handle race between status update and message send)
- // Also allow AUTH_CODE messages and supervisor tasks regardless of status
+ // Also allow AUTH_CODE messages regardless of status
let is_auth_code = req.message.starts_with("AUTH_CODE:");
- let is_supervisor = task.is_supervisor;
let can_receive_message = task.status == "running" || task.status == "starting";
- if !can_receive_message && !is_auth_code && !is_supervisor {
+ if !can_receive_message && !is_auth_code {
return (
StatusCode::BAD_REQUEST,
Json(ApiError::new(
@@ -1147,27 +1077,8 @@ pub async fn send_message(
}
// Find the daemon running this task
- // For supervisors, if no daemon is assigned, find any available daemon for this owner
let target_daemon_id = if let Some(daemon_id) = task.daemon_id {
daemon_id
- } else if is_supervisor {
- // Supervisor without daemon - find one
- match state.daemon_connections
- .iter()
- .find(|d| d.value().owner_id == auth.owner_id)
- {
- Some(entry) => entry.value().id,
- None => {
- return (
- StatusCode::SERVICE_UNAVAILABLE,
- Json(ApiError::new(
- "NO_DAEMON",
- "No daemon available. Please start a daemon.",
- )),
- )
- .into_response();
- }
- }
} else {
return (
StatusCode::SERVICE_UNAVAILABLE,
@@ -1206,15 +1117,7 @@ pub async fn send_message(
};
if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, id, auth.owner_id, update_req).await {
- // Get local_only and auto_merge_local from contract if task has one
- let (local_only, auto_merge_local) = if let Some(contract_id) = updated_task.contract_id {
- match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
- Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
- _ => (false, false),
- }
- } else {
- (false, false)
- };
+ let (local_only, auto_merge_local) = (false, false);
// Send spawn command to new daemon
let spawn_cmd = DaemonCommand::SpawnTask {
@@ -1231,8 +1134,6 @@ pub async fn send_message(
completion_action: updated_task.completion_action.clone(),
continue_from_task_id: updated_task.continue_from_task_id,
copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: updated_task.contract_id,
- is_supervisor: updated_task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
@@ -1240,7 +1141,6 @@ pub async fn send_message(
patch_base_sha: None,
local_only,
auto_merge_local,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: updated_task.directive_id,
};
@@ -2433,13 +2333,11 @@ pub async fn commit_worktree(
// Task Patches
// =============================================================================
-/// Query parameters for listing task patches
+/// Query parameters for listing task patches (legacy contract scope
+/// removed; query is currently empty).
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
-pub struct ListPatchesQuery {
- /// Contract ID to scope the patches
- pub contract_id: Uuid,
-}
+pub struct ListPatchesQuery {}
/// Patch summary for API response
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
@@ -2453,8 +2351,6 @@ pub struct PatchSummary {
pub description: Option<String>,
/// Task ID
pub task_id: Uuid,
- /// Contract ID
- pub contract_id: Uuid,
/// Number of files in the patch
pub files_count: i32,
/// Total lines added (estimated from patch size)
@@ -2523,14 +2419,8 @@ pub async fn list_task_patches(
}
};
- // Verify task belongs to the specified contract
- if task.contract_id != Some(query.contract_id) {
- return (
- StatusCode::BAD_REQUEST,
- Json(ApiError::new("INVALID_CONTRACT", "Task does not belong to the specified contract")),
- )
- .into_response();
- }
+ // Legacy contract verification removed; checkpoint patches are
+ // accessible to any owner of the task.
// Get checkpoint patches for this task
let patches = match repository::list_checkpoint_patches(pool, id).await {
@@ -2586,7 +2476,6 @@ pub async fn list_task_patches(
name,
description,
task_id: p.task_id,
- contract_id: query.contract_id,
files_count: p.files_count,
lines_added,
lines_removed,
@@ -3040,12 +2929,10 @@ pub async fn reassign_task(
// Create a NEW task with the conversation context
let create_req = CreateTaskRequest {
- contract_id: task.contract_id,
name: format!("{} (resumed)", task.name),
description: task.description.clone(),
plan: updated_plan.clone(),
parent_task_id: task.parent_task_id,
- is_supervisor: task.is_supervisor,
priority: task.priority,
repository_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
@@ -3058,7 +2945,6 @@ pub async fn reassign_task(
checkpoint_sha: task.last_checkpoint_sha.clone(),
branched_from_task_id: None,
conversation_history: None,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
@@ -3126,15 +3012,8 @@ pub async fn reassign_task(
}
};
- // Get local_only and auto_merge_local from contract if task has one
- let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id {
- match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
- Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
- _ => (false, false),
- }
- } else {
- (false, false)
- };
+ // Legacy contract scope removed; defaults to false.
+ let (local_only, auto_merge_local) = (false, false);
// Send SpawnTask command to daemon for the new task
let command = DaemonCommand::SpawnTask {
@@ -3151,8 +3030,6 @@ pub async fn reassign_task(
completion_action: task.completion_action.clone(),
continue_from_task_id: Some(id), // Continue from old task's worktree
copy_files: None,
- contract_id: task.contract_id,
- is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
@@ -3160,7 +3037,6 @@ pub async fn reassign_task(
patch_base_sha,
local_only,
auto_merge_local,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
@@ -3190,56 +3066,10 @@ pub async fn reassign_task(
// Don't fail the request, the new task is already running
}
- // Notify the contract's supervisor about the reassignment (if applicable)
- if let Some(contract_id) = task.contract_id {
- if let Ok(Some(contract)) = repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
- if let Some(supervisor_task_id) = contract.supervisor_task_id {
- // Don't notify if we're reassigning the supervisor itself
- if supervisor_task_id != old_task_id {
- // Find the supervisor's daemon and send a message
- if let Ok(Some(supervisor_task)) = repository::get_task_for_owner(pool, supervisor_task_id, auth.owner_id).await {
- if supervisor_task.status == "running" {
- if let Some(supervisor_daemon_id) = supervisor_task.daemon_id {
- // Find the daemon by its UUID
- if let Some(daemon_entry) = state.daemon_connections.iter().find(|d| d.value().id == supervisor_daemon_id) {
- let notification_msg = format!(
- "\n\n[SYSTEM NOTIFICATION] Task '{}' (ID: {}) was reassigned due to daemon disconnect. \
- A new task '{}' (ID: {}) has been created to continue the work. \
- The new task has {} context entries from the previous conversation.\n\n",
- task.name,
- old_task_id,
- final_task.name,
- new_task.id,
- context_entries
- );
-
- let notify_cmd = DaemonCommand::SendMessage {
- task_id: supervisor_task_id,
- message: notification_msg,
- };
-
- if let Err(e) = state.send_daemon_command(daemon_entry.value().id, notify_cmd).await {
- tracing::warn!(
- supervisor_id = %supervisor_task_id,
- error = %e,
- "Failed to notify supervisor about task reassignment"
- );
- } else {
- tracing::info!(
- supervisor_id = %supervisor_task_id,
- old_task_id = %old_task_id,
- new_task_id = %new_task.id,
- "Notified supervisor about task reassignment"
- );
- }
- }
- }
- }
- }
- }
- }
- }
- }
+ // Supervisor reassignment notification removed alongside legacy
+ // contracts. The directive reconciler picks up reassigned tasks on
+ // its next tick.
+ let _ = context_entries;
// Broadcast task update for the new task
state.broadcast_task_update(TaskUpdateNotification {
@@ -3467,15 +3297,8 @@ pub async fn continue_task(
};
let is_orchestrator = task.depth == 0 && subtask_count > 0;
- // Get local_only and auto_merge_local from contract if task has one
- let (local_only, auto_merge_local) = if let Some(contract_id) = task.contract_id {
- match repository::get_contract_for_owner(pool, contract_id, auth.owner_id).await {
- Ok(Some(contract)) => (contract.local_only, contract.auto_merge_local),
- _ => (false, false),
- }
- } else {
- (false, false)
- };
+ // Legacy contract scope removed; defaults to false.
+ let (local_only, auto_merge_local) = (false, false);
// Send SpawnTask command to daemon
let command = DaemonCommand::SpawnTask {
@@ -3492,8 +3315,6 @@ pub async fn continue_task(
completion_action: task.completion_action.clone(),
continue_from_task_id: task.continue_from_task_id,
copy_files: task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: task.contract_id,
- is_supervisor: task.is_supervisor,
autonomous_loop: false,
resume_session: false,
conversation_history: None,
@@ -3501,7 +3322,6 @@ pub async fn continue_task(
patch_base_sha: None,
local_only,
auto_merge_local,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: task.directive_id,
};
@@ -3509,7 +3329,6 @@ pub async fn continue_task(
task_id = %id,
daemon_id = %target_daemon_id,
context_entries = context_entries,
- is_supervisor = task.is_supervisor,
"Continuing task with conversation context"
);
@@ -3820,12 +3639,10 @@ pub async fn fork_task(
// Create the new forked task
let create_req = CreateTaskRequest {
- contract_id: task.contract_id,
name: req.new_task_name.clone(),
description: task.description.clone(),
plan: req.new_task_plan.clone(),
parent_task_id: None, // Forked tasks are independent
- is_supervisor: false,
priority: task.priority,
repository_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
@@ -3838,7 +3655,6 @@ pub async fn fork_task(
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
branched_from_task_id: None,
conversation_history: None,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
@@ -3980,12 +3796,10 @@ pub async fn resume_from_checkpoint(
});
let create_req = CreateTaskRequest {
- contract_id: task.contract_id,
name: task_name,
description: task.description.clone(),
plan: req.plan,
parent_task_id: None,
- is_supervisor: false,
priority: task.priority,
repository_url: task.repository_url.clone(),
base_branch: task.base_branch.clone(),
@@ -3998,7 +3812,6 @@ pub async fn resume_from_checkpoint(
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
branched_from_task_id: None,
conversation_history: None,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
@@ -4318,12 +4131,10 @@ pub async fn branch_task(
// Create the branched task (anonymous - no contract_id)
let create_req = CreateTaskRequest {
- contract_id: None, // Anonymous task
name: task_name,
description: Some(format!("Branched from task: {}", source_task.name)),
plan: req.message,
parent_task_id: None,
- is_supervisor: false,
priority: source_task.priority,
repository_url: source_task.repository_url.clone(),
base_branch: source_task.base_branch.clone(),
@@ -4336,7 +4147,6 @@ pub async fn branch_task(
checkpoint_sha: None,
branched_from_task_id: Some(source_task_id),
conversation_history,
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
directive_step_id: None,
};
@@ -4357,11 +4167,9 @@ pub async fn branch_task(
let _ = repository::record_history_event(
pool,
auth.owner_id,
- None, // No contract for anonymous tasks
Some(task.id),
"task",
Some("branched"),
- None,
serde_json::json!({
"name": &task.name,
"sourceTaskId": source_task_id,
@@ -4425,8 +4233,6 @@ pub async fn branch_task(
completion_action: updated_task.completion_action.clone(),
continue_from_task_id: updated_task.continue_from_task_id,
copy_files: None,
- contract_id: None,
- is_supervisor: false,
autonomous_loop: false,
resume_session: message_count > 0, // Resume if we have conversation history
conversation_history: updated_task.conversation_state.clone(),
@@ -4434,7 +4240,6 @@ pub async fn branch_task(
patch_base_sha,
local_only: false, // No contract, so not local_only
auto_merge_local: false, // No contract, so no auto_merge_local
- supervisor_worktree_task_id: None, // Not spawned by supervisor
directive_id: None,
};