summary | refs | log | tree | commit | diff
path: root/makima/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/server')
-rw-r--r-- makima/src/server/handlers/contract_chat.rs 4
-rw-r--r-- makima/src/server/handlers/contracts.rs 1
-rw-r--r-- makima/src/server/handlers/mesh.rs 33
-rw-r--r-- makima/src/server/handlers/mesh_chat.rs 1
-rw-r--r-- makima/src/server/handlers/mesh_daemon.rs 117
-rw-r--r-- makima/src/server/handlers/mesh_supervisor.rs 40
-rw-r--r-- makima/src/server/handlers/transcript_analysis.rs 2
-rw-r--r-- makima/src/server/mod.rs 61
8 files changed, 252 insertions, 7 deletions
diff --git a/makima/src/server/handlers/contract_chat.rs b/makima/src/server/handlers/contract_chat.rs
index e2adb72..86f9500 100644
--- a/makima/src/server/handlers/contract_chat.rs
+++ b/makima/src/server/handlers/contract_chat.rs
@@ -1374,6 +1374,7 @@ async fn handle_contract_request(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
match repository::create_task_for_owner(pool, owner_id, create_req).await {
@@ -1470,6 +1471,7 @@ async fn handle_contract_request(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
match repository::create_task_for_owner(pool, owner_id, create_req).await {
@@ -2079,6 +2081,7 @@ async fn handle_contract_request(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
match repository::create_task_for_owner(pool, owner_id, create_req).await {
@@ -2595,6 +2598,7 @@ async fn handle_contract_request(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
if repository::create_task_for_owner(pool, owner_id, task_req).await.is_ok() {
diff --git a/makima/src/server/handlers/contracts.rs b/makima/src/server/handlers/contracts.rs
index 462b385..d9a47a2 100644
--- a/makima/src/server/handlers/contracts.rs
+++ b/makima/src/server/handlers/contracts.rs
@@ -298,6 +298,7 @@ pub async fn create_contract(
merge_mode: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
match repository::create_task_for_owner(pool, auth.owner_id, supervisor_req).await {
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 240e1f7..f89f067 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -6,6 +6,7 @@ use axum::{
response::IntoResponse,
Json,
};
+use base64::Engine;
use uuid::Uuid;
use crate::db::models::{
@@ -2224,6 +2225,7 @@ pub async fn reassign_task(
checkpoint_sha: task.last_checkpoint_sha.clone(),
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
@@ -2265,6 +2267,30 @@ pub async fn reassign_task(
}
};
+ // Fetch latest checkpoint patch for worktree recovery during reassignment
+ let (patch_data, patch_base_sha) = match repository::get_latest_checkpoint_patch(pool, id).await {
+ Ok(Some(patch)) => {
+ tracing::info!(
+ old_task_id = %id,
+ new_task_id = %new_task.id,
+ patch_size = patch.patch_size_bytes,
+ base_sha = %patch.base_commit_sha,
+ files_count = patch.files_count,
+ "Including checkpoint patch for task reassignment recovery"
+ );
+ let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
+ (Some(encoded), Some(patch.base_commit_sha))
+ }
+ Ok(None) => {
+ tracing::debug!(old_task_id = %id, "No checkpoint patch found for reassignment");
+ (None, None)
+ }
+ Err(e) => {
+ tracing::warn!(old_task_id = %id, error = %e, "Failed to fetch checkpoint patch for reassignment");
+ (None, None)
+ }
+ };
+
// Send SpawnTask command to daemon for the new task
let command = DaemonCommand::SpawnTask {
task_id: new_task.id,
@@ -2285,8 +2311,8 @@ pub async fn reassign_task(
autonomous_loop: false,
resume_session: false,
conversation_history: None,
- patch_data: None,
- patch_base_sha: None,
+ patch_data,
+ patch_base_sha,
};
tracing::info!(
@@ -2949,6 +2975,7 @@ pub async fn fork_task(
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
@@ -3106,6 +3133,7 @@ pub async fn resume_from_checkpoint(
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
@@ -3441,6 +3469,7 @@ pub async fn branch_task(
checkpoint_sha: None,
branched_from_task_id: Some(source_task_id),
conversation_history,
+ depends_on: None,
};
let task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
diff --git a/makima/src/server/handlers/mesh_chat.rs b/makima/src/server/handlers/mesh_chat.rs
index 1ff0724..157bad0 100644
--- a/makima/src/server/handlers/mesh_chat.rs
+++ b/makima/src/server/handlers/mesh_chat.rs
@@ -1020,6 +1020,7 @@ async fn handle_mesh_request(
checkpoint_sha: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
match repository::create_task_for_owner(pool, owner_id, create_req).await {
diff --git a/makima/src/server/handlers/mesh_daemon.rs b/makima/src/server/handlers/mesh_daemon.rs
index 65db373..53ee806 100644
--- a/makima/src/server/handlers/mesh_daemon.rs
+++ b/makima/src/server/handlers/mesh_daemon.rs
@@ -291,6 +291,19 @@ pub enum DaemonMessage {
success: bool,
error: Option<String>,
},
+ /// Task recovery detected after daemon restart
+ TaskRecoveryDetected {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "previousState")]
+ previous_state: String,
+ #[serde(rename = "worktreeIntact")]
+ worktree_intact: bool,
+ #[serde(rename = "worktreePath")]
+ worktree_path: Option<String>,
+ #[serde(rename = "needsPatch")]
+ needs_patch: bool,
+ },
/// Register a tool key for orchestrator API access
RegisterToolKey {
#[serde(rename = "taskId")]
@@ -990,6 +1003,110 @@ async fn handle_daemon_connection(socket: WebSocket, state: SharedState, auth_re
});
}
}
+ Ok(DaemonMessage::TaskRecoveryDetected {
+ task_id,
+ previous_state,
+ worktree_intact,
+ worktree_path,
+ needs_patch,
+ }) => {
+ tracing::info!(
+ task_id = %task_id,
+ previous_state = %previous_state,
+ worktree_intact = worktree_intact,
+ worktree_path = ?worktree_path,
+ needs_patch = needs_patch,
+ "Task recovery detected after daemon restart"
+ );
+
+ // Update task in database based on recovery state
+ if let Some(ref pool) = state.db_pool {
+ let pool = pool.clone();
+ let state = state.clone();
+ tokio::spawn(async move {
+ if worktree_intact {
+ // Worktree is intact - reset the task to 'pending' and clear its daemon
+ // assignment (daemon_id = NULL) so it can be picked up again
+ match sqlx::query(
+ r#"
+ UPDATE tasks
+ SET status = 'pending',
+ daemon_id = NULL,
+ error_message = 'Daemon restarted - task ready for resumption',
+ interrupted_at = NOW(),
+ updated_at = NOW()
+ WHERE id = $1 AND owner_id = $2
+ RETURNING id
+ "#,
+ )
+ .bind(task_id)
+ .bind(owner_id)
+ .fetch_optional(&pool)
+ .await
+ {
+ Ok(Some(_)) => {
+ tracing::info!(
+ task_id = %task_id,
+ "Task marked as pending for resumption"
+ );
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id,
+ owner_id: Some(owner_id),
+ version: 0,
+ status: "pending".into(),
+ updated_fields: vec![
+ "status".into(),
+ "daemon_id".into(),
+ "interrupted_at".into(),
+ ],
+ updated_by: "daemon_recovery".into(),
+ });
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task_id,
+ "Task not found during recovery update"
+ );
+ }
+ Err(e) => {
+ tracing::error!(
+ task_id = %task_id,
+ error = %e,
+ "Failed to update task during recovery"
+ );
+ }
+ }
+ } else {
+ // Worktree missing - mark for retry with patch restoration
+ match repository::mark_task_for_retry(
+ &pool,
+ task_id,
+ daemon_uuid, // Mark this daemon as failed
+ ).await {
+ Ok(Some(_)) => {
+ tracing::info!(
+ task_id = %task_id,
+ "Task marked for retry (worktree missing)"
+ );
+ }
+ Ok(None) => {
+ tracing::warn!(
+ task_id = %task_id,
+ "Task not found or exceeded retries"
+ );
+ }
+ Err(e) => {
+ tracing::error!(
+ task_id = %task_id,
+ error = %e,
+ "Failed to mark task for retry"
+ );
+ }
+ }
+ }
+ });
+ }
+ }
Ok(DaemonMessage::Authenticate { .. }) => {
// Already authenticated, ignore
}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 21c9515..3d85c45 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -279,8 +279,9 @@ async fn verify_supervisor_auth(
/// Try to start a pending task on an available daemon.
/// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started.
-/// For retried tasks, excludes daemons that previously failed the task.
-async fn try_start_pending_task(
+/// For retried tasks, excludes daemons that previously failed the task and includes
+/// checkpoint patch data for worktree recovery.
+pub async fn try_start_pending_task(
state: &SharedState,
contract_id: Uuid,
owner_id: Uuid,
@@ -348,6 +349,34 @@ async fn try_start_pending_task(
}
};
+ // For retried tasks, fetch checkpoint patch for worktree recovery
+ let (patch_data, patch_base_sha) = if task.retry_count > 0 {
+ // This is a retry - try to restore from checkpoint
+ match repository::get_latest_checkpoint_patch(pool, task.id).await {
+ Ok(Some(patch)) => {
+ tracing::info!(
+ task_id = %task.id,
+ retry_count = task.retry_count,
+ patch_size = patch.patch_size_bytes,
+ base_sha = %patch.base_commit_sha,
+ "Including checkpoint patch for task retry recovery"
+ );
+ let encoded = base64::engine::general_purpose::STANDARD.encode(&patch.patch_data);
+ (Some(encoded), Some(patch.base_commit_sha))
+ }
+ Ok(None) => {
+ tracing::debug!(task_id = %task.id, "No checkpoint patch found for retry");
+ (None, None)
+ }
+ Err(e) => {
+ tracing::warn!(task_id = %task.id, error = %e, "Failed to fetch checkpoint patch for retry");
+ (None, None)
+ }
+ }
+ } else {
+ (None, None)
+ };
+
// Send spawn command
let cmd = DaemonCommand::SpawnTask {
task_id: updated_task.id,
@@ -366,10 +395,10 @@ async fn try_start_pending_task(
contract_id: updated_task.contract_id,
is_supervisor: false,
autonomous_loop: false,
- resume_session: false,
+ resume_session: task.retry_count > 0, // Use --continue for retried tasks
conversation_history: None,
- patch_data: None,
- patch_base_sha: None,
+ patch_data,
+ patch_base_sha,
};
if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
@@ -585,6 +614,7 @@ pub async fn spawn_task(
copy_files: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
// Create task in DB
diff --git a/makima/src/server/handlers/transcript_analysis.rs b/makima/src/server/handlers/transcript_analysis.rs
index 3b71eca..cc62eb4 100644
--- a/makima/src/server/handlers/transcript_analysis.rs
+++ b/makima/src/server/handlers/transcript_analysis.rs
@@ -366,6 +366,7 @@ pub async fn create_contract_from_analysis(
merge_mode: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
if let Ok(t) = repository::create_task_for_owner(pool, auth.owner_id, task_req).await {
@@ -535,6 +536,7 @@ pub async fn update_contract_from_analysis(
merge_mode: None,
branched_from_task_id: None,
conversation_history: None,
+ depends_on: None,
};
if let Ok(t) = repository::create_task_for_owner(pool, auth.owner_id, task_req).await {
diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs
index 3a27513..de20569 100644
--- a/makima/src/server/mod.rs
+++ b/makima/src/server/mod.rs
@@ -251,6 +251,9 @@ const ANONYMOUS_TASK_MAX_AGE_DAYS: i32 = 7;
/// Interval for checkpoint patch cleanup (hourly)
const CHECKPOINT_PATCH_CLEANUP_INTERVAL_SECS: u64 = 3600;
+/// Interval between retry orchestrator checks for pending tasks (every 30 seconds)
+const RETRY_ORCHESTRATOR_INTERVAL_SECS: u64 = 30;
+
/// Run the HTTP server with graceful shutdown support.
///
/// # Arguments
@@ -387,6 +390,64 @@ pub async fn run_server(state: SharedState, addr: &str) -> anyhow::Result<()> {
}
}
});
+
+ // Clone state and pool for retry orchestrator
+ let retry_pool = pool.clone();
+ let retry_state = state.clone();
+
+ // Spawn retry orchestrator - periodically retries pending tasks on available daemons
+ tokio::spawn(async move {
+ let mut interval = tokio::time::interval(
+ std::time::Duration::from_secs(RETRY_ORCHESTRATOR_INTERVAL_SECS)
+ );
+ loop {
+ interval.tick().await;
+
+ // Get all contracts with pending tasks awaiting retry
+ match crate::db::repository::get_all_pending_task_contracts(&retry_pool).await {
+ Ok(contract_owners) => {
+ for (contract_id, owner_id) in contract_owners {
+ // Try to start a pending task for this contract
+ match handlers::mesh_supervisor::try_start_pending_task(
+ &retry_state,
+ contract_id,
+ owner_id,
+ ).await {
+ Ok(Some(task)) => {
+ tracing::info!(
+ task_id = %task.id,
+ contract_id = %contract_id,
+ retry_count = task.retry_count,
+ "Retry orchestrator started pending task"
+ );
+ }
+ Ok(None) => {
+ // No tasks could be started (no available daemons, etc.)
+ }
+ Err(e) => {
+ tracing::warn!(
+ contract_id = %contract_id,
+ error = %e,
+ "Retry orchestrator failed to start pending task"
+ );
+ }
+ }
+ }
+ }
+ Err(e) => {
+ tracing::warn!(
+ error = %e,
+ "Retry orchestrator failed to query pending task contracts"
+ );
+ }
+ }
+ }
+ });
+
+ tracing::info!(
+ "Retry orchestrator started (interval: {}s)",
+ RETRY_ORCHESTRATOR_INTERVAL_SECS
+ );
}
let app = make_router(state);