diff options
| author | soryu <soryu@soryu.co> | 2026-01-16 17:07:44 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-16 17:07:44 +0000 |
| commit | f84a7f2d820f6f432be2b1d78d6bf833b5b19380 (patch) | |
| tree | 06398f6a91ec6efe06d2c77e603a27728d72885c | |
| parent | dcec90d2c233671e64e412a9f7b883d8db6783ec (diff) | |
| download | soryu-f84a7f2d820f6f432be2b1d78d6bf833b5b19380.tar.gz soryu-f84a7f2d820f6f432be2b1d78d6bf833b5b19380.zip | |
Fixup: fix history call and try to start pending tasks when a daemon is available
| -rw-r--r-- | makima/src/db/models.rs | 38 | ||||
| -rw-r--r-- | makima/src/db/repository.rs | 21 | ||||
| -rw-r--r-- | makima/src/server/handlers/history.rs | 4 | ||||
| -rw-r--r-- | makima/src/server/handlers/mesh_supervisor.rs | 173 |
4 files changed, 215 insertions, 21 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 6accb48..0e1303c 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -6,6 +6,42 @@ use sqlx::FromRow; use utoipa::ToSchema; use uuid::Uuid; +/// Flexible datetime deserialization module. +/// Accepts both date-only ("2026-01-15") and full ISO 8601 datetime ("2026-01-15T00:00:00Z") formats. +pub mod flexible_datetime { + use chrono::{DateTime, NaiveDate, NaiveTime, TimeZone, Utc}; + use serde::{self, Deserialize, Deserializer}; + + /// Deserializes a datetime from either date-only or full datetime format. + pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<DateTime<Utc>>, D::Error> + where + D: Deserializer<'de>, + { + let s: Option<String> = Option::deserialize(deserializer)?; + match s { + None => Ok(None), + Some(s) if s.is_empty() => Ok(None), + Some(s) => { + // Try full datetime first (RFC 3339 / ISO 8601) + if let Ok(dt) = DateTime::parse_from_rfc3339(&s) { + return Ok(Some(dt.with_timezone(&Utc))); + } + + // Try date-only format (YYYY-MM-DD) and convert to start of day UTC + if let Ok(date) = NaiveDate::parse_from_str(&s, "%Y-%m-%d") { + let datetime = date.and_time(NaiveTime::MIN); + return Ok(Some(Utc.from_utc_datetime(&datetime))); + } + + Err(serde::de::Error::custom(format!( + "Invalid datetime format '{}'. Expected ISO 8601 datetime (e.g., '2026-01-15T00:00:00Z') or date (e.g., '2026-01-15')", + s + ))) + } + } + } +} + /// TranscriptEntry stored in JSONB - matches frontend TranscriptEntry #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "camelCase")] @@ -1646,7 +1682,9 @@ pub struct ToolCallInfo { pub struct HistoryQueryFilters { pub phase: Option<String>, pub event_types: Option<Vec<String>>, + #[serde(default, deserialize_with = "flexible_datetime::deserialize")] pub from: Option<DateTime<Utc>>, + #[serde(default, deserialize_with = "flexible_datetime::deserialize")] pub to: Option<DateTime<Utc>>, pub limit: Option<i32>, pub cursor: Option<String>, diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index cb9d52f..2b069d5 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -789,6 +789,27 @@ pub async fn list_tasks_by_contract( .await } +/// Get pending tasks for a contract (non-supervisor tasks only). +pub async fn get_pending_tasks_for_contract( + pool: &PgPool, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<Vec<Task>, sqlx::Error> { + sqlx::query_as::<_, Task>( + r#" + SELECT * FROM tasks + WHERE contract_id = $1 AND owner_id = $2 + AND status = 'pending' + AND is_supervisor = false + ORDER BY priority DESC, created_at ASC + "#, + ) + .bind(contract_id) + .bind(owner_id) + .fetch_all(pool) + .await +} + /// Update a task by ID with optimistic locking. pub async fn update_task( pool: &PgPool, diff --git a/makima/src/server/handlers/history.rs b/makima/src/server/handlers/history.rs index 572eebd..bee6b02 100644 --- a/makima/src/server/handlers/history.rs +++ b/makima/src/server/handlers/history.rs @@ -11,7 +11,7 @@ use uuid::Uuid; use crate::{ db::{ models::{ - ContractHistoryResponse, ConversationMessage, HistoryQueryFilters, + flexible_datetime, ContractHistoryResponse, ConversationMessage, HistoryQueryFilters, SupervisorConversationResponse, TaskConversationResponse, TaskReference, }, repository, @@ -35,7 +35,9 @@ pub struct TimelineQueryFilters { pub contract_id: Option<Uuid>, pub task_id: Option<Uuid>, pub include_subtasks: Option<bool>, + #[serde(default, deserialize_with = "flexible_datetime::deserialize")] pub from: Option<chrono::DateTime<chrono::Utc>>, + #[serde(default, deserialize_with = "flexible_datetime::deserialize")] pub to: Option<chrono::DateTime<chrono::Utc>>, pub limit: Option<i32>, } diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index 4dc0f0d..1014fdc 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -254,6 +254,109 @@ async fn verify_supervisor_auth( Ok((task_id, task.owner_id)) } +/// Try to start a pending task on an available daemon. +/// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started. +async fn try_start_pending_task( + state: &SharedState, + contract_id: Uuid, + owner_id: Uuid, +) -> Result<Option<Task>, String> { + let pool = state.db_pool.as_ref().ok_or("Database not configured")?; + + // Get pending tasks for this contract + let pending_tasks = repository::get_pending_tasks_for_contract(pool, contract_id, owner_id) + .await + .map_err(|e| format!("Failed to get pending tasks: {}", e))?; + + if pending_tasks.is_empty() { + return Ok(None); + } + + // Get available daemons with capacity + let daemons = repository::get_available_daemons(pool, owner_id) + .await + .map_err(|e| format!("Failed to get available daemons: {}", e))?; + + // Find a daemon with capacity + let available_daemon = daemons.iter().find(|d| { + d.current_task_count < d.max_concurrent_tasks + && state.daemon_connections.contains_key(&d.connection_id) + }); + + let daemon = match available_daemon { + Some(d) => d, + None => return Ok(None), // No daemon with capacity + }; + + // Try to start the first pending task + let task = &pending_tasks[0]; + + // Get repo URL from task or contract + let repo_url = if let Some(url) = &task.repository_url { + Some(url.clone()) + } else { + match repository::list_contract_repositories(pool, contract_id).await { + Ok(repos) => repos + .iter() + .find(|r| r.is_primary) + .or(repos.first()) + .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())), + Err(_) => None, + } + }; + + // Update task with daemon assignment + let update_req = UpdateTaskRequest { + status: Some("starting".to_string()), + daemon_id: Some(daemon.id), + version: Some(task.version), + ..Default::default() + }; + + let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await { + Ok(Some(t)) => t, + Ok(None) => return Ok(None), + Err(e) => { + tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment"); + return Ok(None); + } + }; + + // Send spawn command + let cmd = DaemonCommand::SpawnTask { + task_id: updated_task.id, + task_name: updated_task.name.clone(), + plan: updated_task.plan.clone(), + repo_url, + base_branch: updated_task.base_branch.clone(), + target_branch: updated_task.target_branch.clone(), + parent_task_id: updated_task.parent_task_id, + depth: updated_task.depth, + is_orchestrator: false, + target_repo_path: updated_task.target_repo_path.clone(), + completion_action: updated_task.completion_action.clone(), + continue_from_task_id: updated_task.continue_from_task_id, + copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()), + contract_id: updated_task.contract_id, + is_supervisor: false, + }; + + if let Err(e) = state.send_daemon_command(daemon.id, cmd).await { + tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command"); + // Rollback + let rollback_req = UpdateTaskRequest { + status: Some("pending".to_string()), + clear_daemon_id: true, + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await; + return Ok(None); + } + + tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop"); + Ok(Some(updated_task)) +} + // ============================================================================= // Contract Task Handlers // ============================================================================= @@ -598,36 +701,66 @@ pub async fn wait_for_task( ).into_response(); } + // Get contract_id for pending task scheduling + let contract_id = task.contract_id; + // Subscribe to task completions let mut rx = state.task_completions.subscribe(); let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64); - // Wait for completion or timeout + // Wait for completion or timeout, periodically trying to start pending tasks let result = tokio::time::timeout(timeout, async { + let mut pending_check_interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); + pending_check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { - match rx.recv().await { - Ok(notification) => { - if notification.task_id == task_id { - return Some(notification); + tokio::select! { + // Check for task completion notifications + recv_result = rx.recv() => { + match recv_result { + Ok(notification) => { + if notification.task_id == task_id { + return Some(notification); + } + } + Err(_) => { + // Channel closed or lagged - check DB directly + if let Ok(Some(t)) = repository::get_task(pool, task_id).await { + if t.status == "done" || t.status == "failed" || t.status == "merged" { + return Some(crate::server::state::TaskCompletionNotification { + task_id: t.id, + owner_id: Some(t.owner_id), + contract_id: t.contract_id, + parent_task_id: t.parent_task_id, + status: t.status, + output_summary: None, + worktree_path: None, + error_message: t.error_message, + }); + } + } + } } } - Err(_) => { - // Channel closed or lagged - check DB directly - if let Ok(Some(t)) = repository::get_task(pool, task_id).await { - if t.status == "done" || t.status == "failed" || t.status == "merged" { - return Some(crate::server::state::TaskCompletionNotification { - task_id: t.id, - owner_id: Some(t.owner_id), - contract_id: t.contract_id, - parent_task_id: t.parent_task_id, - status: t.status, - output_summary: None, - worktree_path: None, - error_message: t.error_message, - }); + // Periodically try to start pending tasks + _ = pending_check_interval.tick() => { + if let Some(cid) = contract_id { + match try_start_pending_task(&state, cid, owner_id).await { + Ok(Some(started_task)) => { + tracing::debug!( + task_id = %started_task.id, + task_name = %started_task.name, + "Started pending task while waiting" + ); + } + Ok(None) => { + // No pending tasks or no capacity - that's fine + } + Err(e) => { + tracing::warn!(error = %e, "Error trying to start pending task"); + } } } - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } } } |
