summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-16 17:07:44 +0000
committersoryu <soryu@soryu.co>2026-01-16 17:07:44 +0000
commitf84a7f2d820f6f432be2b1d78d6bf833b5b19380 (patch)
tree06398f6a91ec6efe06d2c77e603a27728d72885c
parentdcec90d2c233671e64e412a9f7b883d8db6783ec (diff)
downloadsoryu-f84a7f2d820f6f432be2b1d78d6bf833b5b19380.tar.gz
soryu-f84a7f2d820f6f432be2b1d78d6bf833b5b19380.zip
Fixup: fix history call and try to start pending tasks when a daemon is available
-rw-r--r--makima/src/db/models.rs38
-rw-r--r--makima/src/db/repository.rs21
-rw-r--r--makima/src/server/handlers/history.rs4
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs173
4 files changed, 215 insertions, 21 deletions
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 6accb48..0e1303c 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -6,6 +6,42 @@ use sqlx::FromRow;
use utoipa::ToSchema;
use uuid::Uuid;
+/// Flexible datetime deserialization module.
+/// Accepts both date-only ("2026-01-15") and full ISO 8601 datetime ("2026-01-15T00:00:00Z") formats.
+pub mod flexible_datetime {
+ use chrono::{DateTime, NaiveDate, NaiveTime, TimeZone, Utc};
+ use serde::{self, Deserialize, Deserializer};
+
+ /// Deserializes a datetime from either date-only or full datetime format.
+ pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<DateTime<Utc>>, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let s: Option<String> = Option::deserialize(deserializer)?;
+ match s {
+ None => Ok(None),
+ Some(s) if s.is_empty() => Ok(None),
+ Some(s) => {
+ // Try full datetime first (RFC 3339 / ISO 8601)
+ if let Ok(dt) = DateTime::parse_from_rfc3339(&s) {
+ return Ok(Some(dt.with_timezone(&Utc)));
+ }
+
+ // Try date-only format (YYYY-MM-DD) and convert to start of day UTC
+ if let Ok(date) = NaiveDate::parse_from_str(&s, "%Y-%m-%d") {
+ let datetime = date.and_time(NaiveTime::MIN);
+ return Ok(Some(Utc.from_utc_datetime(&datetime)));
+ }
+
+ Err(serde::de::Error::custom(format!(
+ "Invalid datetime format '{}'. Expected ISO 8601 datetime (e.g., '2026-01-15T00:00:00Z') or date (e.g., '2026-01-15')",
+ s
+ )))
+ }
+ }
+ }
+}
+
/// TranscriptEntry stored in JSONB - matches frontend TranscriptEntry
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
@@ -1646,7 +1682,9 @@ pub struct ToolCallInfo {
pub struct HistoryQueryFilters {
pub phase: Option<String>,
pub event_types: Option<Vec<String>>,
+ #[serde(default, deserialize_with = "flexible_datetime::deserialize")]
pub from: Option<DateTime<Utc>>,
+ #[serde(default, deserialize_with = "flexible_datetime::deserialize")]
pub to: Option<DateTime<Utc>>,
pub limit: Option<i32>,
pub cursor: Option<String>,
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index cb9d52f..2b069d5 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -789,6 +789,27 @@ pub async fn list_tasks_by_contract(
.await
}
+/// Get pending tasks for a contract (non-supervisor tasks only).
+pub async fn get_pending_tasks_for_contract(
+ pool: &PgPool,
+ contract_id: Uuid,
+ owner_id: Uuid,
+) -> Result<Vec<Task>, sqlx::Error> {
+ sqlx::query_as::<_, Task>(
+ r#"
+ SELECT * FROM tasks
+ WHERE contract_id = $1 AND owner_id = $2
+ AND status = 'pending'
+ AND is_supervisor = false
+ ORDER BY priority DESC, created_at ASC
+ "#,
+ )
+ .bind(contract_id)
+ .bind(owner_id)
+ .fetch_all(pool)
+ .await
+}
+
/// Update a task by ID with optimistic locking.
pub async fn update_task(
pool: &PgPool,
diff --git a/makima/src/server/handlers/history.rs b/makima/src/server/handlers/history.rs
index 572eebd..bee6b02 100644
--- a/makima/src/server/handlers/history.rs
+++ b/makima/src/server/handlers/history.rs
@@ -11,7 +11,7 @@ use uuid::Uuid;
use crate::{
db::{
models::{
- ContractHistoryResponse, ConversationMessage, HistoryQueryFilters,
+ flexible_datetime, ContractHistoryResponse, ConversationMessage, HistoryQueryFilters,
SupervisorConversationResponse, TaskConversationResponse, TaskReference,
},
repository,
@@ -35,7 +35,9 @@ pub struct TimelineQueryFilters {
pub contract_id: Option<Uuid>,
pub task_id: Option<Uuid>,
pub include_subtasks: Option<bool>,
+ #[serde(default, deserialize_with = "flexible_datetime::deserialize")]
pub from: Option<chrono::DateTime<chrono::Utc>>,
+ #[serde(default, deserialize_with = "flexible_datetime::deserialize")]
pub to: Option<chrono::DateTime<chrono::Utc>>,
pub limit: Option<i32>,
}
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index 4dc0f0d..1014fdc 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -254,6 +254,109 @@ async fn verify_supervisor_auth(
Ok((task_id, task.owner_id))
}
+/// Try to start a pending task on an available daemon.
+/// Returns Ok(Some(task)) if a task was started, Ok(None) if no tasks could be started.
+async fn try_start_pending_task(
+ state: &SharedState,
+ contract_id: Uuid,
+ owner_id: Uuid,
+) -> Result<Option<Task>, String> {
+ let pool = state.db_pool.as_ref().ok_or("Database not configured")?;
+
+ // Get pending tasks for this contract
+ let pending_tasks = repository::get_pending_tasks_for_contract(pool, contract_id, owner_id)
+ .await
+ .map_err(|e| format!("Failed to get pending tasks: {}", e))?;
+
+ if pending_tasks.is_empty() {
+ return Ok(None);
+ }
+
+ // Get available daemons with capacity
+ let daemons = repository::get_available_daemons(pool, owner_id)
+ .await
+ .map_err(|e| format!("Failed to get available daemons: {}", e))?;
+
+ // Find a daemon with capacity
+ let available_daemon = daemons.iter().find(|d| {
+ d.current_task_count < d.max_concurrent_tasks
+ && state.daemon_connections.contains_key(&d.connection_id)
+ });
+
+ let daemon = match available_daemon {
+ Some(d) => d,
+ None => return Ok(None), // No daemon with capacity
+ };
+
+ // Try to start the first pending task
+ let task = &pending_tasks[0];
+
+ // Get repo URL from task or contract
+ let repo_url = if let Some(url) = &task.repository_url {
+ Some(url.clone())
+ } else {
+ match repository::list_contract_repositories(pool, contract_id).await {
+ Ok(repos) => repos
+ .iter()
+ .find(|r| r.is_primary)
+ .or(repos.first())
+ .and_then(|r| r.repository_url.clone().or_else(|| r.local_path.clone())),
+ Err(_) => None,
+ }
+ };
+
+ // Update task with daemon assignment
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(daemon.id),
+ version: Some(task.version),
+ ..Default::default()
+ };
+
+ let updated_task = match repository::update_task_for_owner(pool, task.id, owner_id, update_req).await {
+ Ok(Some(t)) => t,
+ Ok(None) => return Ok(None),
+ Err(e) => {
+ tracing::warn!(task_id = %task.id, error = %e, "Failed to update task for daemon assignment");
+ return Ok(None);
+ }
+ };
+
+ // Send spawn command
+ let cmd = DaemonCommand::SpawnTask {
+ task_id: updated_task.id,
+ task_name: updated_task.name.clone(),
+ plan: updated_task.plan.clone(),
+ repo_url,
+ base_branch: updated_task.base_branch.clone(),
+ target_branch: updated_task.target_branch.clone(),
+ parent_task_id: updated_task.parent_task_id,
+ depth: updated_task.depth,
+ is_orchestrator: false,
+ target_repo_path: updated_task.target_repo_path.clone(),
+ completion_action: updated_task.completion_action.clone(),
+ continue_from_task_id: updated_task.continue_from_task_id,
+ copy_files: updated_task.copy_files.as_ref().and_then(|v| serde_json::from_value(v.clone()).ok()),
+ contract_id: updated_task.contract_id,
+ is_supervisor: false,
+ };
+
+ if let Err(e) = state.send_daemon_command(daemon.id, cmd).await {
+ tracing::warn!(error = %e, daemon_id = %daemon.id, task_id = %task.id, "Failed to send spawn command");
+ // Rollback
+ let rollback_req = UpdateTaskRequest {
+ status: Some("pending".to_string()),
+ clear_daemon_id: true,
+ ..Default::default()
+ };
+ let _ = repository::update_task_for_owner(pool, task.id, owner_id, rollback_req).await;
+ return Ok(None);
+ }
+
+ tracing::info!(task_id = %task.id, daemon_id = %daemon.id, "Started pending task from wait loop");
+ Ok(Some(updated_task))
+}
+
// =============================================================================
// Contract Task Handlers
// =============================================================================
@@ -598,36 +701,66 @@ pub async fn wait_for_task(
).into_response();
}
+ // Get contract_id for pending task scheduling
+ let contract_id = task.contract_id;
+
// Subscribe to task completions
let mut rx = state.task_completions.subscribe();
let timeout = tokio::time::Duration::from_secs(request.timeout_seconds as u64);
- // Wait for completion or timeout
+ // Wait for completion or timeout, periodically trying to start pending tasks
let result = tokio::time::timeout(timeout, async {
+ let mut pending_check_interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
+ pending_check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
loop {
- match rx.recv().await {
- Ok(notification) => {
- if notification.task_id == task_id {
- return Some(notification);
+ tokio::select! {
+ // Check for task completion notifications
+ recv_result = rx.recv() => {
+ match recv_result {
+ Ok(notification) => {
+ if notification.task_id == task_id {
+ return Some(notification);
+ }
+ }
+ Err(_) => {
+ // Channel closed or lagged - check DB directly
+ if let Ok(Some(t)) = repository::get_task(pool, task_id).await {
+ if t.status == "done" || t.status == "failed" || t.status == "merged" {
+ return Some(crate::server::state::TaskCompletionNotification {
+ task_id: t.id,
+ owner_id: Some(t.owner_id),
+ contract_id: t.contract_id,
+ parent_task_id: t.parent_task_id,
+ status: t.status,
+ output_summary: None,
+ worktree_path: None,
+ error_message: t.error_message,
+ });
+ }
+ }
+ }
}
}
- Err(_) => {
- // Channel closed or lagged - check DB directly
- if let Ok(Some(t)) = repository::get_task(pool, task_id).await {
- if t.status == "done" || t.status == "failed" || t.status == "merged" {
- return Some(crate::server::state::TaskCompletionNotification {
- task_id: t.id,
- owner_id: Some(t.owner_id),
- contract_id: t.contract_id,
- parent_task_id: t.parent_task_id,
- status: t.status,
- output_summary: None,
- worktree_path: None,
- error_message: t.error_message,
- });
+ // Periodically try to start pending tasks
+ _ = pending_check_interval.tick() => {
+ if let Some(cid) = contract_id {
+ match try_start_pending_task(&state, cid, owner_id).await {
+ Ok(Some(started_task)) => {
+ tracing::debug!(
+ task_id = %started_task.id,
+ task_name = %started_task.name,
+ "Started pending task while waiting"
+ );
+ }
+ Ok(None) => {
+ // No pending tasks or no capacity - that's fine
+ }
+ Err(e) => {
+ tracing::warn!(error = %e, "Error trying to start pending task");
+ }
}
}
- tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
}