summaryrefslogtreecommitdiff
path: root/makima/src/daemon
diff options
context:
space:
mode:
Diffstat (limited to 'makima/src/daemon')
-rw-r--r--makima/src/daemon/api/client.rs287
-rw-r--r--makima/src/daemon/config.rs2
-rw-r--r--makima/src/daemon/task/manager.rs5
-rw-r--r--makima/src/daemon/worktree/manager.rs64
4 files changed, 275 insertions, 83 deletions
diff --git a/makima/src/daemon/api/client.rs b/makima/src/daemon/api/client.rs
index ca1b2a8..4ba4778 100644
--- a/makima/src/daemon/api/client.rs
+++ b/makima/src/daemon/api/client.rs
@@ -2,6 +2,7 @@
use reqwest::Client;
use serde::{de::DeserializeOwned, Serialize};
+use std::time::Duration;
use thiserror::Error;
/// API client errors.
@@ -17,6 +18,12 @@ pub enum ApiError {
Parse(String),
}
+/// Maximum number of retry attempts for failed requests.
+const MAX_RETRIES: u32 = 3;
+
+/// Initial backoff delay in milliseconds.
+const INITIAL_BACKOFF_MS: u64 = 100;
+
/// HTTP client for makima API.
pub struct ApiClient {
client: Client,
@@ -37,94 +44,236 @@ impl ApiClient {
})
}
- /// Make a GET request.
+ /// Make a GET request with retry.
pub async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, ApiError> {
let url = format!("{}{}", self.base_url, path);
- let response = self.client
- .get(&url)
- // Send both headers - server will try tool key first, then API key
- .header("X-Makima-Tool-Key", &self.api_key)
- .header("X-Makima-API-Key", &self.api_key)
- .send()
- .await?;
-
- self.handle_response(response).await
+ let mut last_error = None;
+
+ for attempt in 0..MAX_RETRIES {
+ if attempt > 0 {
+ tokio::time::sleep(Self::backoff_delay(attempt - 1)).await;
+ }
+
+ let result = self.client
+ .get(&url)
+ // Send both headers - server will try tool key first, then API key
+ .header("X-Makima-Tool-Key", &self.api_key)
+ .header("X-Makima-API-Key", &self.api_key)
+ .send()
+ .await;
+
+ match result {
+ Ok(response) => {
+ match self.handle_response(response).await {
+ Ok(value) => return Ok(value),
+ Err(e) if Self::is_retryable(&e) && attempt < MAX_RETRIES - 1 => {
+ last_error = Some(e);
+ continue;
+ }
+ Err(e) => return Err(e),
+ }
+ }
+ Err(e) => {
+ let error = ApiError::Request(e);
+ if Self::is_retryable(&error) && attempt < MAX_RETRIES - 1 {
+ last_error = Some(error);
+ continue;
+ }
+ return Err(error);
+ }
+ }
+ }
+
+ Err(last_error.unwrap())
}
- /// Make a POST request with JSON body.
+ /// Make a POST request with JSON body and retry.
pub async fn post<T: DeserializeOwned, B: Serialize>(
&self,
path: &str,
body: &B,
) -> Result<T, ApiError> {
let url = format!("{}{}", self.base_url, path);
- let response = self.client
- .post(&url)
- // Send both headers - server will try tool key first, then API key
- .header("X-Makima-Tool-Key", &self.api_key)
- .header("X-Makima-API-Key", &self.api_key)
- .header("Content-Type", "application/json")
- .json(body)
- .send()
- .await?;
-
- self.handle_response(response).await
+ let mut last_error = None;
+
+ for attempt in 0..MAX_RETRIES {
+ if attempt > 0 {
+ tokio::time::sleep(Self::backoff_delay(attempt - 1)).await;
+ }
+
+ let result = self.client
+ .post(&url)
+ // Send both headers - server will try tool key first, then API key
+ .header("X-Makima-Tool-Key", &self.api_key)
+ .header("X-Makima-API-Key", &self.api_key)
+ .header("Content-Type", "application/json")
+ .json(body)
+ .send()
+ .await;
+
+ match result {
+ Ok(response) => {
+ match self.handle_response(response).await {
+ Ok(value) => return Ok(value),
+ Err(e) if Self::is_retryable(&e) && attempt < MAX_RETRIES - 1 => {
+ last_error = Some(e);
+ continue;
+ }
+ Err(e) => return Err(e),
+ }
+ }
+ Err(e) => {
+ let error = ApiError::Request(e);
+ if Self::is_retryable(&error) && attempt < MAX_RETRIES - 1 {
+ last_error = Some(error);
+ continue;
+ }
+ return Err(error);
+ }
+ }
+ }
+
+ Err(last_error.unwrap())
}
- /// Make a POST request without body.
+ /// Make a POST request without body and retry.
pub async fn post_empty<T: DeserializeOwned>(&self, path: &str) -> Result<T, ApiError> {
let url = format!("{}{}", self.base_url, path);
- let response = self.client
- .post(&url)
- // Send both headers - server will try tool key first, then API key
- .header("X-Makima-Tool-Key", &self.api_key)
- .header("X-Makima-API-Key", &self.api_key)
- .send()
- .await?;
-
- self.handle_response(response).await
+ let mut last_error = None;
+
+ for attempt in 0..MAX_RETRIES {
+ if attempt > 0 {
+ tokio::time::sleep(Self::backoff_delay(attempt - 1)).await;
+ }
+
+ let result = self.client
+ .post(&url)
+ // Send both headers - server will try tool key first, then API key
+ .header("X-Makima-Tool-Key", &self.api_key)
+ .header("X-Makima-API-Key", &self.api_key)
+ .send()
+ .await;
+
+ match result {
+ Ok(response) => {
+ match self.handle_response(response).await {
+ Ok(value) => return Ok(value),
+ Err(e) if Self::is_retryable(&e) && attempt < MAX_RETRIES - 1 => {
+ last_error = Some(e);
+ continue;
+ }
+ Err(e) => return Err(e),
+ }
+ }
+ Err(e) => {
+ let error = ApiError::Request(e);
+ if Self::is_retryable(&error) && attempt < MAX_RETRIES - 1 {
+ last_error = Some(error);
+ continue;
+ }
+ return Err(error);
+ }
+ }
+ }
+
+ Err(last_error.unwrap())
}
- /// Make a PUT request with JSON body.
+ /// Make a PUT request with JSON body and retry.
pub async fn put<T: DeserializeOwned, B: Serialize>(
&self,
path: &str,
body: &B,
) -> Result<T, ApiError> {
let url = format!("{}{}", self.base_url, path);
- let response = self.client
- .put(&url)
- // Send both headers - server will try tool key first, then API key
- .header("X-Makima-Tool-Key", &self.api_key)
- .header("X-Makima-API-Key", &self.api_key)
- .header("Content-Type", "application/json")
- .json(body)
- .send()
- .await?;
-
- self.handle_response(response).await
+ let mut last_error = None;
+
+ for attempt in 0..MAX_RETRIES {
+ if attempt > 0 {
+ tokio::time::sleep(Self::backoff_delay(attempt - 1)).await;
+ }
+
+ let result = self.client
+ .put(&url)
+ // Send both headers - server will try tool key first, then API key
+ .header("X-Makima-Tool-Key", &self.api_key)
+ .header("X-Makima-API-Key", &self.api_key)
+ .header("Content-Type", "application/json")
+ .json(body)
+ .send()
+ .await;
+
+ match result {
+ Ok(response) => {
+ match self.handle_response(response).await {
+ Ok(value) => return Ok(value),
+ Err(e) if Self::is_retryable(&e) && attempt < MAX_RETRIES - 1 => {
+ last_error = Some(e);
+ continue;
+ }
+ Err(e) => return Err(e),
+ }
+ }
+ Err(e) => {
+ let error = ApiError::Request(e);
+ if Self::is_retryable(&error) && attempt < MAX_RETRIES - 1 {
+ last_error = Some(error);
+ continue;
+ }
+ return Err(error);
+ }
+ }
+ }
+
+ Err(last_error.unwrap())
}
- /// Make a DELETE request.
+ /// Make a DELETE request with retry.
pub async fn delete(&self, path: &str) -> Result<(), ApiError> {
let url = format!("{}{}", self.base_url, path);
- let response = self.client
- .delete(&url)
- .header("X-Makima-Tool-Key", &self.api_key)
- .header("X-Makima-API-Key", &self.api_key)
- .send()
- .await?;
+ let mut last_error = None;
- let status = response.status();
- if !status.is_success() {
- let body = response.text().await.unwrap_or_default();
- return Err(ApiError::Api {
- status: status.as_u16(),
- message: body,
- });
+ for attempt in 0..MAX_RETRIES {
+ if attempt > 0 {
+ tokio::time::sleep(Self::backoff_delay(attempt - 1)).await;
+ }
+
+ let result = self.client
+ .delete(&url)
+ .header("X-Makima-Tool-Key", &self.api_key)
+ .header("X-Makima-API-Key", &self.api_key)
+ .send()
+ .await;
+
+ match result {
+ Ok(response) => {
+ let status = response.status();
+ if !status.is_success() {
+ let body = response.text().await.unwrap_or_default();
+ let error = ApiError::Api {
+ status: status.as_u16(),
+ message: body,
+ };
+ if Self::is_retryable(&error) && attempt < MAX_RETRIES - 1 {
+ last_error = Some(error);
+ continue;
+ }
+ return Err(error);
+ }
+ return Ok(());
+ }
+ Err(e) => {
+ let error = ApiError::Request(e);
+ if Self::is_retryable(&error) && attempt < MAX_RETRIES - 1 {
+ last_error = Some(error);
+ continue;
+ }
+ return Err(error);
+ }
+ }
}
- Ok(())
+ Err(last_error.unwrap())
}
/// Handle API response.
@@ -156,4 +305,24 @@ impl ApiClient {
.map_err(|e| ApiError::Parse(format!("{}: {}", e, body)))
}
}
+
+ /// Check if an error is retryable (connection errors or 5xx server errors).
+ fn is_retryable(error: &ApiError) -> bool {
+ match error {
+ ApiError::Request(e) => {
+ // Retry on connection errors, timeouts, etc.
+ e.is_connect() || e.is_timeout() || e.is_request()
+ }
+ ApiError::Api { status, .. } => {
+ // Retry on 5xx server errors
+ *status >= 500
+ }
+ ApiError::Parse(_) => false,
+ }
+ }
+
+ /// Calculate backoff delay for a given attempt (exponential backoff).
+ fn backoff_delay(attempt: u32) -> Duration {
+ Duration::from_millis(INITIAL_BACKOFF_MS * 2u64.pow(attempt))
+ }
}
diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs
index b7cb1e8..0b28701 100644
--- a/makima/src/daemon/config.rs
+++ b/makima/src/daemon/config.rs
@@ -276,7 +276,7 @@ fn default_heartbeat_commit_interval() -> u64 {
}
fn default_max_tasks() -> u32 {
- 4
+ 10
}
fn default_max_tasks_per_contract() -> u32 {
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
index 3fdde9b..6ba0f52 100644
--- a/makima/src/daemon/task/manager.rs
+++ b/makima/src/daemon/task/manager.rs
@@ -1000,7 +1000,7 @@ pub struct TaskConfig {
impl Default for TaskConfig {
fn default() -> Self {
Self {
- max_concurrent_tasks: 4,
+ max_concurrent_tasks: 10,
max_tasks_per_contract: 10,
worktree_base_dir: WorktreeManager::default_base_dir(),
env_vars: HashMap::new(),
@@ -4993,9 +4993,10 @@ impl TaskManagerInner {
.unwrap_or_else(|| "unknown".to_string());
// 7. Push to remote (best effort - don't fail if push fails)
+ // Use -u origin HEAD to set upstream if not already set (new branches won't have upstream)
let push_output = tokio::process::Command::new("git")
.current_dir(worktree_path)
- .args(["push"])
+ .args(["push", "-u", "origin", "HEAD"])
.output()
.await;
diff --git a/makima/src/daemon/worktree/manager.rs b/makima/src/daemon/worktree/manager.rs
index 04cb307..fa8a9de 100644
--- a/makima/src/daemon/worktree/manager.rs
+++ b/makima/src/daemon/worktree/manager.rs
@@ -286,34 +286,56 @@ impl WorktreeManager {
tokio::fs::create_dir_all(&self.repos_dir).await?;
if repo_path.exists() {
- // Fetch latest changes
- tracing::info!("Fetching updates for existing repo: {}", repo_name);
- let output = Command::new("git")
- .args(["fetch", "--all", "--prune"])
+ // Verify this is actually a git repository before trying to fetch
+ let is_git_repo = Command::new("git")
+ .args(["rev-parse", "--is-bare-repository"])
.current_dir(&repo_path)
.output()
- .await?;
+ .await
+ .map(|o| o.status.success())
+ .unwrap_or(false);
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- tracing::warn!("Git fetch warning: {}", stderr);
- // Don't fail on fetch errors, repo might still be usable
- }
- } else {
- // Clone the repository
- tracing::info!("Cloning repository: {} -> {}", url, repo_path.display());
- let output = Command::new("git")
- .args(["clone", "--bare", url])
- .arg(&repo_path)
- .output()
- .await?;
+ if !is_git_repo {
+ // Directory exists but is not a git repository - remove and re-clone
+ tracing::warn!(
+ "Directory {} exists but is not a git repository, removing and re-cloning",
+ repo_path.display()
+ );
+ tokio::fs::remove_dir_all(&repo_path).await?;
- if !output.status.success() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- return Err(WorktreeError::CloneFailed(stderr.to_string()));
+ // Fall through to clone below
+ } else {
+ // Fetch latest changes
+ tracing::info!("Fetching updates for existing repo: {}", repo_name);
+ let output = Command::new("git")
+ .args(["fetch", "--all", "--prune"])
+ .current_dir(&repo_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ tracing::warn!("Git fetch warning: {}", stderr);
+ // Don't fail on fetch errors, repo might still be usable
+ }
+
+ return Ok(repo_path);
}
}
+ // Clone the repository
+ tracing::info!("Cloning repository: {} -> {}", url, repo_path.display());
+ let output = Command::new("git")
+ .args(["clone", "--bare", url])
+ .arg(&repo_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::CloneFailed(stderr.to_string()));
+ }
+
Ok(repo_path)
}