summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-22 03:08:44 +0000
committersoryu <soryu@soryu.co>2026-01-22 03:08:48 +0000
commitc55c6775b1db1dceba6ea533a22d91e65df071a5 (patch)
tree8fad872def9c35ed108099c8e429519e917c3be1
parent2d763ea63e133c083114151abc5b9c76bc6ab54e (diff)
downloadsoryu-c55c6775b1db1dceba6ea533a22d91e65df071a5.tar.gz
soryu-c55c6775b1db1dceba6ea533a22d91e65df071a5.zip
Change daemon limit mechanism
-rw-r--r--makima/src/bin/makima.rs1
-rw-r--r--makima/src/daemon/config.rs14
-rw-r--r--makima/src/daemon/error.rs5
-rw-r--r--makima/src/daemon/task/manager.rs114
4 files changed, 112 insertions, 22 deletions
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs
index 3be6003..cb80c7f 100644
--- a/makima/src/bin/makima.rs
+++ b/makima/src/bin/makima.rs
@@ -229,6 +229,7 @@ async fn run_daemon(
.replace("ws://", "http://");
let task_config = TaskConfig {
max_concurrent_tasks: config.process.max_concurrent_tasks,
+ max_tasks_per_contract: config.process.max_tasks_per_contract,
worktree_base_dir: config.worktree.base_dir.clone(),
env_vars: config.process.env_vars.clone(),
claude_command: config.process.claude_command.clone(),
diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs
index 476b57e..9a00166 100644
--- a/makima/src/daemon/config.rs
+++ b/makima/src/daemon/config.rs
@@ -198,10 +198,15 @@ pub struct ProcessConfig {
#[serde(default, alias = "disableverbose")]
pub disable_verbose: bool,
- /// Maximum concurrent tasks.
+ /// Maximum concurrent tasks (global cap).
#[serde(default = "default_max_tasks", alias = "maxconcurrenttasks")]
pub max_concurrent_tasks: u32,
+ /// Maximum concurrent tasks per contract/supervisor.
+ /// Standalone tasks are treated as their own single-task contract.
+ #[serde(default = "default_max_tasks_per_contract", alias = "maxtaskspercontract")]
+ pub max_tasks_per_contract: u32,
+
/// Default timeout for tasks in seconds (0 = no timeout).
#[serde(default, alias = "defaulttimeoutsecs")]
pub default_timeout_secs: u64,
@@ -232,6 +237,10 @@ fn default_max_tasks() -> u32 {
4
}
+fn default_max_tasks_per_contract() -> u32 {
+ 10 // serde default for `max_tasks_per_contract`; also used by `ProcessConfig::default()`
+}
+
impl Default for ProcessConfig {
fn default() -> Self {
Self {
@@ -241,6 +250,7 @@ impl Default for ProcessConfig {
enable_permissions: false,
disable_verbose: false,
max_concurrent_tasks: default_max_tasks(),
+ max_tasks_per_contract: default_max_tasks_per_contract(),
default_timeout_secs: 0,
env_vars: HashMap::new(),
bubblewrap: BubblewrapConfig::default(),
@@ -561,9 +571,11 @@ impl DaemonConfig {
enable_permissions: false,
disable_verbose: false,
max_concurrent_tasks: 2,
+ max_tasks_per_contract: 10,
default_timeout_secs: 0,
env_vars: HashMap::new(),
bubblewrap: BubblewrapConfig::default(),
+ heartbeat_commit_interval_secs: 300,
},
local_db: LocalDbConfig {
path: PathBuf::from("/tmp/makima-daemon-test/state.db"),
diff --git a/makima/src/daemon/error.rs b/makima/src/daemon/error.rs
index b993169..ea00d25 100644
--- a/makima/src/daemon/error.rs
+++ b/makima/src/daemon/error.rs
@@ -49,9 +49,12 @@ pub enum TaskError {
#[error("Invalid state transition from {from} to {to}")]
InvalidStateTransition { from: String, to: String },
- #[error("Concurrency limit reached")]
+ #[error("Global concurrency limit reached")]
ConcurrencyLimit,
+ #[error("Contract concurrency limit reached")]
+ ContractConcurrencyLimit,
+
#[error("Task already exists: {0}")]
AlreadyExists(Uuid),
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index a3e4732..95554e9 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -7,7 +7,7 @@ use std::time::Instant;
use rand::Rng;
use tokio::io::AsyncWriteExt;
-use tokio::sync::{mpsc, RwLock, Semaphore};
+use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
use std::collections::HashSet;
@@ -945,6 +945,8 @@ pub struct ManagedTask {
pub copy_files: Option<Vec<String>>,
/// Contract ID if this task is associated with a contract.
pub contract_id: Option<Uuid>,
+ /// Key used for concurrency tracking (contract_id or task_id for standalone).
+ pub concurrency_key: Uuid,
/// Whether to run in autonomous loop mode.
pub autonomous_loop: bool,
/// Time task was created.
@@ -960,8 +962,10 @@ pub struct ManagedTask {
/// Configuration for task execution.
#[derive(Clone)]
pub struct TaskConfig {
- /// Maximum concurrent tasks.
+ /// Maximum concurrent tasks (global cap).
pub max_concurrent_tasks: u32,
+ /// Maximum concurrent tasks per contract/supervisor.
+ pub max_tasks_per_contract: u32,
/// Base directory for worktrees.
pub worktree_base_dir: PathBuf,
/// Environment variables to pass to Claude.
@@ -991,6 +995,7 @@ impl Default for TaskConfig {
fn default() -> Self {
Self {
max_concurrent_tasks: 4,
+ max_tasks_per_contract: 10,
worktree_base_dir: WorktreeManager::default_base_dir(),
env_vars: HashMap::new(),
claude_command: "claude".to_string(),
@@ -1015,14 +1020,14 @@ pub struct TaskManager {
/// Temp directory manager.
temp_manager: Arc<TempManager>,
/// Task configuration.
- #[allow(dead_code)]
config: TaskConfig,
/// Active tasks.
tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>,
/// Channel to send messages to server.
ws_tx: mpsc::Sender<DaemonMessage>,
- /// Semaphore for limiting concurrent tasks.
- semaphore: Arc<Semaphore>,
+ /// Tracks running task count per contract (or per standalone task).
+ /// Key is contract_id for contract tasks, or task_id for standalone tasks.
+ contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>,
/// Channels for sending input to running tasks.
/// Each sender allows sending messages to the stdin of a running Claude process.
task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>,
@@ -1039,7 +1044,6 @@ pub struct TaskManager {
impl TaskManager {
/// Create a new task manager.
pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self {
- let max_concurrent = config.max_concurrent_tasks as usize;
let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone()));
let process_manager = Arc::new(
ProcessManager::with_command(config.claude_command.clone())
@@ -1059,7 +1063,7 @@ impl TaskManager {
config,
tasks: Arc::new(RwLock::new(HashMap::new())),
ws_tx,
- semaphore: Arc::new(Semaphore::new(max_concurrent)),
+ contract_task_counts: Arc::new(RwLock::new(HashMap::new())),
task_inputs: Arc::new(RwLock::new(HashMap::new())),
merge_trackers: Arc::new(RwLock::new(HashMap::new())),
active_pids: Arc::new(RwLock::new(HashMap::new())),
@@ -1068,6 +1072,59 @@ impl TaskManager {
}
}
+ /// Reserve a concurrency slot for a task: checks the global cap, then the per-key cap, and on success increments that key's running count.
+ /// Returns the concurrency key (contract_id, or task_id for standalone); fails with `ConcurrencyLimit` or `ContractConcurrencyLimit` when a cap is reached.
+ async fn try_acquire_concurrency_slot(
+ &self,
+ contract_id: Option<Uuid>,
+ task_id: Uuid,
+ ) -> TaskResult<Uuid> {
+ let mut counts = self.contract_task_counts.write().await; // write guard lives to the end of the function, so both checks and the increment run under one lock
+
+ // Determine the concurrency key:
+ // - For contract tasks: use contract_id (tasks of one contract share a counter)
+ // - For standalone tasks: use task_id (each standalone task is its own "contract")
+ let concurrency_key = contract_id.unwrap_or(task_id);
+
+ // Check global cap first: running tasks summed across every key in the map
+ let total: usize = counts.values().sum();
+ if total >= self.config.max_concurrent_tasks as usize {
+ tracing::warn!(
+ task_id = %task_id,
+ total_running = total,
+ max = self.config.max_concurrent_tasks,
+ "Global concurrency limit reached, cannot spawn task"
+ );
+ return Err(TaskError::ConcurrencyLimit);
+ }
+
+ // Check per-contract cap; a key that is not in the map yet counts as 0 running tasks
+ let contract_count = counts.get(&concurrency_key).copied().unwrap_or(0);
+ if contract_count >= self.config.max_tasks_per_contract as usize {
+ tracing::warn!(
+ task_id = %task_id,
+ contract_id = ?contract_id,
+ concurrency_key = %concurrency_key,
+ contract_running = contract_count,
+ max_per_contract = self.config.max_tasks_per_contract,
+ "Contract concurrency limit reached, cannot spawn task"
+ );
+ return Err(TaskError::ContractConcurrencyLimit);
+ }
+
+ // Reserve the slot; the caller is expected to pair this with release_concurrency_slot(concurrency_key)
+ *counts.entry(concurrency_key).or_insert(0) += 1;
+ tracing::debug!(
+ task_id = %task_id,
+ concurrency_key = %concurrency_key,
+ new_count = counts.get(&concurrency_key).copied().unwrap_or(0),
+ total = total + 1,
+ "Acquired concurrency slot"
+ );
+
+ Ok(concurrency_key)
+ }
+
/// Gracefully shutdown all running Claude processes.
///
/// This sends SIGTERM to all active processes, waits for them to exit gracefully,
@@ -1718,17 +1775,10 @@ impl TaskManager {
}
}
- // Acquire semaphore permit
- tracing::info!(task_id = %task_id, "Acquiring concurrency permit...");
- let permit = self
- .semaphore
- .clone()
- .try_acquire_owned()
- .map_err(|_| {
- tracing::warn!(task_id = %task_id, "Concurrency limit reached, cannot spawn task");
- TaskError::ConcurrencyLimit
- })?;
- tracing::info!(task_id = %task_id, "Concurrency permit acquired");
+ // Acquire concurrency slot (contract-based concurrency)
+ tracing::info!(task_id = %task_id, contract_id = ?contract_id, "Acquiring concurrency slot...");
+ let concurrency_key = self.try_acquire_concurrency_slot(contract_id, task_id).await?;
+ tracing::info!(task_id = %task_id, concurrency_key = %concurrency_key, "Concurrency slot acquired");
// Create task entry
tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing");
@@ -1750,6 +1800,7 @@ impl TaskManager {
continue_from_task_id,
copy_files: copy_files.clone(),
contract_id,
+ concurrency_key,
autonomous_loop,
created_at: Instant::now(),
started_at: None,
@@ -1768,7 +1819,6 @@ impl TaskManager {
tracing::info!(task_id = %task_id, "Spawning background task runner");
let inner = self.clone_inner();
tokio::spawn(async move {
- let _permit = permit; // Hold permit until done
tracing::info!(task_id = %task_id, "Background task runner started");
if let Err(e) = inner.run_task(
@@ -1780,7 +1830,10 @@ impl TaskManager {
tracing::error!(task_id = %task_id, error = %e, "Task execution failed");
inner.mark_failed(task_id, &e.to_string()).await;
}
- tracing::info!(task_id = %task_id, "Background task runner completed");
+
+ // Release concurrency slot
+ inner.release_concurrency_slot(concurrency_key).await;
+ tracing::info!(task_id = %task_id, concurrency_key = %concurrency_key, "Background task runner completed, concurrency slot released");
});
tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ===");
@@ -1801,6 +1854,7 @@ impl TaskManager {
git_user_name: self.git_user_name.clone(),
api_url: self.config.api_url.clone(),
heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs,
+ contract_task_counts: self.contract_task_counts.clone(),
}
}
@@ -3097,9 +3151,28 @@ struct TaskManagerInner {
git_user_name: Arc<RwLock<Option<String>>>,
api_url: String,
heartbeat_commit_interval_secs: u64,
+ /// Shared contract task counts for releasing concurrency slots.
+ contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>,
}
impl TaskManagerInner {
+ /// Release a concurrency slot: decrements the running count for `concurrency_key` and removes the map entry once it reaches zero.
+ async fn release_concurrency_slot(&self, concurrency_key: Uuid) { // NOTE(review): called explicitly after run_task returns; unlike the removed semaphore permit (a local in the runner) it is skipped if the runner panics — confirm this is acceptable
+ let mut counts = self.contract_task_counts.write().await;
+ if let Some(count) = counts.get_mut(&concurrency_key) { // a key with no entry is silently ignored
+ *count = count.saturating_sub(1); // saturating: cannot underflow below zero
+ let new_count = *count;
+ if new_count == 0 { // drop zero-count keys so they do not accumulate in the map
+ counts.remove(&concurrency_key);
+ }
+ tracing::debug!(
+ concurrency_key = %concurrency_key,
+ new_count = new_count,
+ "Released concurrency slot (from TaskManagerInner)"
+ );
+ }
+ }
+
/// Run a task to completion.
#[allow(clippy::too_many_arguments)]
async fn run_task(
@@ -4546,6 +4619,7 @@ impl Clone for TaskManagerInner {
git_user_name: self.git_user_name.clone(),
api_url: self.api_url.clone(),
heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs,
+ contract_task_counts: self.contract_task_counts.clone(),
}
}
}