diff options
| author | soryu <soryu@soryu.co> | 2026-01-22 03:08:44 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-22 03:08:48 +0000 |
| commit | c55c6775b1db1dceba6ea533a22d91e65df071a5 (patch) | |
| tree | 8fad872def9c35ed108099c8e429519e917c3be1 | |
| parent | 2d763ea63e133c083114151abc5b9c76bc6ab54e (diff) | |
| download | soryu-c55c6775b1db1dceba6ea533a22d91e65df071a5.tar.gz soryu-c55c6775b1db1dceba6ea533a22d91e65df071a5.zip | |
Change daemon limit mechanism
| -rw-r--r-- | makima/src/bin/makima.rs | 1 | ||||
| -rw-r--r-- | makima/src/daemon/config.rs | 14 | ||||
| -rw-r--r-- | makima/src/daemon/error.rs | 5 | ||||
| -rw-r--r-- | makima/src/daemon/task/manager.rs | 114 |
4 files changed, 112 insertions, 22 deletions
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index 3be6003..cb80c7f 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -229,6 +229,7 @@ async fn run_daemon( .replace("ws://", "http://"); let task_config = TaskConfig { max_concurrent_tasks: config.process.max_concurrent_tasks, + max_tasks_per_contract: config.process.max_tasks_per_contract, worktree_base_dir: config.worktree.base_dir.clone(), env_vars: config.process.env_vars.clone(), claude_command: config.process.claude_command.clone(), diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs index 476b57e..9a00166 100644 --- a/makima/src/daemon/config.rs +++ b/makima/src/daemon/config.rs @@ -198,10 +198,15 @@ pub struct ProcessConfig { #[serde(default, alias = "disableverbose")] pub disable_verbose: bool, - /// Maximum concurrent tasks. + /// Maximum concurrent tasks (global cap). #[serde(default = "default_max_tasks", alias = "maxconcurrenttasks")] pub max_concurrent_tasks: u32, + /// Maximum concurrent tasks per contract/supervisor. + /// Standalone tasks are treated as their own single-task contract. + #[serde(default = "default_max_tasks_per_contract", alias = "maxtaskspercontract")] + pub max_tasks_per_contract: u32, + /// Default timeout for tasks in seconds (0 = no timeout). #[serde(default, alias = "defaulttimeoutsecs")] pub default_timeout_secs: u64, @@ -232,6 +237,10 @@ fn default_max_tasks() -> u32 { 4 } +fn default_max_tasks_per_contract() -> u32 { + 10 +} + impl Default for ProcessConfig { fn default() -> Self { Self { @@ -241,6 +250,7 @@ impl Default for ProcessConfig { enable_permissions: false, disable_verbose: false, max_concurrent_tasks: default_max_tasks(), + max_tasks_per_contract: default_max_tasks_per_contract(), default_timeout_secs: 0, env_vars: HashMap::new(), bubblewrap: BubblewrapConfig::default(), @@ -561,9 +571,11 @@ impl DaemonConfig { enable_permissions: false, disable_verbose: false, max_concurrent_tasks: 2, + max_tasks_per_contract: 10, default_timeout_secs: 0, env_vars: HashMap::new(), bubblewrap: BubblewrapConfig::default(), + heartbeat_commit_interval_secs: 300, }, local_db: LocalDbConfig { path: PathBuf::from("/tmp/makima-daemon-test/state.db"), diff --git a/makima/src/daemon/error.rs b/makima/src/daemon/error.rs index b993169..ea00d25 100644 --- a/makima/src/daemon/error.rs +++ b/makima/src/daemon/error.rs @@ -49,9 +49,12 @@ pub enum TaskError { #[error("Invalid state transition from {from} to {to}")] InvalidStateTransition { from: String, to: String }, - #[error("Concurrency limit reached")] + #[error("Global concurrency limit reached")] ConcurrencyLimit, + #[error("Contract concurrency limit reached")] + ContractConcurrencyLimit, + #[error("Task already exists: {0}")] AlreadyExists(Uuid), diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs index a3e4732..95554e9 100644 --- a/makima/src/daemon/task/manager.rs +++ b/makima/src/daemon/task/manager.rs @@ -7,7 +7,7 @@ use std::time::Instant; use rand::Rng; use tokio::io::AsyncWriteExt; -use tokio::sync::{mpsc, RwLock, Semaphore}; +use tokio::sync::{mpsc, RwLock}; use uuid::Uuid; use std::collections::HashSet; @@ -945,6 +945,8 @@ pub struct ManagedTask { pub copy_files: Option<Vec<String>>, /// Contract ID if this task is associated with a contract. pub contract_id: Option<Uuid>, + /// Key used for concurrency tracking (contract_id or task_id for standalone). + pub concurrency_key: Uuid, /// Whether to run in autonomous loop mode. pub autonomous_loop: bool, /// Time task was created. @@ -960,8 +962,10 @@ pub struct ManagedTask { /// Configuration for task execution. #[derive(Clone)] pub struct TaskConfig { - /// Maximum concurrent tasks. + /// Maximum concurrent tasks (global cap). pub max_concurrent_tasks: u32, + /// Maximum concurrent tasks per contract/supervisor. + pub max_tasks_per_contract: u32, /// Base directory for worktrees. pub worktree_base_dir: PathBuf, /// Environment variables to pass to Claude. @@ -991,6 +995,7 @@ impl Default for TaskConfig { fn default() -> Self { Self { max_concurrent_tasks: 4, + max_tasks_per_contract: 10, worktree_base_dir: WorktreeManager::default_base_dir(), env_vars: HashMap::new(), claude_command: "claude".to_string(), @@ -1015,14 +1020,14 @@ pub struct TaskManager { /// Temp directory manager. temp_manager: Arc<TempManager>, /// Task configuration. - #[allow(dead_code)] config: TaskConfig, /// Active tasks. tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>, /// Channel to send messages to server. ws_tx: mpsc::Sender<DaemonMessage>, - /// Semaphore for limiting concurrent tasks. - semaphore: Arc<Semaphore>, + /// Tracks running task count per contract (or per standalone task). + /// Key is contract_id for contract tasks, or task_id for standalone tasks. + contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>, /// Channels for sending input to running tasks. /// Each sender allows sending messages to the stdin of a running Claude process. task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>, @@ -1039,7 +1044,6 @@ pub struct TaskManager { impl TaskManager { /// Create a new task manager. pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self { - let max_concurrent = config.max_concurrent_tasks as usize; let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone())); let process_manager = Arc::new( ProcessManager::with_command(config.claude_command.clone()) @@ -1059,7 +1063,7 @@ impl TaskManager { config, tasks: Arc::new(RwLock::new(HashMap::new())), ws_tx, - semaphore: Arc::new(Semaphore::new(max_concurrent)), + contract_task_counts: Arc::new(RwLock::new(HashMap::new())), task_inputs: Arc::new(RwLock::new(HashMap::new())), merge_trackers: Arc::new(RwLock::new(HashMap::new())), active_pids: Arc::new(RwLock::new(HashMap::new())), @@ -1068,6 +1072,59 @@ impl TaskManager { } } + /// Check if a task can be spawned given contract-based concurrency limits. + /// Returns the concurrency key to use (contract_id or task_id for standalone). + async fn try_acquire_concurrency_slot( + &self, + contract_id: Option<Uuid>, + task_id: Uuid, + ) -> TaskResult<Uuid> { + let mut counts = self.contract_task_counts.write().await; + + // Determine the concurrency key: + // - For contract tasks: use contract_id + // - For standalone tasks: use task_id (each standalone task is its own "contract") + let concurrency_key = contract_id.unwrap_or(task_id); + + // Check global cap + let total: usize = counts.values().sum(); + if total >= self.config.max_concurrent_tasks as usize { + tracing::warn!( + task_id = %task_id, + total_running = total, + max = self.config.max_concurrent_tasks, + "Global concurrency limit reached, cannot spawn task" + ); + return Err(TaskError::ConcurrencyLimit); + } + + // Check per-contract cap + let contract_count = counts.get(&concurrency_key).copied().unwrap_or(0); + if contract_count >= self.config.max_tasks_per_contract as usize { + tracing::warn!( + task_id = %task_id, + contract_id = ?contract_id, + concurrency_key = %concurrency_key, + contract_running = contract_count, + max_per_contract = self.config.max_tasks_per_contract, + "Contract concurrency limit reached, cannot spawn task" + ); + return Err(TaskError::ContractConcurrencyLimit); + } + + // Increment count for this contract + *counts.entry(concurrency_key).or_insert(0) += 1; + tracing::debug!( + task_id = %task_id, + concurrency_key = %concurrency_key, + new_count = counts.get(&concurrency_key).copied().unwrap_or(0), + total = total + 1, + "Acquired concurrency slot" + ); + + Ok(concurrency_key) + } + /// Gracefully shutdown all running Claude processes. /// /// This sends SIGTERM to all active processes, waits for them to exit gracefully, @@ -1718,17 +1775,10 @@ impl TaskManager { } } - // Acquire semaphore permit - tracing::info!(task_id = %task_id, "Acquiring concurrency permit..."); - let permit = self - .semaphore - .clone() - .try_acquire_owned() - .map_err(|_| { - tracing::warn!(task_id = %task_id, "Concurrency limit reached, cannot spawn task"); - TaskError::ConcurrencyLimit - })?; - tracing::info!(task_id = %task_id, "Concurrency permit acquired"); + // Acquire concurrency slot (contract-based concurrency) + tracing::info!(task_id = %task_id, contract_id = ?contract_id, "Acquiring concurrency slot..."); + let concurrency_key = self.try_acquire_concurrency_slot(contract_id, task_id).await?; + tracing::info!(task_id = %task_id, concurrency_key = %concurrency_key, "Concurrency slot acquired"); // Create task entry tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing"); @@ -1750,6 +1800,7 @@ impl TaskManager { continue_from_task_id, copy_files: copy_files.clone(), contract_id, + concurrency_key, autonomous_loop, created_at: Instant::now(), started_at: None, @@ -1768,7 +1819,6 @@ impl TaskManager { tracing::info!(task_id = %task_id, "Spawning background task runner"); let inner = self.clone_inner(); tokio::spawn(async move { - let _permit = permit; // Hold permit until done tracing::info!(task_id = %task_id, "Background task runner started"); if let Err(e) = inner.run_task( @@ -1780,7 +1830,10 @@ impl TaskManager { tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); inner.mark_failed(task_id, &e.to_string()).await; } - tracing::info!(task_id = %task_id, "Background task runner completed"); + + // Release concurrency slot + inner.release_concurrency_slot(concurrency_key).await; + tracing::info!(task_id = %task_id, concurrency_key = %concurrency_key, "Background task runner completed, concurrency slot released"); }); tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ==="); @@ -1801,6 +1854,7 @@ impl TaskManager { git_user_name: self.git_user_name.clone(), api_url: self.config.api_url.clone(), heartbeat_commit_interval_secs: self.config.heartbeat_commit_interval_secs, + contract_task_counts: self.contract_task_counts.clone(), } } @@ -3097,9 +3151,28 @@ struct TaskManagerInner { git_user_name: Arc<RwLock<Option<String>>>, api_url: String, heartbeat_commit_interval_secs: u64, + /// Shared contract task counts for releasing concurrency slots. + contract_task_counts: Arc<RwLock<HashMap<Uuid, usize>>>, } impl TaskManagerInner { + /// Release a concurrency slot when a task completes. + async fn release_concurrency_slot(&self, concurrency_key: Uuid) { + let mut counts = self.contract_task_counts.write().await; + if let Some(count) = counts.get_mut(&concurrency_key) { + *count = count.saturating_sub(1); + let new_count = *count; + if new_count == 0 { + counts.remove(&concurrency_key); + } + tracing::debug!( + concurrency_key = %concurrency_key, + new_count = new_count, + "Released concurrency slot (from TaskManagerInner)" + ); + } + } + /// Run a task to completion. #[allow(clippy::too_many_arguments)] async fn run_task( @@ -4546,6 +4619,7 @@ impl Clone for TaskManagerInner { git_user_name: self.git_user_name.clone(), api_url: self.api_url.clone(), heartbeat_commit_interval_secs: self.heartbeat_commit_interval_secs, + contract_task_counts: self.contract_task_counts.clone(), } } } |
