summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-16 19:50:27 +0000
committersoryu <soryu@soryu.co>2026-01-17 05:38:07 +0000
commit75d9644d44ba998a32ed14c072e883a75145ab72 (patch)
treeb82dee94632fd40764a92a9b11da24ef21600ed5 /makima/src/server/handlers
parent6b94b5895ed27e3aef052a1843fb3f334397d1b4 (diff)
downloadsoryu-75d9644d44ba998a32ed14c072e883a75145ab72.tar.gz
soryu-75d9644d44ba998a32ed14c072e883a75145ab72.zip
Add autopilot panel and retry system
Diffstat (limited to 'makima/src/server/handlers')
-rw-r--r--makima/src/server/handlers/mesh_daemon.rs83
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs155
2 files changed, 145 insertions, 93 deletions
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 4bcb5cd..beb676e 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -20,6 +20,7 @@ use sqlx::Row;
use tokio::sync::mpsc;
use uuid::Uuid;
+use crate::db::models::Task;
use crate::db::repository;
use crate::server::auth::{hash_api_key, API_KEY_HEADER};
use crate::server::messages::ApiError;
@@ -1334,42 +1335,86 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
);
}
- // Find tasks assigned to this daemon that are still active
- if let Err(e) = clear_daemon_from_tasks(&pool, daemon_uuid).await {
+ // Find tasks assigned to this daemon and mark for retry or fail permanently
+ if let Err(e) = handle_daemon_disconnect_tasks(&pool, daemon_uuid).await {
tracing::error!(
daemon_id = %daemon_uuid,
error = %e,
- "Failed to clear daemon from tasks on disconnect"
+ "Failed to handle daemon disconnect for tasks"
);
}
});
}
}
-/// Clear daemon_id from tasks when daemon disconnects
-async fn clear_daemon_from_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> {
- // Update tasks that were running on this daemon to failed state
- let result = sqlx::query(
+/// Handle tasks when daemon disconnects - mark for retry or fail permanently.
+async fn handle_daemon_disconnect_tasks(pool: &sqlx::PgPool, daemon_id: Uuid) -> Result<(), sqlx::Error> {
+ // Get all active tasks on this daemon
+ let active_tasks: Vec<Task> = sqlx::query_as(
r#"
- UPDATE tasks
- SET daemon_id = NULL,
- status = 'failed',
- error_message = 'Daemon disconnected',
- updated_at = NOW()
+ SELECT * FROM tasks
WHERE daemon_id = $1
AND status IN ('starting', 'running', 'initializing')
"#,
)
.bind(daemon_id)
- .execute(pool)
+ .fetch_all(pool)
.await?;
- if result.rows_affected() > 0 {
- tracing::warn!(
- daemon_id = %daemon_id,
- tasks_affected = result.rows_affected(),
- "Marked tasks as failed due to daemon disconnect"
- );
+ if active_tasks.is_empty() {
+ return Ok(());
+ }
+
+ tracing::info!(
+ daemon_id = %daemon_id,
+ task_count = active_tasks.len(),
+ "Processing tasks for disconnected daemon"
+ );
+
+ for task in active_tasks {
+ if task.retry_count < task.max_retries {
+ // Mark for retry
+ match repository::mark_task_for_retry(pool, task.id, daemon_id).await {
+ Ok(Some(updated_task)) => {
+ tracing::info!(
+ task_id = %task.id,
+ task_name = %task.name,
+ retry_count = updated_task.retry_count,
+ max_retries = updated_task.max_retries,
+ "Task marked for retry after daemon disconnect"
+ );
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task.id,
+ "Task not found or already at max retries"
+ );
+ }
+ Err(e) => {
+ tracing::error!(
+ task_id = %task.id,
+ error = %e,
+ "Failed to mark task for retry"
+ );
+ }
+ }
+ } else {
+ // Exceeded retries, mark as permanently failed
+ if let Err(e) = repository::mark_task_permanently_failed(pool, task.id, daemon_id).await {
+ tracing::error!(
+ task_id = %task.id,
+ error = %e,
+ "Failed to mark task as permanently failed"
+ );
+ } else {
+ tracing::warn!(
+ task_id = %task.id,
+ task_name = %task.name,
+ retry_count = task.retry_count + 1,
+ "Task permanently failed: exceeded maximum retries"
+ );
+ }
+ }
}
Ok(())
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 1014fdc..754d086 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -256,6 +256,7 @@ async fn verify_supervisor_auth(
/// Try to start a pending task on an available daemon.
/// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started.
+/// For retried tasks, excludes daemons that previously failed the task.
async fn try_start_pending_task(
state: &SharedState,
contract_id: Uuid,
@@ -263,7 +264,7 @@ async fn try_start_pending_task(
) -> Result<Option<Task>, String> {
let pool = state.db_pool.as_ref().ok_or("Database not configured")?;
- // Get pending tasks for this contract
+ // Get pending tasks for this contract (includes interrupted tasks awaiting retry)
let pending_tasks = repository::get_pending_tasks_for_contract(pool, contract_id, owner_id)
.await
.map_err(|e| format!("Failed to get pending tasks: {}", e))?;
@@ -272,89 +273,95 @@ async fn try_start_pending_task(
return Ok(None);
}
- // Get available daemons with capacity
- let daemons = repository::get_available_daemons(pool, owner_id)
- .await
- .map_err(|e| format!("Failed to get available daemons: {}", e))?;
-
- // Find a daemon with capacity
- let available_daemon = daemons.iter().find(|d| {
- d.current_task_count < d.max_concurrent_tasks
- && state.daemon_connections.contains_key(&d.connection_id)
- });
+ // Try each pending task until we find one we can start
+ for task in &pending_tasks {
+ // Get excluded daemon IDs for this task (daemons that have already failed it)
+ let exclude_ids: Vec<Uuid> = task.failed_daemon_ids.clone().unwrap_or_default();
- let daemon = match available_daemon {
- Some(d) => d,
- None => return Ok(None), // No daemon with capacity
- };
+ // Get available daemons excluding failed ones for this task
+ let daemons = repository::get_available_daemons_excluding(pool, owner_id, &exclude_ids)
+ .await
+ .map_err(|e| format!("Failed to get available daemons: {}", e))?;
- // Try to start the first pending task
- let task = &pending_tasks[0];
+ // Find a daemon with capacity
+ let available_daemon = daemons.iter().find(|d| {
+ d.current_task_count < d.max_concurrent_tasks
+ && state.daemon_connections.contains_key(&d.connection_id)
+ });
- // Get repo URL from task or contract
- let repo_url = if let Some(url) = &task.repository_url {
- Some(url.clone())
- } else {
- match repository::list_contract_repositories(pool, contract_id).await {
- Ok(repos) => repos
- .iter()
- .find(|r| r.is_primary)
- .or(repos.first())
- .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())),
- Err(_) => None,
- }
- };
+ let daemon = match available_daemon {
+ Some(d) => d,
+ None => continue, // Try next task
+ };
- // Update task with daemon assignment
- let update_req = UpdateTaskRequest {
- status: Some("starting".to_string()),
- daemon_id: Some(daemon.id),
- version: Some(task.version),
- ..Default::default()
- };
+ // Get repo URL from task or contract
+ let repo_url = if let Some(url) = &task.repository_url {
+ Some(url.clone())
+ } else {
+ match repository::list_contract_repositories(pool, contract_id).await {
+ Ok(repos) => repos
+ .iter()
+ .find(|r| r.is_primary)
+ .or(repos.first())
+ .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())),
+ Err(_) => None,
+ }
+ };
- let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await {
- Ok(Some(t)) => t,
- Ok(None) => return Ok(None),
- Err(e) => {
- tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment");
- return Ok(None);
- }
- };
+ // Update task with daemon assignment
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(daemon.id),
+ version: Some(task.version),
+ ..Default::default()
+ };
- // Send spawn command
- let cmd = DaemonCommand::SpawnTask {
- task_id: updated_task.id,
- task_name: updated_task.name.clone(),
- plan: updated_task.plan.clone(),
- repo_url,
- base_branch: updated_task.base_branch.clone(),
- target_branch: updated_task.target_branch.clone(),
- parent_task_id: updated_task.parent_task_id,
- depth: updated_task.depth,
- is_orchestrator: false,
- target_repo_path: updated_task.target_repo_path.clone(),
- completion_action: updated_task.completion_action.clone(),
- continue_from_task_id: updated_task.continue_from_task_id,
- copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
- contract_id: updated_task.contract_id,
- is_supervisor: false,
- };
+ let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await {
+ Ok(Some(t)) => t,
+ Ok(None) => continue, // Task was modified concurrently, try next
+ Err(e) => {
+ tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment");
+ continue; // Try next task
+ }
+ };
- if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
- tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command");
- // Rollback
- let rollback_req = UpdateTaskRequest {
- status: Some("pending".to_string()),
- clear_daemon_id: true,
- ..Default::default()
+ // Send spawn command
+ let cmd = DaemonCommand::SpawnTask {
+ task_id: updated_task.id,
+ task_name: updated_task.name.clone(),
+ plan: updated_task.plan.clone(),
+ repo_url,
+ base_branch: updated_task.base_branch.clone(),
+ target_branch: updated_task.target_branch.clone(),
+ parent_task_id: updated_task.parent_task_id,
+ depth: updated_task.depth,
+ is_orchestrator: false,
+ target_repo_path: updated_task.target_repo_path.clone(),
+ completion_action: updated_task.completion_action.clone(),
+ continue_from_task_id: updated_task.continue_from_task_id,
+ copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: updated_task.contract_id,
+ is_supervisor: false,
};
- let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await;
- return Ok(None);
+
+ if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
+ tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command");
+ // Rollback
+ let rollback_req = UpdateTaskRequest {
+ status: Some("pending".to_string()),
+ clear_daemon_id: true,
+ ..Default::default()
+ };
+ let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await;
+ continue; // Try next task
+ }
+
+ tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop");
+ return Ok(Some(updated_task));
}
- tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop");
- Ok(Some(updated_task))
+ // No tasks could be started
+ Ok(None)
}
// =============================================================================