summaryrefslogtreecommitdiff
path: root/makima/src/daemon
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-11 05:52:14 +0000
committersoryu <soryu@soryu.co>2026-01-15 00:21:16 +0000
commit87044a747b47bd83249d61a45842c7f7b2eae56d (patch)
treeef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/src/daemon
parent077820c4167c168072d217a1b01df840463a12a8 (diff)
downloadsoryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz
soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip
Contract system
Diffstat (limited to 'makima/src/daemon')
-rw-r--r--makima/src/daemon/api/client.rs129
-rw-r--r--makima/src/daemon/api/contract.rs161
-rw-r--r--makima/src/daemon/api/mod.rs7
-rw-r--r--makima/src/daemon/api/supervisor.rs186
-rw-r--r--makima/src/daemon/cli/contract.rs87
-rw-r--r--makima/src/daemon/cli/daemon.rs36
-rw-r--r--makima/src/daemon/cli/mod.rs120
-rw-r--r--makima/src/daemon/cli/server.rs43
-rw-r--r--makima/src/daemon/cli/supervisor.rs146
-rw-r--r--makima/src/daemon/config.rs555
-rw-r--r--makima/src/daemon/db/local.rs391
-rw-r--r--makima/src/daemon/db/mod.rs5
-rw-r--r--makima/src/daemon/error.rs75
-rw-r--r--makima/src/daemon/mod.rs22
-rw-r--r--makima/src/daemon/process/claude.rs509
-rw-r--r--makima/src/daemon/process/claude_protocol.rs59
-rw-r--r--makima/src/daemon/process/mod.rs10
-rw-r--r--makima/src/daemon/task/manager.rs3215
-rw-r--r--makima/src/daemon/task/mod.rs7
-rw-r--r--makima/src/daemon/task/state.rs161
-rw-r--r--makima/src/daemon/temp.rs224
-rw-r--r--makima/src/daemon/worktree/manager.rs1623
-rw-r--r--makima/src/daemon/worktree/mod.rs11
-rw-r--r--makima/src/daemon/ws/client.rs290
-rw-r--r--makima/src/daemon/ws/mod.rs7
-rw-r--r--makima/src/daemon/ws/protocol.rs658
26 files changed, 8737 insertions, 0 deletions
diff --git a/makima/src/daemon/api/client.rs b/makima/src/daemon/api/client.rs
new file mode 100644
index 0000000..b27d606
--- /dev/null
+++ b/makima/src/daemon/api/client.rs
@@ -0,0 +1,129 @@
+//! Base HTTP client for makima API.
+
+use reqwest::Client;
+use serde::{de::DeserializeOwned, Serialize};
+use thiserror::Error;
+
+/// API client errors.
+#[derive(Error, Debug)]
+pub enum ApiError {
+ #[error("HTTP request failed: {0}")]
+ Request(#[from] reqwest::Error),
+
+ #[error("API error (HTTP {status}): {message}")]
+ Api { status: u16, message: String },
+
+ #[error("Failed to parse response: {0}")]
+ Parse(String),
+}
+
+/// HTTP client for makima API.
+pub struct ApiClient {
+ client: Client,
+ base_url: String,
+ api_key: String,
+}
+
+impl ApiClient {
+ /// Create a new API client.
+ pub fn new(base_url: String, api_key: String) -> Result<Self, ApiError> {
+ let client = Client::builder()
+ .build()?;
+
+ Ok(Self {
+ client,
+ base_url: base_url.trim_end_matches('/').to_string(),
+ api_key,
+ })
+ }
+
+ /// Make a GET request.
+ pub async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, ApiError> {
+ let url = format!("{}{}", self.base_url, path);
+ let response = self.client
+ .get(&url)
+ .header("X-Makima-Tool-Key", &self.api_key)
+ .send()
+ .await?;
+
+ self.handle_response(response).await
+ }
+
+ /// Make a POST request with JSON body.
+ pub async fn post<T: DeserializeOwned, B: Serialize>(
+ &self,
+ path: &str,
+ body: &B,
+ ) -> Result<T, ApiError> {
+ let url = format!("{}{}", self.base_url, path);
+ let response = self.client
+ .post(&url)
+ .header("X-Makima-Tool-Key", &self.api_key)
+ .header("Content-Type", "application/json")
+ .json(body)
+ .send()
+ .await?;
+
+ self.handle_response(response).await
+ }
+
+ /// Make a POST request without body.
+ pub async fn post_empty<T: DeserializeOwned>(&self, path: &str) -> Result<T, ApiError> {
+ let url = format!("{}{}", self.base_url, path);
+ let response = self.client
+ .post(&url)
+ .header("X-Makima-Tool-Key", &self.api_key)
+ .send()
+ .await?;
+
+ self.handle_response(response).await
+ }
+
+ /// Make a PUT request with JSON body.
+ pub async fn put<T: DeserializeOwned, B: Serialize>(
+ &self,
+ path: &str,
+ body: &B,
+ ) -> Result<T, ApiError> {
+ let url = format!("{}{}", self.base_url, path);
+ let response = self.client
+ .put(&url)
+ .header("X-Makima-Tool-Key", &self.api_key)
+ .header("Content-Type", "application/json")
+ .json(body)
+ .send()
+ .await?;
+
+ self.handle_response(response).await
+ }
+
+ /// Handle API response.
+ async fn handle_response<T: DeserializeOwned>(
+ &self,
+ response: reqwest::Response,
+ ) -> Result<T, ApiError> {
+ let status = response.status();
+ let status_code = status.as_u16();
+
+ if !status.is_success() {
+ let body = response.text().await.unwrap_or_default();
+ return Err(ApiError::Api {
+ status: status_code,
+ message: body,
+ });
+ }
+
+ let body = response.text().await?;
+
+ // Handle empty responses
+ if body.is_empty() || body == "null" {
+ // Try to parse empty/null as the target type
+ serde_json::from_str::<T>("null")
+ .or_else(|_| serde_json::from_str::<T>("{}"))
+ .map_err(|e| ApiError::Parse(e.to_string()))
+ } else {
+ serde_json::from_str::<T>(&body)
+ .map_err(|e| ApiError::Parse(format!("{}: {}", e, body)))
+ }
+ }
+}
diff --git a/makima/src/daemon/api/contract.rs b/makima/src/daemon/api/contract.rs
new file mode 100644
index 0000000..aac6b94
--- /dev/null
+++ b/makima/src/daemon/api/contract.rs
@@ -0,0 +1,161 @@
+//! Contract API methods.
+
+use serde::Serialize;
+use uuid::Uuid;
+
+use super::client::{ApiClient, ApiError};
+use super::supervisor::JsonValue;
+
+// Request types
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ReportRequest {
+ pub message: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub task_id: Option<Uuid>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CompletionActionRequest {
+ pub lines_added: i32,
+ pub lines_removed: i32,
+ pub has_code_changes: bool,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub task_id: Option<Uuid>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub files_modified: Option<Vec<String>>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UpdateFileRequest {
+ pub content: String,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateFileRequest {
+ pub name: String,
+ pub content: String,
+}
+
+impl ApiClient {
+ /// Get contract status.
+ pub async fn contract_status(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.get(&format!("/api/v1/contracts/{}/daemon/status", contract_id))
+ .await
+ }
+
+ /// Get phase checklist.
+ pub async fn contract_checklist(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.get(&format!("/api/v1/contracts/{}/daemon/checklist", contract_id))
+ .await
+ }
+
+ /// Get contract goals.
+ pub async fn contract_goals(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.get(&format!("/api/v1/contracts/{}/daemon/goals", contract_id))
+ .await
+ }
+
+ /// List contract files.
+ pub async fn contract_files(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.get(&format!("/api/v1/contracts/{}/daemon/files", contract_id))
+ .await
+ }
+
+ /// Get a specific file.
+ pub async fn contract_file(
+ &self,
+ contract_id: Uuid,
+ file_id: Uuid,
+ ) -> Result<JsonValue, ApiError> {
+ self.get(&format!(
+ "/api/v1/contracts/{}/daemon/files/{}",
+ contract_id, file_id
+ ))
+ .await
+ }
+
+ /// Report progress.
+ pub async fn contract_report(
+ &self,
+ contract_id: Uuid,
+ message: &str,
+ task_id: Option<Uuid>,
+ ) -> Result<JsonValue, ApiError> {
+ let req = ReportRequest {
+ message: message.to_string(),
+ task_id,
+ };
+ self.post(&format!("/api/v1/contracts/{}/daemon/report", contract_id), &req)
+ .await
+ }
+
+ /// Get suggested action.
+ pub async fn contract_suggest_action(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.post_empty(&format!(
+ "/api/v1/contracts/{}/daemon/suggest-action",
+ contract_id
+ ))
+ .await
+ }
+
+ /// Get completion action recommendation.
+ pub async fn contract_completion_action(
+ &self,
+ contract_id: Uuid,
+ task_id: Option<Uuid>,
+ files_modified: Option<Vec<String>>,
+ lines_added: i32,
+ lines_removed: i32,
+ has_code_changes: bool,
+ ) -> Result<JsonValue, ApiError> {
+ let req = CompletionActionRequest {
+ task_id,
+ files_modified,
+ lines_added,
+ lines_removed,
+ has_code_changes,
+ };
+ self.post(
+ &format!("/api/v1/contracts/{}/daemon/completion-action", contract_id),
+ &req,
+ )
+ .await
+ }
+
+ /// Update a file.
+ pub async fn contract_update_file(
+ &self,
+ contract_id: Uuid,
+ file_id: Uuid,
+ content: &str,
+ ) -> Result<JsonValue, ApiError> {
+ let req = UpdateFileRequest {
+ content: content.to_string(),
+ };
+ self.put(
+ &format!("/api/v1/contracts/{}/daemon/files/{}", contract_id, file_id),
+ &req,
+ )
+ .await
+ }
+
+ /// Create a new file.
+ pub async fn contract_create_file(
+ &self,
+ contract_id: Uuid,
+ name: &str,
+ content: &str,
+ ) -> Result<JsonValue, ApiError> {
+ let req = CreateFileRequest {
+ name: name.to_string(),
+ content: content.to_string(),
+ };
+ self.post(&format!("/api/v1/contracts/{}/daemon/files", contract_id), &req)
+ .await
+ }
+}
diff --git a/makima/src/daemon/api/mod.rs b/makima/src/daemon/api/mod.rs
new file mode 100644
index 0000000..0c05fb4
--- /dev/null
+++ b/makima/src/daemon/api/mod.rs
@@ -0,0 +1,7 @@
+//! HTTP API client for makima CLI commands.
+
+pub mod client;
+pub mod contract;
+pub mod supervisor;
+
+pub use client::ApiClient;
diff --git a/makima/src/daemon/api/supervisor.rs b/makima/src/daemon/api/supervisor.rs
new file mode 100644
index 0000000..b691cc4
--- /dev/null
+++ b/makima/src/daemon/api/supervisor.rs
@@ -0,0 +1,186 @@
+//! Supervisor API methods.
+
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+use super::client::{ApiClient, ApiError};
+
+// Request/Response types
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct SpawnTaskRequest {
+ pub name: String,
+ pub plan: String,
+ pub contract_id: Uuid,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub parent_task_id: Option<Uuid>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub checkpoint_sha: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WaitRequest {
+ pub timeout_seconds: i32,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ReadFileRequest {
+ pub file_path: String,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateBranchRequest {
+ pub branch_name: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub from_ref: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct MergeRequest {
+ pub squash: bool,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub target_branch: Option<String>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CreatePrRequest {
+ pub task_id: Uuid,
+ pub title: String,
+ pub body: String,
+ pub base_branch: String,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CheckpointRequest {
+ pub message: String,
+}
+
+// Generic response type for JSON output
+#[derive(Deserialize, Serialize)]
+pub struct JsonValue(pub serde_json::Value);
+
+impl ApiClient {
+ /// Get all tasks in a contract.
+ pub async fn supervisor_tasks(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.get(&format!("/api/v1/mesh/supervisor/contracts/{}/tasks", contract_id))
+ .await
+ }
+
+ /// Get task tree structure.
+ pub async fn supervisor_tree(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.get(&format!("/api/v1/mesh/supervisor/contracts/{}/tree", contract_id))
+ .await
+ }
+
+ /// Spawn a new task.
+ pub async fn supervisor_spawn(&self, req: SpawnTaskRequest) -> Result<JsonValue, ApiError> {
+ self.post("/api/v1/mesh/supervisor/tasks", &req).await
+ }
+
+ /// Wait for a task to complete.
+ pub async fn supervisor_wait(
+ &self,
+ task_id: Uuid,
+ timeout_seconds: i32,
+ ) -> Result<JsonValue, ApiError> {
+ let req = WaitRequest { timeout_seconds };
+ self.post(&format!("/api/v1/mesh/supervisor/tasks/{}/wait", task_id), &req)
+ .await
+ }
+
+ /// Read a file from a task's worktree.
+ pub async fn supervisor_read_file(
+ &self,
+ task_id: Uuid,
+ file_path: &str,
+ ) -> Result<JsonValue, ApiError> {
+ let req = ReadFileRequest {
+ file_path: file_path.to_string(),
+ };
+ self.post(&format!("/api/v1/mesh/supervisor/tasks/{}/read-file", task_id), &req)
+ .await
+ }
+
+ /// Create a new branch.
+ pub async fn supervisor_branch(
+ &self,
+ branch_name: &str,
+ from_ref: Option<String>,
+ ) -> Result<JsonValue, ApiError> {
+ let req = CreateBranchRequest {
+ branch_name: branch_name.to_string(),
+ from_ref,
+ };
+ self.post("/api/v1/mesh/supervisor/branches", &req).await
+ }
+
+ /// Merge a task's changes.
+ pub async fn supervisor_merge(
+ &self,
+ task_id: Uuid,
+ target_branch: Option<String>,
+ squash: bool,
+ ) -> Result<JsonValue, ApiError> {
+ let req = MergeRequest {
+ squash,
+ target_branch,
+ };
+ self.post(&format!("/api/v1/mesh/supervisor/tasks/{}/merge", task_id), &req)
+ .await
+ }
+
+ /// Create a pull request.
+ pub async fn supervisor_pr(
+ &self,
+ task_id: Uuid,
+ title: &str,
+ body: &str,
+ base_branch: &str,
+ ) -> Result<JsonValue, ApiError> {
+ let req = CreatePrRequest {
+ task_id,
+ title: title.to_string(),
+ body: body.to_string(),
+ base_branch: base_branch.to_string(),
+ };
+ self.post("/api/v1/mesh/supervisor/pr", &req).await
+ }
+
+ /// Get task diff.
+ pub async fn supervisor_diff(&self, task_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.get(&format!("/api/v1/mesh/supervisor/tasks/{}/diff", task_id))
+ .await
+ }
+
+ /// Create a checkpoint.
+ pub async fn supervisor_checkpoint(
+ &self,
+ task_id: Uuid,
+ message: &str,
+ ) -> Result<JsonValue, ApiError> {
+ let req = CheckpointRequest {
+ message: message.to_string(),
+ };
+ self.post(&format!("/api/v1/mesh/tasks/{}/checkpoint", task_id), &req)
+ .await
+ }
+
+ /// List checkpoints.
+ pub async fn supervisor_checkpoints(&self, task_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.get(&format!("/api/v1/mesh/tasks/{}/checkpoints", task_id))
+ .await
+ }
+
+ /// Get contract status.
+ pub async fn supervisor_status(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> {
+ self.get(&format!("/api/v1/contracts/{}/daemon/status", contract_id))
+ .await
+ }
+}
diff --git a/makima/src/daemon/cli/contract.rs b/makima/src/daemon/cli/contract.rs
new file mode 100644
index 0000000..5fef5ec
--- /dev/null
+++ b/makima/src/daemon/cli/contract.rs
@@ -0,0 +1,87 @@
+//! Contract subcommand - task-contract interaction commands.
+
+use clap::Args;
+use uuid::Uuid;
+
+/// Common arguments for contract commands.
+#[derive(Args, Debug, Clone)]
+pub struct ContractArgs {
+ /// API URL
+ #[arg(long, env = "MAKIMA_API_URL", default_value = "http://localhost:8080", global = true)]
+ pub api_url: String,
+
+ /// API key for authentication
+ #[arg(long, env = "MAKIMA_API_KEY", global = true)]
+ pub api_key: String,
+
+ /// Current task ID (optional)
+ #[arg(long, env = "MAKIMA_TASK_ID", global = true)]
+ pub task_id: Option<Uuid>,
+
+ /// Contract ID
+ #[arg(long, env = "MAKIMA_CONTRACT_ID", global = true)]
+ pub contract_id: Uuid,
+}
+
+/// Arguments for file command (get specific file).
+#[derive(Args, Debug)]
+pub struct FileArgs {
+ #[command(flatten)]
+ pub common: ContractArgs,
+
+ /// File ID to retrieve
+ pub file_id: Uuid,
+}
+
+/// Arguments for report command.
+#[derive(Args, Debug)]
+pub struct ReportArgs {
+ #[command(flatten)]
+ pub common: ContractArgs,
+
+ /// Progress message
+ pub message: String,
+}
+
+/// Arguments for completion-action command.
+#[derive(Args, Debug)]
+pub struct CompletionActionArgs {
+ #[command(flatten)]
+ pub common: ContractArgs,
+
+ /// Comma-separated list of modified files
+ #[arg(long)]
+ pub files: Option<String>,
+
+ /// Number of lines added
+ #[arg(long, default_value = "0")]
+ pub lines_added: i32,
+
+ /// Number of lines removed
+ #[arg(long, default_value = "0")]
+ pub lines_removed: i32,
+
+ /// Whether there are code changes
+ #[arg(long)]
+ pub code: bool,
+}
+
+/// Arguments for update-file command.
+#[derive(Args, Debug)]
+pub struct UpdateFileArgs {
+ #[command(flatten)]
+ pub common: ContractArgs,
+
+ /// File ID to update
+ pub file_id: Uuid,
+}
+
+/// Arguments for create-file command.
+#[derive(Args, Debug)]
+pub struct CreateFileArgs {
+ #[command(flatten)]
+ pub common: ContractArgs,
+
+ /// Name of the new file
+ pub name: String,
+}
diff --git a/makima/src/daemon/cli/daemon.rs b/makima/src/daemon/cli/daemon.rs
new file mode 100644
index 0000000..de4cff4
--- /dev/null
+++ b/makima/src/daemon/cli/daemon.rs
@@ -0,0 +1,36 @@
+//! Daemon subcommand - connect to server and manage tasks.
+
+use clap::Args;
+use std::path::PathBuf;
+
+/// Run the makima daemon (connect to server and manage tasks).
+#[derive(Args, Debug)]
+pub struct DaemonArgs {
+ /// Path to custom config file
+ #[arg(short, long)]
+ pub config: Option<PathBuf>,
+
+ /// Directory where repositories are cloned
+ #[arg(long, env = "MAKIMA_DAEMON_REPOS_DIR")]
+ pub repos_dir: Option<PathBuf>,
+
+ /// Directory where worktrees are created
+ #[arg(long, env = "MAKIMA_DAEMON_WORKTREES_DIR")]
+ pub worktrees_dir: Option<PathBuf>,
+
+ /// WebSocket server URL to connect to
+ #[arg(long, env = "MAKIMA_DAEMON_SERVER_URL")]
+ pub server_url: Option<String>,
+
+ /// API key for server authentication
+ #[arg(long, env = "MAKIMA_DAEMON_SERVER_APIKEY")]
+ pub api_key: Option<String>,
+
+ /// Maximum number of concurrent tasks
+ #[arg(long)]
+ pub max_tasks: Option<u32>,
+
+ /// Log level (trace, debug, info, warn, error)
+ #[arg(short, long, default_value = "info")]
+ pub log_level: String,
+}
diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs
new file mode 100644
index 0000000..24c19c6
--- /dev/null
+++ b/makima/src/daemon/cli/mod.rs
@@ -0,0 +1,120 @@
+//! Command-line interface for the makima CLI.
+
+pub mod contract;
+pub mod daemon;
+pub mod server;
+pub mod supervisor;
+
+use clap::{Parser, Subcommand};
+
+pub use contract::ContractArgs;
+pub use daemon::DaemonArgs;
+pub use server::ServerArgs;
+pub use supervisor::SupervisorArgs;
+
+/// Makima - unified CLI for server, daemon, and task management.
+#[derive(Parser, Debug)]
+#[command(name = "makima")]
+#[command(version, about = "Makima CLI - server, daemon, and task management", long_about = None)]
+pub struct Cli {
+ #[command(subcommand)]
+ pub command: Commands,
+}
+
+#[derive(Subcommand, Debug)]
+pub enum Commands {
+ /// Run the makima server
+ Server(ServerArgs),
+
+ /// Run the daemon (connect to server, manage tasks)
+ Daemon(DaemonArgs),
+
+ /// Supervisor commands for contract orchestration
+ #[command(subcommand)]
+ Supervisor(SupervisorCommand),
+
+ /// Contract commands for task-contract interaction
+ #[command(subcommand)]
+ Contract(ContractCommand),
+}
+
+/// Supervisor subcommands for contract orchestration.
+#[derive(Subcommand, Debug)]
+pub enum SupervisorCommand {
+ /// List all tasks in the contract
+ Tasks(SupervisorArgs),
+
+ /// Get the task tree structure
+ Tree(SupervisorArgs),
+
+ /// Create and start a new task
+ Spawn(supervisor::SpawnArgs),
+
+ /// Wait for a task to complete
+ Wait(supervisor::WaitArgs),
+
+ /// Read a file from a task's worktree
+ ReadFile(supervisor::ReadFileArgs),
+
+ /// Create a git branch
+ Branch(supervisor::BranchArgs),
+
+ /// Merge a task's changes to a branch
+ Merge(supervisor::MergeArgs),
+
+ /// Create a pull request
+ Pr(supervisor::PrArgs),
+
+ /// View task diff
+ Diff(supervisor::DiffArgs),
+
+ /// Create a checkpoint
+ Checkpoint(supervisor::CheckpointArgs),
+
+ /// List checkpoints
+ Checkpoints(SupervisorArgs),
+
+ /// Get contract status
+ Status(SupervisorArgs),
+}
+
+/// Contract subcommands for task-contract interaction.
+#[derive(Subcommand, Debug)]
+pub enum ContractCommand {
+ /// Get contract status
+ Status(ContractArgs),
+
+ /// Get the phase checklist
+ Checklist(ContractArgs),
+
+ /// Get contract goals
+ Goals(ContractArgs),
+
+ /// List contract files
+ Files(ContractArgs),
+
+ /// Get a specific file's content
+ File(contract::FileArgs),
+
+ /// Report progress on the contract
+ Report(contract::ReportArgs),
+
+ /// Get suggested next action
+ SuggestAction(ContractArgs),
+
+ /// Get completion recommendation
+ CompletionAction(contract::CompletionActionArgs),
+
+ /// Update a file (reads content from stdin)
+ UpdateFile(contract::UpdateFileArgs),
+
+ /// Create a new file (reads content from stdin)
+ CreateFile(contract::CreateFileArgs),
+}
+
+impl Cli {
+ /// Parse command-line arguments
+ pub fn parse_args() -> Self {
+ Self::parse()
+ }
+}
diff --git a/makima/src/daemon/cli/server.rs b/makima/src/daemon/cli/server.rs
new file mode 100644
index 0000000..371a912
--- /dev/null
+++ b/makima/src/daemon/cli/server.rs
@@ -0,0 +1,43 @@
+//! Server subcommand - run the makima server.
+
+use clap::Args;
+
+/// Run the makima server.
+#[derive(Args, Debug)]
+pub struct ServerArgs {
+ /// Server port
+ #[arg(long, env = "PORT", default_value = "8080")]
+ pub port: u16,
+
+ /// Path to parakeet model directory
+ #[arg(
+ long,
+ env = "PARAKEET_MODEL_DIR",
+ default_value = "models/parakeet-tdt-0.6b-v3"
+ )]
+ pub parakeet_model_dir: String,
+
+ /// Path to parakeet EOU model directory
+ #[arg(
+ long,
+ env = "PARAKEET_EOU_DIR",
+ default_value = "models/realtime_eou_120m-v1-onnx"
+ )]
+ pub parakeet_eou_dir: String,
+
+ /// Path to sortformer model
+ #[arg(
+ long,
+ env = "SORTFORMER_MODEL_PATH",
+ default_value = "models/diarization/diar_streaming_sortformer_4spk-v2.1.onnx"
+ )]
+ pub sortformer_model_path: String,
+
+ /// PostgreSQL connection URI
+ #[arg(long, env = "POSTGRES_CONNECTION_URI")]
+ pub database_url: Option<String>,
+
+ /// Log level (trace, debug, info, warn, error)
+ #[arg(short, long, default_value = "info")]
+ pub log_level: String,
+}
diff --git a/makima/src/daemon/cli/supervisor.rs b/makima/src/daemon/cli/supervisor.rs
new file mode 100644
index 0000000..00c7ff4
--- /dev/null
+++ b/makima/src/daemon/cli/supervisor.rs
@@ -0,0 +1,146 @@
+//! Supervisor subcommand - contract orchestration commands.
+
+use clap::Args;
+use uuid::Uuid;
+
+/// Common arguments for supervisor commands.
+#[derive(Args, Debug, Clone)]
+pub struct SupervisorArgs {
+ /// API URL
+ #[arg(long, env = "MAKIMA_API_URL", default_value = "http://localhost:8080", global = true)]
+ pub api_url: String,
+
+ /// API key for authentication
+ #[arg(long, env = "MAKIMA_API_KEY", global = true)]
+ pub api_key: String,
+
+ /// Current task ID (optional)
+ #[arg(long, env = "MAKIMA_TASK_ID", global = true)]
+ pub task_id: Option<Uuid>,
+
+ /// Contract ID
+ #[arg(long, env = "MAKIMA_CONTRACT_ID", global = true)]
+ pub contract_id: Uuid,
+}
+
+/// Arguments for spawn command.
+#[derive(Args, Debug)]
+pub struct SpawnArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Name of the task
+ pub name: String,
+
+ /// Plan/description for the task
+ pub plan: String,
+
+ /// Parent task ID to branch from
+ #[arg(long)]
+ pub parent: Option<Uuid>,
+
+ /// Checkpoint SHA to start from
+ #[arg(long)]
+ pub checkpoint: Option<String>,
+}
+
+/// Arguments for wait command.
+#[derive(Args, Debug)]
+pub struct WaitArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Task ID to wait for
+ pub task_id: Uuid,
+
+ /// Timeout in seconds
+ #[arg(default_value = "300")]
+ pub timeout: i32,
+}
+
+/// Arguments for read-file command.
+#[derive(Args, Debug)]
+pub struct ReadFileArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Task ID to read from
+ pub task_id: Uuid,
+
+ /// File path to read
+ pub file_path: String,
+}
+
+/// Arguments for branch command.
+#[derive(Args, Debug)]
+pub struct BranchArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Branch name to create
+ pub name: String,
+
+ /// Reference (task ID or SHA) to branch from
+ #[arg(long)]
+ pub from: Option<String>,
+}
+
+/// Arguments for merge command.
+#[derive(Args, Debug)]
+pub struct MergeArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Task ID to merge
+ pub task_id: Uuid,
+
+ /// Target branch to merge into
+ #[arg(long)]
+ pub to: Option<String>,
+
+ /// Squash commits on merge
+ #[arg(long)]
+ pub squash: bool,
+}
+
+/// Arguments for pr command.
+#[derive(Args, Debug)]
+pub struct PrArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Task ID to create PR for
+ pub task_id: Uuid,
+
+ /// PR title
+ #[arg(long)]
+ pub title: String,
+
+ /// PR body/description
+ #[arg(long)]
+ pub body: Option<String>,
+
+ /// Base branch (default: main)
+ #[arg(long, default_value = "main")]
+ pub base: String,
+}
+
+/// Arguments for diff command.
+#[derive(Args, Debug)]
+pub struct DiffArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Task ID to get diff for
+ pub task_id: Uuid,
+}
+
+/// Arguments for checkpoint command.
+#[derive(Args, Debug)]
+pub struct CheckpointArgs {
+ #[command(flatten)]
+ pub common: SupervisorArgs,
+
+ /// Checkpoint message
+ pub message: String,
+}
diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs
new file mode 100644
index 0000000..866ee70
--- /dev/null
+++ b/makima/src/daemon/config.rs
@@ -0,0 +1,555 @@
+//! Configuration management for the makima daemon.
+
+use config::{Config, Environment, File};
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::path::PathBuf;
+
+/// Root daemon configuration.
+#[derive(Debug, Clone, Deserialize)]
+pub struct DaemonConfig {
+ /// Server connection settings.
+ #[serde(default)]
+ pub server: ServerConfig,
+
+ /// Worktree settings.
+ #[serde(default)]
+ pub worktree: WorktreeConfig,
+
+ /// Process settings.
+ #[serde(default)]
+ pub process: ProcessConfig,
+
+ /// Local database settings.
+ #[serde(default)]
+ pub local_db: LocalDbConfig,
+
+ /// Logging settings.
+ #[serde(default)]
+ pub logging: LoggingConfig,
+
+ /// Repositories to auto-clone on startup.
+ #[serde(default)]
+ pub repos: ReposConfig,
+}
+
+/// Server connection configuration.
+#[derive(Debug, Clone, Deserialize)]
+#[serde(default)]
+pub struct ServerConfig {
+ /// WebSocket URL of makima server (e.g., ws://localhost:8080 or wss://makima.example.com).
+ /// Defaults to wss://api.makima.jp.
+ #[serde(default = "default_server_url")]
+ pub url: String,
+
+ /// API key for authentication.
+ #[serde(default, alias = "apikey")]
+ pub api_key: String,
+
+ /// Heartbeat interval in seconds.
+ #[serde(default = "default_heartbeat_interval", alias = "heartbeatintervalsecs")]
+ pub heartbeat_interval_secs: u64,
+
+ /// Reconnect interval in seconds after connection loss.
+ #[serde(default = "default_reconnect_interval", alias = "reconnectintervalsecs")]
+ pub reconnect_interval_secs: u64,
+
+ /// Maximum reconnect attempts before giving up (0 = infinite).
+ #[serde(default, alias = "maxreconnectattempts")]
+ pub max_reconnect_attempts: u32,
+}
+
+fn default_heartbeat_interval() -> u64 {
+ 30
+}
+
+fn default_reconnect_interval() -> u64 {
+ 5
+}
+
+fn default_server_url() -> String {
+ "wss://api.makima.jp".to_string()
+}
+
+impl Default for ServerConfig {
+ fn default() -> Self {
+ Self {
+ url: default_server_url(),
+ api_key: String::new(),
+ heartbeat_interval_secs: default_heartbeat_interval(),
+ reconnect_interval_secs: default_reconnect_interval(),
+ max_reconnect_attempts: 0,
+ }
+ }
+}
+
+/// Worktree configuration for task isolation.
+#[derive(Debug, Clone, Deserialize)]
+#[serde(default)]
+pub struct WorktreeConfig {
+ /// Base directory for worktrees (~/.makima/worktrees).
+ #[serde(default = "default_worktree_base_dir", alias = "basedir")]
+ pub base_dir: PathBuf,
+
+ /// Base directory for cloned repositories (~/.makima/repos).
+ #[serde(default = "default_repos_base_dir", alias = "reposdir")]
+ pub repos_dir: PathBuf,
+
+ /// Branch prefix for task branches.
+ #[serde(default = "default_branch_prefix", alias = "branchprefix")]
+ pub branch_prefix: String,
+
+ /// Clean up worktrees on daemon start.
+ #[serde(default, alias = "cleanuponstart")]
+ pub cleanup_on_start: bool,
+
+ /// Default target repository path for pushing completed branches.
+ /// Used when task.target_repo_path is not set.
+ #[serde(default, alias = "defaulttargetrepo")]
+ pub default_target_repo: Option<PathBuf>,
+}
+
+fn default_worktree_base_dir() -> PathBuf {
+ dirs::home_dir()
+ .unwrap_or_else(|| PathBuf::from("."))
+ .join(".makima")
+ .join("worktrees")
+}
+
+fn default_repos_base_dir() -> PathBuf {
+ dirs::home_dir()
+ .unwrap_or_else(|| PathBuf::from("."))
+ .join(".makima")
+ .join("repos")
+}
+
+fn default_branch_prefix() -> String {
+ "makima/task-".to_string()
+}
+
+impl Default for WorktreeConfig {
+ fn default() -> Self {
+ Self {
+ base_dir: default_worktree_base_dir(),
+ repos_dir: default_repos_base_dir(),
+ branch_prefix: default_branch_prefix(),
+ cleanup_on_start: false,
+ default_target_repo: None,
+ }
+ }
+}
+
+/// Process configuration for Claude Code subprocess execution.
+#[derive(Debug, Clone, Deserialize)]
+#[serde(default)]
+pub struct ProcessConfig {
+ /// Path or command for Claude Code CLI.
+ #[serde(default = "default_claude_command", alias = "claudecommand")]
+ pub claude_command: String,
+
+ /// Additional arguments to pass to Claude Code.
+ /// These are added after the default arguments.
+ #[serde(default, alias = "claudeargs")]
+ pub claude_args: Vec<String>,
+
+ /// Arguments to pass before the default arguments.
+ /// Useful for overriding defaults.
+ #[serde(default, alias = "claudepreargs")]
+ pub claude_pre_args: Vec<String>,
+
+ /// Skip the --dangerously-skip-permissions flag (default: false).
+ /// Set to true if you want to use Claude's permission system.
+ #[serde(default, alias = "enablepermissions")]
+ pub enable_permissions: bool,
+
+ /// Skip the --verbose flag (default: false).
+ #[serde(default, alias = "disableverbose")]
+ pub disable_verbose: bool,
+
+ /// Maximum concurrent tasks.
+ #[serde(default = "default_max_tasks", alias = "maxconcurrenttasks")]
+ pub max_concurrent_tasks: u32,
+
+ /// Default timeout for tasks in seconds (0 = no timeout).
+ #[serde(default, alias = "defaulttimeoutsecs")]
+ pub default_timeout_secs: u64,
+
+ /// Additional environment variables to pass to Claude Code.
+ #[serde(default, alias = "envvars")]
+ pub env_vars: HashMap<String, String>,
+}
+
+fn default_claude_command() -> String {
+ "claude".to_string()
+}
+
+fn default_max_tasks() -> u32 {
+ 4
+}
+
+impl Default for ProcessConfig {
+ fn default() -> Self {
+ Self {
+ claude_command: default_claude_command(),
+ claude_args: Vec::new(),
+ claude_pre_args: Vec::new(),
+ enable_permissions: false,
+ disable_verbose: false,
+ max_concurrent_tasks: default_max_tasks(),
+ default_timeout_secs: 0,
+ env_vars: HashMap::new(),
+ }
+ }
+}
+
+/// Local database configuration.
+#[derive(Debug, Clone, Deserialize)]
+#[serde(default)]
+pub struct LocalDbConfig {
+ /// Path to local SQLite database.
+ #[serde(default = "default_db_path")]
+ pub path: PathBuf,
+}
+
+impl Default for LocalDbConfig {
+ fn default() -> Self {
+ Self {
+ path: default_db_path(),
+ }
+ }
+}
+
+fn default_db_path() -> PathBuf {
+ dirs::home_dir()
+ .unwrap_or_else(|| PathBuf::from("."))
+ .join(".makima")
+ .join("daemon.db")
+}
+
+/// Logging configuration.
+#[derive(Debug, Clone, Deserialize, Default)]
+pub struct LoggingConfig {
+ /// Log level: "trace", "debug", "info", "warn", "error".
+ #[serde(default = "default_log_level")]
+ pub level: String,
+
+ /// Log format: "pretty" or "json".
+ #[serde(default = "default_log_format")]
+ pub format: String,
+}
+
+fn default_log_level() -> String {
+ "info".to_string()
+}
+
+fn default_log_format() -> String {
+ "pretty".to_string()
+}
+
+/// Repository auto-clone configuration.
+#[derive(Debug, Clone, Deserialize, Default)]
+pub struct ReposConfig {
+ /// Directory to clone repositories into (default: ~/.makima/home).
+ #[serde(default = "default_home_dir")]
+ pub home_dir: PathBuf,
+
+ /// List of repositories to auto-clone on startup.
+ /// Each entry can be a URL (e.g., "https://github.com/user/repo.git")
+ /// or a shorthand (e.g., "github:user/repo").
+ #[serde(default, alias = "autoclone")]
+ pub auto_clone: Vec<RepoEntry>,
+}
+
+/// A repository entry for auto-cloning.
+#[derive(Debug, Clone, Deserialize)]
+#[serde(untagged)]
+pub enum RepoEntry {
+ /// Simple URL string.
+ Url(String),
+ /// Detailed configuration.
+ Config {
+ /// Repository URL.
+ url: String,
+ /// Custom directory name (defaults to repo name from URL).
+ #[serde(default)]
+ name: Option<String>,
+ /// Branch to checkout after cloning (defaults to default branch).
+ #[serde(default)]
+ branch: Option<String>,
+ /// Whether to do a shallow clone (default: false).
+ #[serde(default)]
+ shallow: bool,
+ },
+}
+
+impl RepoEntry {
+ /// Get the URL for this repo entry.
+ pub fn url(&self) -> &str {
+ match self {
+ RepoEntry::Url(url) => url,
+ RepoEntry::Config { url, .. } => url,
+ }
+ }
+
+ /// Get the custom name, if any.
+ pub fn name(&self) -> Option<&str> {
+ match self {
+ RepoEntry::Url(_) => None,
+ RepoEntry::Config { name, .. } => name.as_deref(),
+ }
+ }
+
+ /// Get the branch to checkout, if any.
+ pub fn branch(&self) -> Option<&str> {
+ match self {
+ RepoEntry::Url(_) => None,
+ RepoEntry::Config { branch, .. } => branch.as_deref(),
+ }
+ }
+
+ /// Whether to do a shallow clone.
+ pub fn shallow(&self) -> bool {
+ match self {
+ RepoEntry::Url(_) => false,
+ RepoEntry::Config { shallow, .. } => *shallow,
+ }
+ }
+
+ /// Get the directory name to use (either custom name or derived from URL).
+ pub fn dir_name(&self) -> Option<String> {
+ if let Some(name) = self.name() {
+ return Some(name.to_string());
+ }
+
+ // Derive from URL
+ let url = self.url();
+
+ // Handle shorthand formats
+ let url = if url.starts_with("github:") {
+ url.strip_prefix("github:").unwrap_or(url)
+ } else if url.starts_with("gitlab:") {
+ url.strip_prefix("gitlab:").unwrap_or(url)
+ } else {
+ url
+ };
+
+ // Extract repo name from URL
+ url.trim_end_matches('/')
+ .trim_end_matches(".git")
+ .rsplit('/')
+ .next()
+ .map(|s| s.to_string())
+ }
+
+ /// Expand the URL (e.g., convert shorthand to full URL).
+ pub fn expanded_url(&self) -> String {
+ let url = self.url();
+
+ if url.starts_with("github:") {
+ format!("https://github.com/{}.git", url.strip_prefix("github:").unwrap_or(""))
+ } else if url.starts_with("gitlab:") {
+ format!("https://gitlab.com/{}.git", url.strip_prefix("gitlab:").unwrap_or(""))
+ } else {
+ url.to_string()
+ }
+ }
+}
+
+fn default_home_dir() -> PathBuf {
+ dirs::home_dir()
+ .unwrap_or_else(|| PathBuf::from("."))
+ .join(".makima")
+ .join("home")
+}
+
+impl DaemonConfig {
+ /// Load configuration from files and environment variables.
+ ///
+ /// Configuration sources (in order of precedence):
+ /// 1. Environment variables (MAKIMA_API_KEY, MAKIMA_DAEMON_SERVER_URL, etc.)
+ /// 2. ./makima-daemon.toml (current directory)
+ /// 3. ~/.config/makima-daemon/config.toml
+ /// 4. /etc/makima-daemon/config.toml (Linux only)
+ ///
+ /// Environment variable examples:
+ /// - MAKIMA_API_KEY=your-api-key (preferred)
+ /// - MAKIMA_DAEMON_SERVER_URL=ws://localhost:8080
+ /// - MAKIMA_DAEMON_PROCESS_MAXCONCURRENTTASKS=4
+ pub fn load() -> Result<Self, config::ConfigError> {
+ Self::load_from_path(None)
+ }
+
+ /// Load configuration from a specific path plus standard sources.
+ fn load_from_path(config_path: Option<&std::path::Path>) -> Result<Self, config::ConfigError> {
+ let mut builder = Config::builder();
+
+ // System-wide config (Linux only)
+ #[cfg(target_os = "linux")]
+ {
+ builder = builder.add_source(
+ File::with_name("/etc/makima-daemon/config").required(false),
+ );
+ }
+
+ // User config
+ if let Some(config_dir) = dirs::config_dir() {
+ let user_config = config_dir.join("makima-daemon").join("config");
+ builder = builder.add_source(
+ File::with_name(user_config.to_str().unwrap_or("")).required(false),
+ );
+ }
+
+ // Local config
+ builder = builder.add_source(File::with_name("makima-daemon").required(false));
+
+ // Custom config file (if provided)
+ if let Some(path) = config_path {
+ builder = builder.add_source(
+ File::with_name(path.to_str().unwrap_or("")).required(true),
+ );
+ }
+
+ // Environment variables with underscore separator for nesting
+ // e.g., MAKIMA_DAEMON_SERVER_URL -> server.url
+ // MAKIMA_DAEMON_SERVER_APIKEY -> server.api_key
+ builder = builder.add_source(
+ Environment::with_prefix("MAKIMA_DAEMON")
+ .separator("_")
+ .try_parsing(true),
+ );
+
+ let config = builder.build()?;
+ let mut config: DaemonConfig = config.try_deserialize()?;
+
+ // Check for MAKIMA_API_KEY environment variable (preferred over MAKIMA_DAEMON_SERVER_APIKEY)
+ if let Ok(api_key) = std::env::var("MAKIMA_API_KEY") {
+ config.server.api_key = api_key;
+ }
+
+ // Validate required fields (don't validate here - let load_with_cli do final validation)
+ Ok(config)
+ }
+
+ /// Validate that required configuration fields are set.
+ pub fn validate(&self) -> Result<(), config::ConfigError> {
+ if self.server.api_key.is_empty() {
+ return Err(config::ConfigError::Message(
+ "API key is required. Set via MAKIMA_API_KEY, config file, or --api-key".to_string()
+ ));
+ }
+ Ok(())
+ }
+
+ /// Load configuration with CLI argument overrides.
+ ///
+ /// Configuration sources (in order of precedence, highest first):
+ /// 1. CLI arguments
+ /// 2. Environment variables
+ /// 3. Custom config file (if --config specified)
+ /// 4. ./makima-daemon.toml (current directory)
+ /// 5. ~/.config/makima-daemon/config.toml
+ /// 6. /etc/makima-daemon/config.toml (Linux only)
+ /// 7. Default values
+ pub fn load_with_cli(cli: &super::cli::daemon::DaemonArgs) -> Result<Self, config::ConfigError> {
+ Self::load_with_daemon_args(cli)
+ }
+
+ /// Load configuration from various sources with daemon CLI overrides.
+ pub fn load_with_daemon_args(args: &super::cli::daemon::DaemonArgs) -> Result<Self, config::ConfigError> {
+ // Load base config (with optional custom config file)
+ let mut config = Self::load_from_path(args.config.as_deref())?;
+
+ // Apply CLI overrides (highest priority)
+ if let Some(ref repos_dir) = args.repos_dir {
+ config.worktree.repos_dir = repos_dir.clone();
+ }
+ if let Some(ref worktrees_dir) = args.worktrees_dir {
+ config.worktree.base_dir = worktrees_dir.clone();
+ }
+ if let Some(ref server_url) = args.server_url {
+ config.server.url = server_url.clone();
+ }
+ if let Some(ref api_key) = args.api_key {
+ config.server.api_key = api_key.clone();
+ }
+ if let Some(max_tasks) = args.max_tasks {
+ config.process.max_concurrent_tasks = max_tasks;
+ }
+ // Log level is always set (has default)
+ config.logging.level = args.log_level.clone();
+
+ // Validate required fields after all sources are merged
+ config.validate()?;
+
+ Ok(config)
+ }
+
+ /// Create a minimal config for testing.
+ #[cfg(test)]
+ pub fn test_config() -> Self {
+ Self {
+ server: ServerConfig {
+ url: "ws://localhost:8080".to_string(),
+ api_key: "test-key".to_string(),
+ heartbeat_interval_secs: 30,
+ reconnect_interval_secs: 5,
+ max_reconnect_attempts: 0,
+ },
+ worktree: WorktreeConfig {
+ base_dir: PathBuf::from("/tmp/makima-daemon-test/worktrees"),
+ repos_dir: PathBuf::from("/tmp/makima-daemon-test/repos"),
+ branch_prefix: "makima/task-".to_string(),
+ cleanup_on_start: true,
+ default_target_repo: None,
+ },
+ process: ProcessConfig {
+ claude_command: "claude".to_string(),
+ claude_args: Vec::new(),
+ claude_pre_args: Vec::new(),
+ enable_permissions: false,
+ disable_verbose: false,
+ max_concurrent_tasks: 2,
+ default_timeout_secs: 0,
+ env_vars: HashMap::new(),
+ },
+ local_db: LocalDbConfig {
+ path: PathBuf::from("/tmp/makima-daemon-test/state.db"),
+ },
+ logging: LoggingConfig::default(),
+ repos: ReposConfig::default(),
+ }
+ }
+}
+
+/// Helper module for dirs crate (minimal subset).
+mod dirs {
+ use std::path::PathBuf;
+
+ pub fn home_dir() -> Option<PathBuf> {
+ std::env::var("HOME").ok().map(PathBuf::from)
+ }
+
+ pub fn config_dir() -> Option<PathBuf> {
+ #[cfg(target_os = "macos")]
+ {
+ std::env::var("HOME")
+ .ok()
+ .map(|h| PathBuf::from(h).join("Library").join("Application Support"))
+ }
+ #[cfg(target_os = "linux")]
+ {
+ std::env::var("XDG_CONFIG_HOME")
+ .ok()
+ .map(PathBuf::from)
+ .or_else(|| std::env::var("HOME").ok().map(|h| PathBuf::from(h).join(".config")))
+ }
+ #[cfg(target_os = "windows")]
+ {
+ std::env::var("APPDATA").ok().map(PathBuf::from)
+ }
+ #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
+ {
+ None
+ }
+ }
+}
diff --git a/makima/src/daemon/db/local.rs b/makima/src/daemon/db/local.rs
new file mode 100644
index 0000000..f3ed45a
--- /dev/null
+++ b/makima/src/daemon/db/local.rs
@@ -0,0 +1,391 @@
+//! Local SQLite database for crash recovery and state persistence.
+
+use std::path::Path;
+
+use chrono::{DateTime, Utc};
+use rusqlite::{params, Connection, Result as SqliteResult};
+use uuid::Uuid;
+
+use crate::daemon::task::TaskState;
+
+/// Local task record for persistence.
+#[derive(Debug, Clone)]
+pub struct LocalTask {
+ pub id: Uuid,
+ pub server_task_id: Uuid,
+ pub state: TaskState,
+ pub container_id: Option<String>,
+ pub overlay_path: Option<String>,
+ pub repo_url: Option<String>,
+ pub base_branch: Option<String>,
+ pub plan: String,
+ pub created_at: DateTime<Utc>,
+ pub started_at: Option<DateTime<Utc>>,
+ pub completed_at: Option<DateTime<Utc>>,
+ pub error_message: Option<String>,
+}
+
+/// Buffered output for reliable delivery.
+#[derive(Debug, Clone)]
+pub struct BufferedOutput {
+ pub id: i64,
+ pub task_id: Uuid,
+ pub output: String,
+ pub is_partial: bool,
+ pub timestamp: DateTime<Utc>,
+}
+
+/// Local database for daemon state persistence.
+pub struct LocalDb {
+ conn: Connection,
+}
+
+impl LocalDb {
+ /// Open or create the local database.
+ pub fn open(path: &Path) -> SqliteResult<Self> {
+ // Create parent directory if needed
+ if let Some(parent) = path.parent() {
+ std::fs::create_dir_all(parent).ok();
+ }
+
+ let conn = Connection::open(path)?;
+
+ // Initialize schema
+ conn.execute_batch(Self::schema())?;
+
+ Ok(Self { conn })
+ }
+
+ /// Open an in-memory database (for testing).
+ #[cfg(test)]
+ pub fn open_memory() -> SqliteResult<Self> {
+ let conn = Connection::open_in_memory()?;
+ conn.execute_batch(Self::schema())?;
+ Ok(Self { conn })
+ }
+
+ /// Database schema.
+ fn schema() -> &'static str {
+ r#"
+ -- Local task state for crash recovery
+ CREATE TABLE IF NOT EXISTS tasks (
+ id TEXT PRIMARY KEY,
+ server_task_id TEXT NOT NULL,
+ state TEXT NOT NULL,
+ container_id TEXT,
+ overlay_path TEXT,
+ repo_url TEXT,
+ base_branch TEXT,
+ plan TEXT NOT NULL,
+ created_at TEXT NOT NULL,
+ started_at TEXT,
+ completed_at TEXT,
+ error_message TEXT
+ );
+
+ -- Buffered output for reliable delivery
+ CREATE TABLE IF NOT EXISTS output_buffer (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ task_id TEXT NOT NULL,
+ output TEXT NOT NULL,
+ is_partial INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ sent INTEGER NOT NULL DEFAULT 0
+ );
+
+ -- Daemon state key-value store
+ CREATE TABLE IF NOT EXISTS daemon_state (
+ key TEXT PRIMARY KEY,
+ value TEXT NOT NULL,
+ updated_at TEXT NOT NULL
+ );
+
+ -- Indexes
+ CREATE INDEX IF NOT EXISTS idx_tasks_state ON tasks(state);
+ CREATE INDEX IF NOT EXISTS idx_output_buffer_sent ON output_buffer(sent, id);
+ CREATE INDEX IF NOT EXISTS idx_output_buffer_task ON output_buffer(task_id);
+ "#
+ }
+
+ /// Save a task.
+ pub fn save_task(&self, task: &LocalTask) -> SqliteResult<()> {
+ self.conn.execute(
+ r#"
+ INSERT OR REPLACE INTO tasks
+ (id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message)
+ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
+ "#,
+ params![
+ task.id.to_string(),
+ task.server_task_id.to_string(),
+ task.state.as_str(),
+ task.container_id,
+ task.overlay_path,
+ task.repo_url,
+ task.base_branch,
+ task.plan,
+ task.created_at.to_rfc3339(),
+ task.started_at.map(|t| t.to_rfc3339()),
+ task.completed_at.map(|t| t.to_rfc3339()),
+ task.error_message,
+ ],
+ )?;
+ Ok(())
+ }
+
+ /// Get a task by ID.
+ pub fn get_task(&self, id: Uuid) -> SqliteResult<Option<LocalTask>> {
+ let mut stmt = self.conn.prepare(
+ "SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message FROM tasks WHERE id = ?1",
+ )?;
+
+ let mut rows = stmt.query(params![id.to_string()])?;
+
+ if let Some(row) = rows.next()? {
+ Ok(Some(Self::task_from_row(row)?))
+ } else {
+ Ok(None)
+ }
+ }
+
+ /// Get all running/active tasks (for recovery).
+ pub fn get_active_tasks(&self) -> SqliteResult<Vec<LocalTask>> {
+ let mut stmt = self.conn.prepare(
+ r#"
+ SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message
+ FROM tasks
+ WHERE state IN ('initializing', 'starting', 'running', 'paused', 'blocked')
+ "#,
+ )?;
+
+ let rows = stmt.query_map([], |row| Self::task_from_row(row))?;
+
+ rows.collect()
+ }
+
+ /// Delete a task.
+ pub fn delete_task(&self, id: Uuid) -> SqliteResult<()> {
+ self.conn.execute(
+ "DELETE FROM tasks WHERE id = ?1",
+ params![id.to_string()],
+ )?;
+ Ok(())
+ }
+
+ /// Update task state.
+ pub fn update_task_state(&self, id: Uuid, state: TaskState) -> SqliteResult<()> {
+ self.conn.execute(
+ "UPDATE tasks SET state = ?2 WHERE id = ?1",
+ params![id.to_string(), state.as_str()],
+ )?;
+ Ok(())
+ }
+
+ /// Buffer output for reliable delivery.
+ pub fn buffer_output(&self, task_id: Uuid, output: &str, is_partial: bool) -> SqliteResult<i64> {
+ self.conn.execute(
+ r#"
+ INSERT INTO output_buffer (task_id, output, is_partial, timestamp, sent)
+ VALUES (?1, ?2, ?3, datetime('now'), 0)
+ "#,
+ params![task_id.to_string(), output, is_partial as i32],
+ )?;
+ Ok(self.conn.last_insert_rowid())
+ }
+
+ /// Get unsent outputs.
+ pub fn get_unsent_outputs(&self, limit: i64) -> SqliteResult<Vec<BufferedOutput>> {
+ let mut stmt = self.conn.prepare(
+ r#"
+ SELECT id, task_id, output, is_partial, timestamp
+ FROM output_buffer
+ WHERE sent = 0
+ ORDER BY id
+ LIMIT ?1
+ "#,
+ )?;
+
+ let rows = stmt.query_map(params![limit], |row| {
+ let id: i64 = row.get(0)?;
+ let task_id_str: String = row.get(1)?;
+ let task_id = Uuid::parse_str(&task_id_str).unwrap_or_default();
+ let output: String = row.get(2)?;
+ let is_partial: i32 = row.get(3)?;
+ let timestamp_str: String = row.get(4)?;
+ let timestamp = DateTime::parse_from_rfc3339(&timestamp_str)
+ .map(|dt| dt.with_timezone(&Utc))
+ .unwrap_or_else(|_| Utc::now());
+
+ Ok(BufferedOutput {
+ id,
+ task_id,
+ output,
+ is_partial: is_partial != 0,
+ timestamp,
+ })
+ })?;
+
+ rows.collect()
+ }
+
+ /// Mark outputs as sent.
+ pub fn mark_outputs_sent(&self, ids: &[i64]) -> SqliteResult<()> {
+ if ids.is_empty() {
+ return Ok(());
+ }
+
+ let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
+ let sql = format!(
+ "UPDATE output_buffer SET sent = 1 WHERE id IN ({})",
+ placeholders.join(",")
+ );
+
+ let params: Vec<rusqlite::types::Value> = ids
+ .iter()
+ .map(|id| rusqlite::types::Value::Integer(*id))
+ .collect();
+
+ self.conn.execute(&sql, rusqlite::params_from_iter(params))?;
+ Ok(())
+ }
+
+ /// Clean up old sent outputs.
+ pub fn cleanup_sent_outputs(&self, older_than_hours: i64) -> SqliteResult<usize> {
+ let result = self.conn.execute(
+ r#"
+ DELETE FROM output_buffer
+ WHERE sent = 1 AND timestamp < datetime('now', ?1 || ' hours')
+ "#,
+ params![format!("-{}", older_than_hours)],
+ )?;
+ Ok(result)
+ }
+
+ /// Get daemon state value.
+ pub fn get_state(&self, key: &str) -> SqliteResult<Option<String>> {
+ let mut stmt = self.conn.prepare(
+ "SELECT value FROM daemon_state WHERE key = ?1",
+ )?;
+
+ let mut rows = stmt.query(params![key])?;
+
+ if let Some(row) = rows.next()? {
+ let value: String = row.get(0)?;
+ Ok(Some(value))
+ } else {
+ Ok(None)
+ }
+ }
+
+ /// Set daemon state value.
+ pub fn set_state(&self, key: &str, value: &str) -> SqliteResult<()> {
+ self.conn.execute(
+ r#"
+ INSERT OR REPLACE INTO daemon_state (key, value, updated_at)
+ VALUES (?1, ?2, datetime('now'))
+ "#,
+ params![key, value],
+ )?;
+ Ok(())
+ }
+
+ /// Parse a task from a database row.
+ fn task_from_row(row: &rusqlite::Row) -> SqliteResult<LocalTask> {
+ let id_str: String = row.get(0)?;
+ let server_task_id_str: String = row.get(1)?;
+ let state_str: String = row.get(2)?;
+ let container_id: Option<String> = row.get(3)?;
+ let overlay_path: Option<String> = row.get(4)?;
+ let repo_url: Option<String> = row.get(5)?;
+ let base_branch: Option<String> = row.get(6)?;
+ let plan: String = row.get(7)?;
+ let created_at_str: String = row.get(8)?;
+ let started_at_str: Option<String> = row.get(9)?;
+ let completed_at_str: Option<String> = row.get(10)?;
+ let error_message: Option<String> = row.get(11)?;
+
+ let id = Uuid::parse_str(&id_str).unwrap_or_default();
+ let server_task_id = Uuid::parse_str(&server_task_id_str).unwrap_or_default();
+ let state = TaskState::from_str(&state_str).unwrap_or_default();
+ let created_at = DateTime::parse_from_rfc3339(&created_at_str)
+ .map(|dt| dt.with_timezone(&Utc))
+ .unwrap_or_else(|_| Utc::now());
+ let started_at = started_at_str
+ .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
+ .map(|dt| dt.with_timezone(&Utc));
+ let completed_at = completed_at_str
+ .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
+ .map(|dt| dt.with_timezone(&Utc));
+
+ Ok(LocalTask {
+ id,
+ server_task_id,
+ state,
+ container_id,
+ overlay_path,
+ repo_url,
+ base_branch,
+ plan,
+ created_at,
+ started_at,
+ completed_at,
+ error_message,
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::daemon::*;
+
+ #[test]
+ fn test_open_memory() {
+ let db = LocalDb::open_memory().unwrap();
+ assert!(db.get_active_tasks().unwrap().is_empty());
+ }
+
+ #[test]
+ fn test_save_and_get_task() {
+ let db = LocalDb::open_memory().unwrap();
+
+ let task = LocalTask {
+ id: Uuid::new_v4(),
+ server_task_id: Uuid::new_v4(),
+ state: TaskState::Running,
+ container_id: Some("abc123".to_string()),
+ overlay_path: Some("/tmp/overlay".to_string()),
+ repo_url: Some("https://github.com/test/repo".to_string()),
+ base_branch: Some("main".to_string()),
+ plan: "Build the feature".to_string(),
+ created_at: Utc::now(),
+ started_at: Some(Utc::now()),
+ completed_at: None,
+ error_message: None,
+ };
+
+ db.save_task(&task).unwrap();
+
+ let loaded = db.get_task(task.id).unwrap().unwrap();
+ assert_eq!(loaded.id, task.id);
+ assert_eq!(loaded.state, TaskState::Running);
+ assert_eq!(loaded.plan, "Build the feature");
+ }
+
+ #[test]
+ fn test_output_buffer() {
+ let db = LocalDb::open_memory().unwrap();
+ let task_id = Uuid::new_v4();
+
+ db.buffer_output(task_id, "line 1", false).unwrap();
+ db.buffer_output(task_id, "line 2", false).unwrap();
+
+ let unsent = db.get_unsent_outputs(10).unwrap();
+ assert_eq!(unsent.len(), 2);
+
+ let ids: Vec<i64> = unsent.iter().map(|o| o.id).collect();
+ db.mark_outputs_sent(&ids).unwrap();
+
+ let unsent = db.get_unsent_outputs(10).unwrap();
+ assert!(unsent.is_empty());
+ }
+}
diff --git a/makima/src/daemon/db/mod.rs b/makima/src/daemon/db/mod.rs
new file mode 100644
index 0000000..2c6e0f3
--- /dev/null
+++ b/makima/src/daemon/db/mod.rs
@@ -0,0 +1,5 @@
+//! Local database for daemon state persistence.
+
+pub mod local;
+
+pub use local::{BufferedOutput, LocalDb, LocalTask};
diff --git a/makima/src/daemon/error.rs b/makima/src/daemon/error.rs
new file mode 100644
index 0000000..b993169
--- /dev/null
+++ b/makima/src/daemon/error.rs
@@ -0,0 +1,75 @@
+//! Error types for the makima daemon.
+
+use thiserror::Error;
+use uuid::Uuid;
+
+/// Top-level daemon error type.
+#[derive(Error, Debug)]
+pub enum DaemonError {
+ #[error("WebSocket error: {0}")]
+ WebSocket(#[from] tokio_tungstenite::tungstenite::Error),
+
+ #[error("Worktree error: {0}")]
+ Worktree(#[from] crate::daemon::worktree::WorktreeError),
+
+ #[error("Process error: {0}")]
+ Process(#[from] crate::daemon::process::ClaudeProcessError),
+
+ #[error("Task error: {0}")]
+ Task(#[from] TaskError),
+
+ #[error("Configuration error: {0}")]
+ Config(#[from] config::ConfigError),
+
+ #[error("Database error: {0}")]
+ Database(#[from] rusqlite::Error),
+
+ #[error("IO error: {0}")]
+ Io(#[from] std::io::Error),
+
+ #[error("JSON error: {0}")]
+ Json(#[from] serde_json::Error),
+
+ #[error("Authentication failed: {0}")]
+ AuthFailed(String),
+
+ #[error("Connection lost")]
+ ConnectionLost,
+
+ #[error("Server error: {code} - {message}")]
+ ServerError { code: String, message: String },
+}
+
+/// Task management errors.
+#[derive(Error, Debug)]
+pub enum TaskError {
+ #[error("Task not found: {0}")]
+ NotFound(Uuid),
+
+ #[error("Invalid state transition from {from} to {to}")]
+ InvalidStateTransition { from: String, to: String },
+
+ #[error("Concurrency limit reached")]
+ ConcurrencyLimit,
+
+ #[error("Task already exists: {0}")]
+ AlreadyExists(Uuid),
+
+ #[error("Task not running: {0}")]
+ NotRunning(Uuid),
+
+ #[error("Failed to send message to task: {0}")]
+ MessageFailed(String),
+
+ #[error("Task setup failed: {0}")]
+ SetupFailed(String),
+
+ #[error("Task execution failed: {0}")]
+ ExecutionFailed(String),
+}
+
+/// Result type alias for daemon operations.
+pub type Result<T> = std::result::Result<T, DaemonError>;
+
+/// Result type alias for task operations.
+pub type TaskResult<T> = std::result::Result<T, TaskError>;
diff --git a/makima/src/daemon/mod.rs b/makima/src/daemon/mod.rs
new file mode 100644
index 0000000..d7ec3f0
--- /dev/null
+++ b/makima/src/daemon/mod.rs
@@ -0,0 +1,22 @@
+//! Makima CLI - Unified CLI for server, daemon, and task management.
+//!
+//! This crate provides:
+//! - `makima server` - Run the makima server
+//! - `makima daemon` - Run the daemon (connect to server, manage tasks)
+//! - `makima supervisor` - Contract orchestration commands
+//! - `makima contract` - Task-contract interaction commands
+
+pub mod api;
+pub mod cli;
+pub mod config;
+pub mod db;
+pub mod error;
+pub mod process;
+pub mod task;
+pub mod temp;
+pub mod worktree;
+pub mod ws;
+
+pub use cli::{Cli, Commands, ContractCommand, SupervisorCommand};
+pub use config::DaemonConfig;
+pub use error::{DaemonError, Result};
diff --git a/makima/src/daemon/process/claude.rs b/makima/src/daemon/process/claude.rs
new file mode 100644
index 0000000..93b097c
--- /dev/null
+++ b/makima/src/daemon/process/claude.rs
@@ -0,0 +1,509 @@
+//! Claude Code process management.
+
+use std::collections::HashMap;
+use std::path::Path;
+use std::process::Stdio;
+use std::sync::Arc;
+
+use futures::Stream;
+use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
+use tokio::process::{Child, ChildStdin, Command};
+use tokio::sync::{mpsc, Mutex};
+
+use super::claude_protocol::ClaudeInputMessage;
+
+/// Errors that can occur during Claude process management.
+#[derive(Debug, thiserror::Error)]
+pub enum ClaudeProcessError {
+ #[error("Failed to spawn Claude process: {0}")]
+ SpawnFailed(#[from] std::io::Error),
+
+ #[error("Claude command not found: {0}")]
+ CommandNotFound(String),
+
+ #[error("Process already exited")]
+ AlreadyExited,
+
+ #[error("Failed to read output: {0}")]
+ OutputRead(String),
+}
+
+/// A line of output from Claude Code.
+#[derive(Debug, Clone)]
+pub struct OutputLine {
+ /// The raw content of the line.
+ pub content: String,
+ /// Whether this is from stdout (true) or stderr (false).
+ pub is_stdout: bool,
+ /// Parsed JSON type if available (e.g., "system", "assistant", "result").
+ pub json_type: Option<String>,
+}
+
+impl OutputLine {
+ /// Create a new stdout output line.
+ pub fn stdout(content: String) -> Self {
+ let json_type = extract_json_type(&content);
+ Self {
+ content,
+ is_stdout: true,
+ json_type,
+ }
+ }
+
+ /// Create a new stderr output line.
+ pub fn stderr(content: String) -> Self {
+ Self {
+ content,
+ is_stdout: false,
+ json_type: None,
+ }
+ }
+}
+
+/// Extract the "type" field from a JSON line if present.
+fn extract_json_type(line: &str) -> Option<String> {
+ // Quick check for JSON
+ if !line.starts_with('{') {
+ return None;
+ }
+
+ // Try to parse and extract type
+ if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) {
+ json.get("type")
+ .and_then(|v| v.as_str())
+ .map(|s| s.to_string())
+ } else {
+ None
+ }
+}
+
+/// Handle to a running Claude Code process.
+pub struct ClaudeProcess {
+ /// The child process.
+ child: Child,
+ /// Receiver for output lines.
+ output_rx: mpsc::Receiver<OutputLine>,
+ /// Stdin handle for sending input to the process (thread-safe).
+ stdin: Arc<Mutex<Option<ChildStdin>>>,
+}
+
+impl ClaudeProcess {
+ /// Wait for the process to exit and return the exit code.
+ pub async fn wait(&mut self) -> Result<i64, ClaudeProcessError> {
+ let status = self.child.wait().await?;
+ Ok(status.code().unwrap_or(-1) as i64)
+ }
+
+ /// Check if the process has exited.
+ pub fn try_wait(&mut self) -> Result<Option<i64>, ClaudeProcessError> {
+ match self.child.try_wait()? {
+ Some(status) => Ok(Some(status.code().unwrap_or(-1) as i64)),
+ None => Ok(None),
+ }
+ }
+
+ /// Kill the process.
+ pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> {
+ self.child.kill().await?;
+ Ok(())
+ }
+
+ /// Get the next output line, if available.
+ pub async fn next_output(&mut self) -> Option<OutputLine> {
+ self.output_rx.recv().await
+ }
+
+ /// Send a raw message to the process via stdin.
+ ///
+ /// This can be used to provide input when Claude Code is waiting for user input.
+ pub async fn send_input(&self, message: &str) -> Result<(), ClaudeProcessError> {
+ let mut stdin_guard = self.stdin.lock().await;
+ if let Some(ref mut stdin) = *stdin_guard {
+ stdin
+ .write_all(message.as_bytes())
+ .await
+ .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?;
+ stdin
+ .write_all(b"\n")
+ .await
+ .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write newline: {}", e)))?;
+ stdin
+ .flush()
+ .await
+ .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?;
+ Ok(())
+ } else {
+ Err(ClaudeProcessError::OutputRead("Stdin not available".to_string()))
+ }
+ }
+
+ /// Send a user message to the process via stdin using JSON protocol.
+ ///
+ /// This is the preferred method when using `--input-format=stream-json`.
+ /// The message is serialized as JSON and sent as a single line.
+ pub async fn send_user_message(&self, content: &str) -> Result<(), ClaudeProcessError> {
+ let message = ClaudeInputMessage::user(content);
+ let json_line = message.to_json_line().map_err(|e| {
+ ClaudeProcessError::OutputRead(format!("Failed to serialize message: {}", e))
+ })?;
+
+ tracing::debug!(content_len = content.len(), "Sending user message to Claude process");
+
+ let mut stdin_guard = self.stdin.lock().await;
+ if let Some(ref mut stdin) = *stdin_guard {
+ stdin
+ .write_all(json_line.as_bytes())
+ .await
+ .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?;
+ stdin
+ .flush()
+ .await
+ .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?;
+ Ok(())
+ } else {
+ Err(ClaudeProcessError::OutputRead("Stdin not available".to_string()))
+ }
+ }
+
+ /// Get a clone of the stdin handle for external use.
+ pub fn stdin_handle(&self) -> Arc<Mutex<Option<ChildStdin>>> {
+ Arc::clone(&self.stdin)
+ }
+
+ /// Close stdin, signaling EOF to the process.
+ pub async fn close_stdin(&self) -> Result<(), ClaudeProcessError> {
+ let mut stdin_guard = self.stdin.lock().await;
+ if let Some(mut stdin) = stdin_guard.take() {
+ let _ = stdin.shutdown().await;
+ }
+ Ok(())
+ }
+
+ /// Convert to a stream of output lines.
+ pub fn into_stream(self) -> impl Stream<Item = OutputLine> {
+ futures::stream::unfold(self.output_rx, |mut rx| async move {
+ rx.recv().await.map(|line| (line, rx))
+ })
+ }
+}
+
+/// Manages Claude Code process spawning.
+pub struct ProcessManager {
+ /// Path to the claude command.
+ claude_command: String,
+ /// Additional arguments to pass to Claude Code (after defaults).
+ claude_args: Vec<String>,
+ /// Arguments to pass before defaults.
+ claude_pre_args: Vec<String>,
+ /// Whether to enable Claude's permission system (skip --dangerously-skip-permissions).
+ enable_permissions: bool,
+ /// Whether to disable verbose output.
+ disable_verbose: bool,
+ /// Default environment variables to pass.
+ default_env: HashMap<String, String>,
+}
+
+impl Default for ProcessManager {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl ProcessManager {
+ /// Create a new ProcessManager with default settings.
+ pub fn new() -> Self {
+ Self {
+ claude_command: "claude".to_string(),
+ claude_args: Vec::new(),
+ claude_pre_args: Vec::new(),
+ enable_permissions: false,
+ disable_verbose: false,
+ default_env: HashMap::new(),
+ }
+ }
+
+ /// Create a ProcessManager with a custom claude command path.
+ pub fn with_command(command: String) -> Self {
+ Self {
+ claude_command: command,
+ claude_args: Vec::new(),
+ claude_pre_args: Vec::new(),
+ enable_permissions: false,
+ disable_verbose: false,
+ default_env: HashMap::new(),
+ }
+ }
+
+ /// Set additional arguments to pass after default arguments.
+ pub fn with_args(mut self, args: Vec<String>) -> Self {
+ self.claude_args = args;
+ self
+ }
+
+ /// Set arguments to pass before default arguments.
+ pub fn with_pre_args(mut self, args: Vec<String>) -> Self {
+ self.claude_pre_args = args;
+ self
+ }
+
+ /// Enable Claude's permission system (don't pass --dangerously-skip-permissions).
+ pub fn with_permissions_enabled(mut self, enabled: bool) -> Self {
+ self.enable_permissions = enabled;
+ self
+ }
+
+ /// Disable verbose output.
+ pub fn with_verbose_disabled(mut self, disabled: bool) -> Self {
+ self.disable_verbose = disabled;
+ self
+ }
+
+ /// Add default environment variables.
+ pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
+ self.default_env = env;
+ self
+ }
+
+ /// Get the claude command path.
+ pub fn claude_command(&self) -> &str {
+ &self.claude_command
+ }
+
+ /// Spawn a Claude Code process to execute a plan.
+ ///
+ /// The process runs in the specified working directory with stream-json output format.
+ /// If `system_prompt` is provided, it will be passed via --system-prompt flag.
+ pub async fn spawn(
+ &self,
+ working_dir: &Path,
+ plan: &str,
+ extra_env: Option<HashMap<String, String>>,
+ ) -> Result<ClaudeProcess, ClaudeProcessError> {
+ self.spawn_with_system_prompt(working_dir, plan, extra_env, None).await
+ }
+
+ /// Spawn a Claude Code process with an optional system prompt.
+ ///
+ /// The process runs in the specified working directory with stream-json output format.
+ /// If `system_prompt` is provided, it will be passed via --system-prompt flag as
+ /// behavioral constraints that Claude will treat as system-level instructions.
+ pub async fn spawn_with_system_prompt(
+ &self,
+ working_dir: &Path,
+ plan: &str,
+ extra_env: Option<HashMap<String, String>>,
+ system_prompt: Option<&str>,
+ ) -> Result<ClaudeProcess, ClaudeProcessError> {
+ tracing::info!(
+ working_dir = %working_dir.display(),
+ plan_len = plan.len(),
+ plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan },
+ has_system_prompt = system_prompt.is_some(),
+ "Spawning Claude Code process"
+ );
+
+ // Verify working directory exists
+ if !working_dir.exists() {
+ tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!");
+ return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new(
+ std::io::ErrorKind::NotFound,
+ format!("Working directory does not exist: {}", working_dir.display()),
+ )));
+ }
+
+ // Build environment
+ let mut env = self.default_env.clone();
+ if let Some(extra) = extra_env {
+ env.extend(extra);
+ }
+
+ // Build arguments list
+ let mut args = Vec::new();
+
+ // Pre-args (before defaults)
+ args.extend(self.claude_pre_args.clone());
+
+ // Required arguments for stream-json protocol
+ args.push("--output-format=stream-json".to_string());
+ args.push("--input-format=stream-json".to_string());
+
+ // Optional default arguments
+ if !self.disable_verbose {
+ args.push("--verbose".to_string());
+ }
+ if !self.enable_permissions {
+ args.push("--dangerously-skip-permissions".to_string());
+ }
+
+ // System prompt - passed via --system-prompt flag for system-level constraints
+ if let Some(prompt) = system_prompt {
+ args.push("--system-prompt".to_string());
+ args.push(prompt.to_string());
+ }
+
+ // Additional user-configured arguments
+ args.extend(self.claude_args.clone());
+
+ tracing::debug!(args = ?args, "Claude command arguments");
+
+ // Spawn the process
+ let mut child = Command::new(&self.claude_command)
+ .args(&args)
+ .current_dir(working_dir)
+ .envs(env)
+ .stdin(Stdio::piped())
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .kill_on_drop(true)
+ .spawn()
+ .map_err(|e| {
+ if e.kind() == std::io::ErrorKind::NotFound {
+ ClaudeProcessError::CommandNotFound(self.claude_command.clone())
+ } else {
+ ClaudeProcessError::SpawnFailed(e)
+ }
+ })?;
+
+ // Create output channel
+ let (tx, rx) = mpsc::channel(1000);
+
+ // Take stdout, stderr, and stdin
+ // With --input-format=stream-json, we keep stdin open for sending messages
+ let stdin = child.stdin.take();
+ let stdin = Arc::new(Mutex::new(stdin));
+
+ let stdout = child.stdout.take().expect("stdout should be piped");
+ let stderr = child.stderr.take().expect("stderr should be piped");
+
+ // Spawn task to read stdout
+ let tx_stdout = tx.clone();
+ tokio::spawn(async move {
+ use tokio::io::AsyncReadExt;
+ let mut reader = BufReader::new(stdout);
+ let mut buffer = vec![0u8; 4096];
+ let mut line_buffer = String::new();
+
+ loop {
+ // Try to read with a timeout to detect if we're stuck
+ match tokio::time::timeout(
+ tokio::time::Duration::from_secs(5),
+ reader.read(&mut buffer)
+ ).await {
+ Ok(Ok(0)) => {
+ // EOF
+ tracing::debug!("Claude stdout EOF");
+ // Send any remaining content
+ if !line_buffer.is_empty() {
+ let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await;
+ }
+ break;
+ }
+ Ok(Ok(n)) => {
+ let chunk = String::from_utf8_lossy(&buffer[..n]);
+ tracing::debug!(bytes = n, chunk_preview = %if chunk.len() > 100 { &chunk[..100] } else { &chunk }, "Got stdout chunk from Claude");
+
+ // Accumulate into line buffer and emit complete lines
+ line_buffer.push_str(&chunk);
+ while let Some(newline_pos) = line_buffer.find('\n') {
+ let line = line_buffer[..newline_pos].to_string();
+ line_buffer = line_buffer[newline_pos + 1..].to_string();
+ if tx_stdout.send(OutputLine::stdout(line)).await.is_err() {
+ return;
+ }
+ }
+ }
+ Ok(Err(e)) => {
+ tracing::error!(error = %e, "Error reading Claude stdout");
+ break;
+ }
+ Err(_) => {
+ // Timeout - no data for 5 seconds
+ tracing::warn!("No stdout data from Claude for 5 seconds");
+ }
+ }
+ }
+ tracing::debug!("Claude stdout reader task ended");
+ });
+
+ // Spawn task to read stderr
+ let tx_stderr = tx;
+ tokio::spawn(async move {
+ let reader = BufReader::new(stderr);
+ let mut lines = reader.lines();
+ while let Ok(Some(line)) = lines.next_line().await {
+ tracing::debug!(line = %line, "Claude stderr");
+ if tx_stderr.send(OutputLine::stderr(line)).await.is_err() {
+ break;
+ }
+ }
+ tracing::debug!("Claude stderr reader task ended");
+ });
+
+ tracing::info!("Claude Code process spawned successfully");
+
+ let process = ClaudeProcess {
+ child,
+ output_rx: rx,
+ stdin,
+ };
+
+ // Send the initial plan as the first user message
+ tracing::info!(plan_len = plan.len(), "Sending initial plan to Claude via stdin");
+ process.send_user_message(plan).await?;
+
+ Ok(process)
+ }
+
+ /// Check if the claude command is available.
+ pub async fn check_claude_available(&self) -> Result<String, ClaudeProcessError> {
+ let output = Command::new(&self.claude_command)
+ .arg("--version")
+ .output()
+ .await
+ .map_err(|e| {
+ if e.kind() == std::io::ErrorKind::NotFound {
+ ClaudeProcessError::CommandNotFound(self.claude_command.clone())
+ } else {
+ ClaudeProcessError::SpawnFailed(e)
+ }
+ })?;
+
+ if output.status.success() {
+ Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
+ } else {
+ Err(ClaudeProcessError::CommandNotFound(
+ self.claude_command.clone(),
+ ))
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_extract_json_type() {
+ assert_eq!(
+ extract_json_type(r#"{"type":"system","subtype":"init"}"#),
+ Some("system".to_string())
+ );
+ assert_eq!(
+ extract_json_type(r#"{"type":"assistant","message":{}}"#),
+ Some("assistant".to_string())
+ );
+ assert_eq!(extract_json_type("not json"), None);
+ assert_eq!(extract_json_type(r#"{"no_type": true}"#), None);
+ }
+
+ #[test]
+ fn test_output_line_creation() {
+ let line = OutputLine::stdout(r#"{"type":"result"}"#.to_string());
+ assert!(line.is_stdout);
+ assert_eq!(line.json_type, Some("result".to_string()));
+
+ let line = OutputLine::stderr("error message".to_string());
+ assert!(!line.is_stdout);
+ assert_eq!(line.json_type, None);
+ }
+}
diff --git a/makima/src/daemon/process/claude_protocol.rs b/makima/src/daemon/process/claude_protocol.rs
new file mode 100644
index 0000000..96e5377
--- /dev/null
+++ b/makima/src/daemon/process/claude_protocol.rs
@@ -0,0 +1,59 @@
+//! Claude Code JSON protocol types for stdin communication.
+//!
+//! When using `--input-format=stream-json`, Claude Code expects
+//! newline-delimited JSON messages on stdin.
+
+use serde::Serialize;
+
+/// Message sent to Claude Code via stdin.
+///
+/// Format based on Claude Code's stream-json input protocol.
+#[derive(Debug, Clone, Serialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum ClaudeInputMessage {
+ /// A user message to send to Claude.
+ User { message: UserMessage },
+}
+
+/// The inner user message structure.
+#[derive(Debug, Clone, Serialize)]
+pub struct UserMessage {
+ /// Always "user" for user messages.
+ pub role: String,
+ /// The message content.
+ pub content: String,
+}
+
+impl ClaudeInputMessage {
+ /// Create a new user message.
+ pub fn user(content: impl Into<String>) -> Self {
+ Self::User {
+ message: UserMessage {
+ role: "user".to_string(),
+ content: content.into(),
+ },
+ }
+ }
+
+ /// Serialize to a JSON string with trailing newline (NDJSON format).
+ pub fn to_json_line(&self) -> Result<String, serde_json::Error> {
+ let mut json = serde_json::to_string(self)?;
+ json.push('\n');
+ Ok(json)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::daemon::*;
+
+ #[test]
+ fn test_user_message_serialization() {
+ let msg = ClaudeInputMessage::user("Hello, Claude!");
+ let json = msg.to_json_line().unwrap();
+
+ // Should produce: {"type":"user","message":{"role":"user","content":"Hello, Claude!"}}\n
+ assert!(json.starts_with(r#"{"type":"user","message":{"role":"user","content":"Hello, Claude!"}}"#));
+ assert!(json.ends_with('\n'));
+ }
+}
diff --git a/makima/src/daemon/process/mod.rs b/makima/src/daemon/process/mod.rs
new file mode 100644
index 0000000..814a3c5
--- /dev/null
+++ b/makima/src/daemon/process/mod.rs
@@ -0,0 +1,10 @@
+//! Process management for Claude Code subprocess execution.
+//!
+//! Spawns and manages Claude Code processes in worktree directories,
+//! streaming JSON output back to the daemon.
+
+mod claude;
+mod claude_protocol;
+
+pub use claude::{ClaudeProcess, ClaudeProcessError, OutputLine, ProcessManager};
+pub use claude_protocol::ClaudeInputMessage;
diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs
new file mode 100644
index 0000000..8269083
--- /dev/null
+++ b/makima/src/daemon/task/manager.rs
@@ -0,0 +1,3215 @@
+//! Task lifecycle manager using git worktrees and Claude Code subprocesses.
+
+use std::collections::HashMap;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Instant;
+
+use rand::Rng;
+use tokio::io::AsyncWriteExt;
+use tokio::sync::{mpsc, RwLock, Semaphore};
+use uuid::Uuid;
+
+use std::collections::HashSet;
+
+use super::state::TaskState;
+use crate::daemon::error::{DaemonError, TaskError, TaskResult};
+use crate::daemon::process::{ClaudeInputMessage, ProcessManager};
+use crate::daemon::temp::TempManager;
+use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager};
+use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage};
+
+/// Generate a secure random API key for orchestrator tool access.
+fn generate_tool_key() -> String {
+ let mut rng = rand::thread_rng();
+ let bytes: [u8; 32] = rng.r#gen();
+ hex::encode(bytes)
+}
+
+/// Check if output contains an OAuth authentication error.
+fn is_oauth_auth_error(output: &str) -> bool {
+ // Match various authentication error patterns from Claude Code
+ if output.contains("Please run /login") {
+ return true;
+ }
+ if output.contains("Invalid API key") {
+ return true;
+ }
+ if output.contains("authentication_error")
+ && (output.contains("OAuth token has expired")
+ || output.contains("Please obtain a new token"))
+ {
+ return true;
+ }
+ false
+}
+
+/// Extract OAuth URL from text (looks for claude.ai OAuth URLs).
+fn extract_url(text: &str) -> Option<String> {
+ // Look for claude.ai OAuth URLs - try multiple patterns
+ let patterns = [
+ "https://claude.ai/oauth",
+ "https://console.anthropic.com/oauth",
+ ];
+
+ for pattern in patterns {
+ if let Some(start) = text.find(pattern) {
+ let remaining = &text[start..];
+ // Find the end of the URL - stop at:
+ // - Whitespace, common URL terminators, escape sequences
+ let end = remaining
+ .find(|c: char| {
+ c.is_whitespace() || c == '"' || c == '\'' || c == '>' || c == ')' || c == ']' || c == '\x07' || c == '\x1b'
+ })
+ .unwrap_or(remaining.len());
+
+ let url = &remaining[..end];
+
+ // Also check if there's another https:// within the match (hyperlink duplication)
+ // Skip first 20 chars to avoid matching the start
+ let url = if url.len() > 30 {
+ if let Some(second_https) = url[20..].find("https://") {
+ &url[..second_https + 20] // Keep only first URL
+ } else {
+ url
+ }
+ } else {
+ url
+ };
+
+ if url.len() > 20 {
+ return Some(url.to_string());
+ }
+ }
+ }
+ None
+}
+
+/// Global storage for pending OAuth flow (only one can be active at a time per daemon)
+static PENDING_AUTH_FLOW: std::sync::OnceLock<std::sync::Mutex<Option<std::sync::mpsc::Sender<String>>>> = std::sync::OnceLock::new();
+
+fn get_auth_flow_storage() -> &'static std::sync::Mutex<Option<std::sync::mpsc::Sender<String>>> {
+ PENDING_AUTH_FLOW.get_or_init(|| std::sync::Mutex::new(None))
+}
+
+/// Send an auth code to the pending OAuth flow.
+pub fn send_auth_code(code: &str) -> bool {
+ let storage = get_auth_flow_storage();
+ if let Ok(mut guard) = storage.lock() {
+ if let Some(sender) = guard.take() {
+ if sender.send(code.to_string()).is_ok() {
+ tracing::info!("Auth code sent to setup-token process");
+ return true;
+ }
+ }
+ }
+ tracing::warn!("No pending auth flow to send code to");
+ false
+}
+
+/// Spawn `claude setup-token` to initiate OAuth flow and capture the login URL.
+/// This spawns the process in a PTY (required by Ink) and reads output until we find a URL.
+/// The process continues running in the background waiting for auth completion.
+async fn get_oauth_login_url(claude_command: &str) -> Option<String> {
+ use portable_pty::{native_pty_system, CommandBuilder, PtySize};
+ use std::io::{Read, Write};
+
+ tracing::info!("Spawning claude setup-token in PTY to get OAuth login URL");
+
+ // Create a PTY - Ink requires a real terminal
+ let pty_system = native_pty_system();
+ let pair = match pty_system.openpty(PtySize {
+ rows: 24,
+ cols: 200, // Wide enough to avoid line wrapping for long URLs/codes
+ pixel_width: 0,
+ pixel_height: 0,
+ }) {
+ Ok(pair) => pair,
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to open PTY");
+ return None;
+ }
+ };
+
+ // Build the command
+ let mut cmd = CommandBuilder::new(claude_command);
+ cmd.arg("setup-token");
+ // Set environment variables to prevent browser from opening and disable fancy output
+ // Use "false" so the browser command fails, forcing setup-token to show URL and wait for manual input
+ cmd.env("BROWSER", "false");
+ cmd.env("TERM", "dumb"); // Disable hyperlinks and fancy terminal features
+ cmd.env("NO_COLOR", "1"); // Disable colors
+
+ // Spawn the process in the PTY
+ let mut child = match pair.slave.spawn_command(cmd) {
+ Ok(child) => child,
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to spawn claude setup-token in PTY");
+ return None;
+ }
+ };
+
+ // Get the reader and writer from the master side
+ let mut reader = match pair.master.try_clone_reader() {
+ Ok(reader) => reader,
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to clone PTY reader");
+ return None;
+ }
+ };
+
+ let mut writer = match pair.master.take_writer() {
+ Ok(writer) => writer,
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to take PTY writer");
+ return None;
+ }
+ };
+
+ // Create channels for communication
+ let (code_tx, code_rx) = std::sync::mpsc::channel::<String>();
+ let (url_tx, url_rx) = std::sync::mpsc::channel::<String>();
+
+ // Store the code sender globally so it can be used when AUTH_CODE message arrives
+ {
+ let storage = get_auth_flow_storage();
+ if let Ok(mut guard) = storage.lock() {
+ *guard = Some(code_tx);
+ }
+ }
+
+ // Spawn reader thread - reads PTY output and sends URL when found
+ let reader_handle = std::thread::spawn(move || {
+ let mut buffer = [0u8; 4096];
+ let mut accumulated = String::new();
+ let mut url_sent = false;
+ let mut read_count = 0;
+
+ tracing::info!("setup-token reader thread started");
+
+ loop {
+ match reader.read(&mut buffer) {
+ Ok(0) => {
+ tracing::info!("setup-token PTY EOF reached after {} reads", read_count);
+ break;
+ }
+ Ok(n) => {
+ read_count += 1;
+ let chunk = String::from_utf8_lossy(&buffer[..n]);
+ accumulated.push_str(&chunk);
+
+ // Process complete lines
+ while let Some(newline_pos) = accumulated.find('\n') {
+ let line = accumulated[..newline_pos].to_string();
+ accumulated = accumulated[newline_pos + 1..].to_string();
+
+ let clean_line = strip_ansi_codes(&line);
+ if !clean_line.trim().is_empty() {
+ tracing::info!(line = %clean_line, "setup-token output");
+ }
+
+ // Look for OAuth URL if not found yet
+ if !url_sent {
+ if let Some(url) = extract_url(&line) {
+ tracing::info!(url = %url, "Found OAuth login URL");
+ let _ = url_tx.send(url);
+ url_sent = true;
+ }
+ }
+
+ // Check for success/failure messages
+ if clean_line.contains("successfully") || clean_line.contains("authenticated") || clean_line.contains("Success") {
+ tracing::info!("Authentication appears successful!");
+ }
+ if clean_line.contains("error") || clean_line.contains("failed") || clean_line.contains("invalid") {
+ tracing::warn!(line = %clean_line, "setup-token may have encountered an error");
+ }
+ }
+ }
+ Err(e) => {
+ tracing::warn!(error = %e, "PTY read error after {} reads", read_count);
+ break;
+ }
+ }
+ }
+ tracing::info!("setup-token reader thread ended");
+ });
+
+ // Spawn writer thread - waits for auth code and writes it to PTY
+ std::thread::spawn(move || {
+ tracing::info!("setup-token writer thread started, waiting for auth code (10 min timeout)");
+
+ // Wait for auth code from frontend (with long timeout - user needs time to authenticate)
+ match code_rx.recv_timeout(std::time::Duration::from_secs(600)) {
+ Ok(code) => {
+ tracing::info!(code_len = code.len(), "Received auth code from frontend, writing to PTY");
+ // Write code followed by carriage return (Enter key in raw terminal mode)
+ let code_with_enter = format!("{}\r", code);
+ if let Err(e) = writer.write_all(code_with_enter.as_bytes()) {
+ tracing::error!(error = %e, "Failed to write auth code to PTY");
+ } else if let Err(e) = writer.flush() {
+ tracing::error!(error = %e, "Failed to flush PTY writer");
+ } else {
+ tracing::info!("Auth code written to setup-token PTY successfully");
+ // Give Ink a moment to process, then send another Enter in case first was buffered
+ std::thread::sleep(std::time::Duration::from_millis(100));
+ let _ = writer.write_all(b"\r");
+ let _ = writer.flush();
+ tracing::info!("Sent additional Enter keypress");
+ }
+ }
+ Err(e) => {
+ tracing::info!(error = %e, "Auth code receive ended (timeout or channel closed)");
+ }
+ }
+
+ // Wait for reader thread to finish
+ tracing::debug!("Waiting for reader thread to finish...");
+ let _ = reader_handle.join();
+
+ // Wait for child to fully exit
+ tracing::debug!("Waiting for setup-token child process to exit...");
+ match child.wait() {
+ Ok(status) => {
+ tracing::info!(exit_status = ?status, "setup-token process exited");
+ }
+ Err(e) => {
+ tracing::error!(error = %e, "Failed to wait for setup-token process");
+ }
+ }
+ });
+
+ // Wait for URL with timeout
+ match url_rx.recv_timeout(std::time::Duration::from_secs(30)) {
+ Ok(url) => Some(url),
+ Err(e) => {
+ tracing::error!(error = %e, "Timed out waiting for OAuth login URL");
+ None
+ }
+ }
+}
+
+/// Strip ANSI escape codes from a string for cleaner logging.
+fn strip_ansi_codes(s: &str) -> String {
+ let mut result = String::with_capacity(s.len());
+ let mut chars = s.chars().peekable();
+
+ while let Some(c) = chars.next() {
+ if c == '\x1b' {
+ // Check what type of escape sequence
+ match chars.peek() {
+ Some(&'[') => {
+ // CSI sequence: ESC [ ... letter
+ chars.next(); // consume '['
+ while let Some(&next) = chars.peek() {
+ chars.next();
+ if next.is_ascii_alphabetic() {
+ break;
+ }
+ }
+ }
+ Some(&']') => {
+ // OSC sequence: ESC ] ... ST (where ST is BEL or ESC \)
+ chars.next(); // consume ']'
+ while let Some(&next) = chars.peek() {
+ if next == '\x07' {
+ chars.next(); // consume BEL (string terminator)
+ break;
+ }
+ if next == '\x1b' {
+ chars.next(); // consume ESC
+ if chars.peek() == Some(&'\\') {
+ chars.next(); // consume \ (string terminator)
+ }
+ break;
+ }
+ chars.next();
+ }
+ }
+ _ => {
+ // Unknown escape, skip just the ESC
+ }
+ }
+ } else if !c.is_control() || c == '\n' {
+ result.push(c);
+ }
+ }
+
+ result
+}
+
+/// System prompt for regular (non-orchestrator) subtasks.
+/// This ensures subtasks work only within their isolated worktree directory.
+const SUBTASK_SYSTEM_PROMPT: &str = r#"You are working in an isolated worktree directory that contains a snapshot of the codebase.
+
+## IMPORTANT: Directory Restrictions
+
+**You MUST only work within the current working directory (your worktree).**
+
+- DO NOT use `cd` to navigate to directories outside your worktree
+- DO NOT use absolute paths that point outside your worktree (e.g., don't write to ~/some/path, /tmp, or the original repository)
+- DO NOT modify files in parent directories or sibling directories
+- All your file operations should be relative to the current directory
+
+Your working directory is your sandboxed workspace. When you complete your task, your changes will be reviewed and integrated by the orchestrator.
+
+**Why?** Your worktree is isolated so that:
+1. Your changes don't affect other running tasks
+2. Changes can be reviewed before integration
+3. Multiple tasks can work on the codebase in parallel without conflicts
+
+---
+
+"#;
+
+/// The orchestrator system prompt that tells Claude how to use the helper script.
+const ORCHESTRATOR_SYSTEM_PROMPT: &str = r#"You are an orchestrator task. Your job is to coordinate subtasks and integrate their work, NOT to write code directly.
+
+## FIRST STEP
+
+Start by checking if you have existing subtasks:
+
+```bash
+# List all subtasks to see what work needs to be done
+./.makima/orchestrate.sh list
+```
+
+If subtasks exist, start them. If you need additional subtasks or no subtasks exist yet, you can create them.
+
+---
+
+## Creating Subtasks
+
+You can create new subtasks to break down work:
+
+```bash
+# Create a new subtask with a name and plan
+./.makima/orchestrate.sh create "Subtask Name" "Detailed plan for what the subtask should do..."
+
+# The command returns the new subtask ID - use it to start the subtask
+./.makima/orchestrate.sh start <new_subtask_id>
+```
+
+Create subtasks when you need to:
+- Break down complex work into smaller pieces
+- Run multiple tasks in parallel on different parts of the codebase
+- Delegate specific implementation work
+
+## Task Continuation (Sequential Dependencies)
+
+When subtasks need to build on each other's work (e.g., Task B depends on Task A's changes), use `--continue-from`:
+
+```bash
+# Create Task B that continues from Task A's worktree
+./.makima/orchestrate.sh create "Task B" "Build on Task A's work..." --continue-from <task_a_id>
+```
+
+This copies all files from Task A's worktree into Task B's worktree, so Task B starts with Task A's changes.
+
+**When to use continuation:**
+- Sequential work: Task B needs Task A's output files
+- Staged implementation: Building features incrementally
+- Fix-and-extend: One task fixes issues, another adds features on top
+
+**When NOT to use continuation:**
+- Parallel tasks working on different files
+- Independent subtasks that can be merged separately
+
+**Important for merging:** When tasks continue from each other, only merge the FINAL task in the chain. Earlier tasks' changes are already included in later tasks' worktrees.
+
+## Sharing Files with Subtasks
+
+Use `--files` to copy specific files from your orchestrator worktree to subtasks. This is useful for sharing plans, configs, or data files:
+
+```bash
+# Create subtask with specific files copied from orchestrator
+./.makima/orchestrate.sh create "Implement Feature" "Follow the plan in PLAN.md" --files "PLAN.md"
+
+# Copy multiple files (comma-separated)
+./.makima/orchestrate.sh create "API Work" "Use the spec..." --files "PLAN.md,api-spec.yaml,types.ts"
+
+# Combine with --continue-from to share files AND continue from another task
+./.makima/orchestrate.sh create "Step 2" "Continue..." --continue-from <task_a_id> --files "requirements.md"
+```
+
+**Use cases for --files:**
+- Share a PLAN.md with detailed implementation steps
+- Distribute configuration or spec files
+- Pass generated data or intermediate results
+
+## How Subtasks Work
+
+Each subtask runs in its own **worktree** - a separate directory with a copy of the codebase. When subtasks complete:
+- Their work remains in the worktree files (NOT committed to git)
+- **Subtasks do NOT auto-merge** - YOU must integrate their work into your worktree
+- You can view and copy files from subtask worktrees using their paths
+- The worktree path is returned when you get subtask status
+
+**IMPORTANT:** Subtasks never create PRs or merge to the target repository. Only the orchestrator (you) can trigger completion actions like PR creation or merging after integrating all subtask work.
+
+## Subtask Commands
+```bash
+# List all subtasks and their current status
+./.makima/orchestrate.sh list
+
+# Create a new subtask (returns the subtask ID)
+./.makima/orchestrate.sh create "Name" "Plan/description"
+
+# Create a subtask that continues from another task's worktree
+./.makima/orchestrate.sh create "Name" "Plan" --continue-from <other_task_id>
+
+# Create a subtask with specific files copied from orchestrator worktree
+./.makima/orchestrate.sh create "Name" "Plan" --files "file1.md,file2.yaml"
+
+# Start a specific subtask (it will run in its own Claude instance)
+./.makima/orchestrate.sh start <subtask_id>
+
+# Stop a running subtask
+./.makima/orchestrate.sh stop <subtask_id>
+
+# Get detailed status of a subtask (includes worktree_path when available)
+./.makima/orchestrate.sh status <subtask_id>
+
+# Get the output/logs of a subtask
+./.makima/orchestrate.sh output <subtask_id>
+
+# Get the worktree path for a subtask
+./.makima/orchestrate.sh worktree <subtask_id>
+```
+
+## Integrating Subtask Work
+
+When subtasks complete, their changes exist as files in their worktree directories:
+- Files are NOT committed to git branches
+- You must copy/integrate files from subtask worktrees into your worktree
+- Use standard file operations (cp, cat, etc.) to review and integrate changes
+
+### Handling Continuation Chains
+
+**CRITICAL:** When subtasks use `--continue-from`, they form a chain where each task includes all changes from previous tasks. You must ONLY integrate the FINAL task in each chain.
+
+Example chain: Task A → Task B (continues from A) → Task C (continues from B)
+- Task C's worktree contains ALL changes from A, B, and C
+- You should ONLY integrate Task C's worktree
+- DO NOT integrate Task A or Task B separately (their changes are already in C)
+
+**How to track continuation chains:**
+1. When you create tasks with `--continue-from`, note which task continues from which
+2. Build a mental model: Independent tasks (no continuation) + Continuation chains
+3. For each chain, only integrate the LAST task in the chain
+
+**Example with mixed independent and chained tasks:**
+```
+Independent tasks (integrate all):
+- Task X: API endpoints
+- Task Y: Database models
+
+Continuation chain (integrate ONLY the last one):
+- Task A: Core feature → Task B: Tests (continues from A) → Task C: Docs (continues from B)
+ Only integrate Task C!
+```
+
+### Integration Examples
+
+For independent subtasks (no continuation):
+```bash
+# Get the worktree path for a completed subtask
+SUBTASK_PATH=$(./.makima/orchestrate.sh worktree <subtask_id>)
+
+# View what files were changed
+ls -la "$SUBTASK_PATH"
+diff -r . "$SUBTASK_PATH" --exclude=.git --exclude=.makima
+
+# Copy specific files from subtask
+cp "$SUBTASK_PATH/src/new_file.rs" ./src/
+cp "$SUBTASK_PATH/src/modified_file.rs" ./src/
+
+# Or use diff/patch for more control
+diff -u ./src/file.rs "$SUBTASK_PATH/src/file.rs" > changes.patch
+patch -p0 < changes.patch
+```
+
+For continuation chains (only integrate the final task):
+```bash
+# If you have: Task A → Task B → Task C (each continues from previous)
+# ONLY get and integrate Task C's worktree - it has everything!
+
+FINAL_TASK_PATH=$(./.makima/orchestrate.sh worktree <task_c_id>)
+
+# Copy all changes from the final task
+rsync -av --exclude='.git' --exclude='.makima' "$FINAL_TASK_PATH/" ./
+```
+
+## Completion
+```bash
+# Mark yourself as complete after integrating all subtask work
+./.makima/orchestrate.sh done "Summary of what was accomplished"
+```
+
+## Workflow
+1. **List existing subtasks**: Run `list` to see current subtasks
+2. **Create subtasks if needed**: Use `create` to add new subtasks for the work
+ - For independent parallel work: create without `--continue-from`
+ - For sequential dependencies: use `--continue-from <previous_task_id>`
+ - Track which tasks continue from which (continuation chains)
+3. **Start subtasks**: Run `start` for each subtask
+4. **Monitor progress**: Check status and output as subtasks run
+5. **Integrate work**: When subtasks complete:
+ - For independent tasks: integrate each one's worktree
+ - For continuation chains: ONLY integrate the FINAL task (it has all changes)
+ - Get worktree path with `worktree <subtask_id>`
+ - Copy or merge files into your worktree
+6. **Complete**: Call `done` once all work is integrated
+
+## Important Notes
+- Subtask files are in worktrees, NOT committed git branches
+- **Subtasks do NOT auto-merge or create PRs** - you must integrate their work
+- You can read files from subtask worktrees using their paths
+- Use standard file tools (cp, diff, cat, rsync) to integrate changes
+- You should NOT edit files directly - that's what subtasks are for
+- DO NOT DO THE SUBTASKS' WORK! Your only job is to coordinate, not implement.
+- When you call `done`, YOUR worktree may be used for the final PR/merge
+"#;
+
+
+/// System prompt for supervisor tasks (contract orchestrators).
+/// Supervisors monitor all tasks in a contract, create new tasks, and drive the contract to completion.
+const SUPERVISOR_SYSTEM_PROMPT: &str = r#"You are the SUPERVISOR for this contract. Your ONLY job is to coordinate work by spawning tasks, waiting for them to complete, and managing git operations.
+
+## CRITICAL RULES - READ CAREFULLY
+
+1. **NEVER write code or edit files yourself** - you are a coordinator ONLY
+2. **NEVER make commits yourself** - tasks do their own commits
+3. **ALWAYS spawn tasks** for ANY work that involves:
+ - Writing or editing code
+ - Creating or modifying files
+ - Making implementation changes
+ - Any actual development work
+4. **ALWAYS wait for tasks to complete** - you MUST use `wait` after spawning
+5. **Your role is ONLY to**:
+ - Analyze the contract goal and break it into tasks
+ - Spawn tasks AND wait for them to complete
+ - Review completed task results
+ - Merge completed work using `merge`
+ - Create PRs when ready using `pr`
+
+## REQUIRED WORKFLOW - Follow This Pattern
+
+For EVERY task you spawn, you MUST:
+1. Spawn the task with `spawn`
+2. IMMEDIATELY call `wait` to block until completion
+3. Check the result and handle success/failure
+4. Merge if successful
+
+```bash
+# CORRECT PATTERN - spawn then wait
+RESULT=$(makima supervisor spawn "Task Name" "Detailed plan...")
+TASK_ID=$(echo "$RESULT" | jq -r '.taskId')
+echo "Spawned task: $TASK_ID"
+
+# MUST wait for the task - DO NOT skip this step!
+makima supervisor wait "$TASK_ID"
+
+# Check result, view diff, merge if successful
+makima supervisor diff "$TASK_ID"
+makima supervisor merge "$TASK_ID"
+```
+
+## Example - Full Workflow
+
+Goal: "Add user authentication"
+
+```bash
+# Step 1: Create a makima branch for this work (use makima/{name} convention)
+makima supervisor branch "makima/user-authentication"
+
+# Step 2: Spawn tasks, wait for each, and merge to the branch
+
+# Task 1: Research (spawn and wait)
+RESULT=$(makima supervisor spawn "Research auth patterns" "Explore the codebase for existing authentication. Document findings.")
+TASK_ID=$(echo "$RESULT" | jq -r '.taskId')
+makima supervisor wait "$TASK_ID"
+# Review findings before continuing
+
+# Task 2: Login endpoint (spawn and wait)
+RESULT=$(makima supervisor spawn "Implement login" "Create POST /api/login endpoint...")
+TASK_ID=$(echo "$RESULT" | jq -r '.taskId')
+makima supervisor wait "$TASK_ID"
+makima supervisor diff "$TASK_ID"
+makima supervisor merge "$TASK_ID" --to "makima/user-authentication"
+
+# Task 3: Logout endpoint (spawn and wait)
+RESULT=$(makima supervisor spawn "Implement logout" "Create POST /api/logout endpoint...")
+TASK_ID=$(echo "$RESULT" | jq -r '.taskId')
+makima supervisor wait "$TASK_ID"
+makima supervisor merge "$TASK_ID" --to "makima/user-authentication"
+
+# Step 3: All tasks complete - create PR from makima branch
+makima supervisor pr "makima/user-authentication" --title "Add user authentication" --base main
+```
+
+## Available Tools (via makima supervisor)
+
+### Task Management
+```bash
+# List all tasks in this contract
+makima supervisor tasks
+
+# Spawn a new task (returns JSON with taskId)
+makima supervisor spawn "Task Name" "Detailed plan..."
+
+# IMPORTANT: Wait for task to complete (blocks until done/failed)
+makima supervisor wait <task_id> [timeout_seconds]
+
+# Read a file from any task's worktree
+makima supervisor read-file <task_id> <file_path>
+
+# Get the full task tree structure
+makima supervisor tree
+```
+
+### Git Operations
+```bash
+# Create a new branch
+makima supervisor branch <branch_name> [--from <task_id|sha>]
+
+# Merge a task's changes to a branch
+makima supervisor merge <task_id> [--to <branch>] [--squash]
+
+# Create a pull request
+makima supervisor pr <task_id> --title "Title" [--body "Body"] [--base main]
+
+# View a task's diff
+makima supervisor diff <task_id>
+
+# Create a git checkpoint
+makima supervisor checkpoint "Checkpoint message"
+
+# List checkpoints for a task
+makima supervisor checkpoints [task_id]
+```
+
+### Contract
+```bash
+# Get contract status
+makima supervisor status
+```
+
+## Key Points
+
+1. **Create a makima branch first** - use `branch "makima/{name}"` for the contract's work
+2. **spawn returns immediately** - the task runs in the background
+3. **wait blocks until complete** - you MUST call this to know when a task finishes
+4. **Never fire-and-forget** - always wait for each task before moving on
+5. **Merge to your makima branch** - use `merge <task_id> --to "makima/{name}"` to collect completed work
+6. **Create PR when done** - use `pr "makima/{name}" --title "..." --base main`
+
+## Standard Workflow
+
+1. `branch "makima/{name}"` - Create branch (e.g., "makima/add-auth")
+2. For each piece of work:
+ - `spawn` - Create task
+ - `wait` - Block until complete
+ - `merge --to "makima/{name}"` - Merge to branch
+3. `pr "makima/{name}" --title "..." --base main` - Create PR
+
+## Important Reminders
+
+- **ONLY YOU can spawn tasks** - regular tasks cannot create children
+- **NEVER implement anything yourself** - always spawn tasks
+- **ALWAYS create a makima branch** - use `makima/{name}` naming convention
+- Tasks run independently - you just coordinate
+- You will be resumed if interrupted - your conversation is preserved
+- Create checkpoints before major transitions
+
+---
+
+"#;
+
+/// System prompt for tasks that are part of a contract.
+/// This tells the task about contract.sh and how to use it to interact with the contract.
+const CONTRACT_INTEGRATION_PROMPT: &str = r##"
+## Contract Integration
+
+This task is part of a contract. You have access to contract tools via the `makima contract` CLI.
+
+### Contract Commands
+
+```bash
+# Get contract context (name, phase, goals)
+makima contract status
+
+# Get phase checklist and deliverables
+makima contract checklist
+
+# List contract files
+makima contract files
+
+# Read a specific file content
+makima contract file <file_id>
+
+# Report progress to the contract
+makima contract report "Completed X, working on Y..."
+
+# Create a new contract file (content via stdin)
+echo "# New Documentation" | makima contract create-file "New Document"
+
+# Update an existing contract file (content via stdin)
+cat updated_content.md | makima contract update-file <file_id>
+
+# Get suggested next action when done
+makima contract suggest-action
+
+# Report completion with metrics
+makima contract completion-action --files "file1.rs,file2.rs" --code
+```
+
+### What You Should Do
+
+**Before starting:**
+1. Run `makima contract status` to understand the contract context
+2. Run `makima contract checklist` to see phase deliverables
+3. Run `makima contract files` to see existing documentation
+
+**While working:**
+- Report significant progress with `makima contract report "..."`
+
+**When completing:**
+1. If your work should be documented, create or update contract files
+2. Run `makima contract completion-action` to see recommended next steps
+3. Consider what contract files or phases might need updating
+
+**Important:** Your work should contribute to the contract's goals. Check the contract status to understand what's expected.
+
+---
+
+"##;
+
+/// Tracks merge state for an orchestrator task.
+#[derive(Default)]
+struct MergeTracker {
+ /// Subtask branches that have been successfully merged.
+ merged_subtasks: HashSet<Uuid>,
+ /// Subtask branches that were explicitly skipped (with reason).
+ skipped_subtasks: HashMap<Uuid, String>,
+}
+
+/// Managed task information.
+#[derive(Clone)]
+pub struct ManagedTask {
+ /// Task ID.
+ pub id: Uuid,
+ /// Human-readable task name.
+ pub task_name: String,
+ /// Current state.
+ pub state: TaskState,
+ /// Worktree info if created.
+ pub worktree: Option<WorktreeInfo>,
+ /// Task plan.
+ pub plan: String,
+ /// Repository URL or path.
+ pub repo_source: Option<String>,
+ /// Base branch.
+ pub base_branch: Option<String>,
+ /// Target branch to merge into.
+ pub target_branch: Option<String>,
+ /// Parent task ID if this is a subtask.
+ pub parent_task_id: Option<Uuid>,
+ /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask).
+ pub depth: i32,
+ /// Whether this task runs as an orchestrator (coordinates subtasks).
+ pub is_orchestrator: bool,
+ /// Whether this task is a supervisor (long-running contract orchestrator).
+ pub is_supervisor: bool,
+ /// Path to target repository for completion actions.
+ pub target_repo_path: Option<String>,
+ /// Completion action: "none", "branch", "merge", "pr".
+ pub completion_action: Option<String>,
+ /// Task ID to continue from (copy worktree from this task).
+ pub continue_from_task_id: Option<Uuid>,
+ /// Files to copy from parent task's worktree.
+ pub copy_files: Option<Vec<String>>,
+ /// Contract ID if this task is associated with a contract.
+ pub contract_id: Option<Uuid>,
+ /// Time task was created.
+ pub created_at: Instant,
+ /// Time task started running.
+ pub started_at: Option<Instant>,
+ /// Time task completed.
+ pub completed_at: Option<Instant>,
+ /// Error message if failed.
+ pub error: Option<String>,
+}
+
+/// Configuration for task execution.
+#[derive(Clone)]
+pub struct TaskConfig {
+ /// Maximum concurrent tasks.
+ pub max_concurrent_tasks: u32,
+ /// Base directory for worktrees.
+ pub worktree_base_dir: PathBuf,
+ /// Environment variables to pass to Claude.
+ pub env_vars: HashMap<String, String>,
+ /// Claude command path.
+ pub claude_command: String,
+ /// Additional arguments to pass to Claude Code.
+ pub claude_args: Vec<String>,
+ /// Arguments to pass before defaults.
+ pub claude_pre_args: Vec<String>,
+ /// Enable Claude's permission system.
+ pub enable_permissions: bool,
+ /// Disable verbose output.
+ pub disable_verbose: bool,
+}
+
+impl Default for TaskConfig {
+ fn default() -> Self {
+ Self {
+ max_concurrent_tasks: 4,
+ worktree_base_dir: WorktreeManager::default_base_dir(),
+ env_vars: HashMap::new(),
+ claude_command: "claude".to_string(),
+ claude_args: Vec::new(),
+ claude_pre_args: Vec::new(),
+ enable_permissions: false,
+ disable_verbose: false,
+ }
+ }
+}
+
+/// Task manager for handling task lifecycle.
+pub struct TaskManager {
+ /// Worktree manager.
+ worktree_manager: Arc<WorktreeManager>,
+ /// Process manager.
+ process_manager: Arc<ProcessManager>,
+ /// Temp directory manager.
+ temp_manager: Arc<TempManager>,
+ /// Task configuration.
+ #[allow(dead_code)]
+ config: TaskConfig,
+ /// Active tasks.
+ tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>,
+ /// Channel to send messages to server.
+ ws_tx: mpsc::Sender<DaemonMessage>,
+ /// Semaphore for limiting concurrent tasks.
+ semaphore: Arc<Semaphore>,
+ /// Channels for sending input to running tasks.
+ /// Each sender allows sending messages to the stdin of a running Claude process.
+ task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>,
+ /// Tracks merge state per orchestrator task (for completion gate).
+ merge_trackers: Arc<RwLock<HashMap<Uuid, MergeTracker>>>,
+}
+
+impl TaskManager {
+ /// Create a new task manager.
+ pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self {
+ let max_concurrent = config.max_concurrent_tasks as usize;
+ let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone()));
+ let process_manager = Arc::new(
+ ProcessManager::with_command(config.claude_command.clone())
+ .with_args(config.claude_args.clone())
+ .with_pre_args(config.claude_pre_args.clone())
+ .with_permissions_enabled(config.enable_permissions)
+ .with_verbose_disabled(config.disable_verbose)
+ .with_env(config.env_vars.clone()),
+ );
+ let temp_manager = Arc::new(TempManager::new());
+
+ Self {
+ worktree_manager,
+ process_manager,
+ temp_manager,
+ config,
+ tasks: Arc::new(RwLock::new(HashMap::new())),
+ ws_tx,
+ semaphore: Arc::new(Semaphore::new(max_concurrent)),
+ task_inputs: Arc::new(RwLock::new(HashMap::new())),
+ merge_trackers: Arc::new(RwLock::new(HashMap::new())),
+ }
+ }
+
+ /// Handle a command from the server.
+ pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> {
+ tracing::info!("Received command from server: {:?}", command);
+
+ match command {
+ DaemonCommand::SpawnTask {
+ task_id,
+ task_name,
+ plan,
+ repo_url,
+ base_branch,
+ target_branch,
+ parent_task_id,
+ depth,
+ is_orchestrator,
+ target_repo_path,
+ completion_action,
+ continue_from_task_id,
+ copy_files,
+ contract_id,
+ is_supervisor,
+ } => {
+ tracing::info!(
+ task_id = %task_id,
+ task_name = %task_name,
+ repo_url = ?repo_url,
+ base_branch = ?base_branch,
+ target_branch = ?target_branch,
+ parent_task_id = ?parent_task_id,
+ depth = depth,
+ is_orchestrator = is_orchestrator,
+ is_supervisor = is_supervisor,
+ target_repo_path = ?target_repo_path,
+ completion_action = ?completion_action,
+ continue_from_task_id = ?continue_from_task_id,
+ copy_files = ?copy_files,
+ contract_id = ?contract_id,
+ plan_len = plan.len(),
+ "Spawning new task"
+ );
+ self.spawn_task(
+ task_id, task_name, plan, repo_url, base_branch, target_branch,
+ parent_task_id, depth, is_orchestrator, is_supervisor,
+ target_repo_path, completion_action, continue_from_task_id,
+ copy_files, contract_id
+ ).await?;
+ }
+ DaemonCommand::PauseTask { task_id } => {
+ tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks");
+ // Subprocesses don't support pause, just log and ignore
+ }
+ DaemonCommand::ResumeTask { task_id } => {
+ tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks");
+ // Subprocesses don't support resume, just log and ignore
+ }
+ DaemonCommand::InterruptTask { task_id, graceful: _ } => {
+ tracing::info!(task_id = %task_id, "Interrupting task");
+ self.interrupt_task(task_id).await?;
+ }
+ DaemonCommand::SendMessage { task_id, message } => {
+ // Check if this is an auth code message
+ if message.starts_with("AUTH_CODE:") {
+ let code = message.strip_prefix("AUTH_CODE:").unwrap_or("").trim();
+ tracing::info!(task_id = %task_id, "Received auth code from frontend");
+ if send_auth_code(code) {
+ tracing::info!(task_id = %task_id, "Auth code forwarded to setup-token");
+ } else {
+ tracing::warn!(task_id = %task_id, "No pending auth flow to receive code");
+ }
+ } else {
+ // Regular message - send to task's stdin
+ tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task");
+ // Send message to the task's stdin via the input channel
+ let inputs = self.task_inputs.read().await;
+ if let Some(sender) = inputs.get(&task_id) {
+ if let Err(e) = sender.send(message).await {
+ tracing::warn!(task_id = %task_id, error = %e, "Failed to send message to task input channel");
+ } else {
+ tracing::info!(task_id = %task_id, "Message sent to task successfully");
+ }
+ } else {
+ drop(inputs); // Release read lock before checking if we need to respawn
+
+ // Check if this is a supervisor that needs to be respawned
+ let task_info = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id).cloned()
+ };
+
+ if let Some(task) = task_info {
+ if task.is_supervisor {
+ tracing::info!(
+ task_id = %task_id,
+ "Supervisor has no active Claude process, respawning with message"
+ );
+
+ // Respawn the supervisor with the new message as the plan
+ // Claude Code will use --continue to maintain conversation history
+ let inner = self.clone_inner();
+ let task_name = task.task_name.clone();
+ let repo_source = task.repo_source.clone();
+ let base_branch = task.base_branch.clone();
+ let target_branch = task.target_branch.clone();
+ let target_repo_path = task.target_repo_path.clone();
+ let completion_action = task.completion_action.clone();
+ let contract_id = task.contract_id;
+
+ // Spawn in background to not block the command handler
+ tokio::spawn(async move {
+ if let Err(e) = inner.run_task(
+ task_id,
+ task_name,
+ message, // Use the message as the new prompt
+ repo_source,
+ base_branch,
+ target_branch,
+ false, // is_orchestrator
+ true, // is_supervisor
+ target_repo_path,
+ completion_action,
+ None, // continue_from_task_id
+ None, // copy_files
+ contract_id,
+ ).await {
+ tracing::error!(
+ task_id = %task_id,
+ error = %e,
+ "Failed to respawn supervisor"
+ );
+ }
+ });
+ } else {
+ tracing::warn!(task_id = %task_id, "No input channel for task (task may not be running)");
+ }
+ } else {
+ tracing::warn!(task_id = %task_id, "Task not found");
+ }
+ }
+ }
+ }
+ DaemonCommand::InjectSiblingContext { task_id, .. } => {
+ tracing::debug!(task_id = %task_id, "Sibling context injection not supported for subprocess tasks");
+ }
+ DaemonCommand::Authenticated { daemon_id } => {
+ tracing::debug!(daemon_id = %daemon_id, "Authenticated command (handled by WS client)");
+ }
+ DaemonCommand::Error { code, message } => {
+ tracing::warn!(code = %code, message = %message, "Error command from server");
+ }
+
+ // =========================================================================
+ // Merge Commands
+ // =========================================================================
+
+ DaemonCommand::ListBranches { task_id } => {
+ tracing::info!(task_id = %task_id, "Listing task branches");
+ self.handle_list_branches(task_id).await?;
+ }
+ DaemonCommand::MergeStart { task_id, source_branch } => {
+ tracing::info!(task_id = %task_id, source_branch = %source_branch, "Starting merge");
+ self.handle_merge_start(task_id, source_branch).await?;
+ }
+ DaemonCommand::MergeStatus { task_id } => {
+ tracing::info!(task_id = %task_id, "Getting merge status");
+ self.handle_merge_status(task_id).await?;
+ }
+ DaemonCommand::MergeResolve { task_id, file, strategy } => {
+ tracing::info!(task_id = %task_id, file = %file, strategy = %strategy, "Resolving conflict");
+ self.handle_merge_resolve(task_id, file, strategy).await?;
+ }
+ DaemonCommand::MergeCommit { task_id, message } => {
+ tracing::info!(task_id = %task_id, "Committing merge");
+ self.handle_merge_commit(task_id, message).await?;
+ }
+ DaemonCommand::MergeAbort { task_id } => {
+ tracing::info!(task_id = %task_id, "Aborting merge");
+ self.handle_merge_abort(task_id).await?;
+ }
+ DaemonCommand::MergeSkip { task_id, subtask_id, reason } => {
+ tracing::info!(task_id = %task_id, subtask_id = %subtask_id, reason = %reason, "Skipping subtask merge");
+ self.handle_merge_skip(task_id, subtask_id, reason).await?;
+ }
+ DaemonCommand::CheckMergeComplete { task_id } => {
+ tracing::info!(task_id = %task_id, "Checking merge completion");
+ self.handle_check_merge_complete(task_id).await?;
+ }
+
+ // =========================================================================
+ // Completion Action Commands
+ // =========================================================================
+
+ DaemonCommand::RetryCompletionAction {
+ task_id,
+ task_name,
+ action,
+ target_repo_path,
+ target_branch,
+ } => {
+ tracing::info!(
+ task_id = %task_id,
+ task_name = %task_name,
+ action = %action,
+ target_repo_path = %target_repo_path,
+ target_branch = ?target_branch,
+ "Retrying completion action"
+ );
+ self.handle_retry_completion_action(task_id, task_name, action, target_repo_path, target_branch).await?;
+ }
+
+ DaemonCommand::CloneWorktree { task_id, target_dir } => {
+ tracing::info!(
+ task_id = %task_id,
+ target_dir = %target_dir,
+ "Cloning worktree to target directory"
+ );
+ self.handle_clone_worktree(task_id, target_dir).await?;
+ }
+
+ DaemonCommand::CheckTargetExists { task_id, target_dir } => {
+ tracing::debug!(
+ task_id = %task_id,
+ target_dir = %target_dir,
+ "Checking if target directory exists"
+ );
+ self.handle_check_target_exists(task_id, target_dir).await?;
+ }
+
+ // =========================================================================
+ // Contract File Commands
+ // =========================================================================
+
+ DaemonCommand::ReadRepoFile {
+ request_id,
+ contract_id,
+ file_path,
+ repo_path,
+ } => {
+ tracing::info!(
+ request_id = %request_id,
+ contract_id = %contract_id,
+ file_path = %file_path,
+ repo_path = %repo_path,
+ "Reading file from repository"
+ );
+ self.handle_read_repo_file(request_id, file_path, repo_path).await?;
+ }
+ DaemonCommand::CreateBranch {
+ task_id,
+ branch_name,
+ from_ref,
+ } => {
+ tracing::info!(
+ task_id = %task_id,
+ branch_name = %branch_name,
+ from_ref = ?from_ref,
+ "Creating branch"
+ );
+ self.handle_create_branch(task_id, branch_name, from_ref).await?;
+ }
+ DaemonCommand::MergeTaskToTarget {
+ task_id,
+ target_branch,
+ squash,
+ } => {
+ tracing::info!(
+ task_id = %task_id,
+ target_branch = ?target_branch,
+ squash = squash,
+ "Merging task to target branch"
+ );
+ self.handle_merge_task_to_target(task_id, target_branch, squash).await?;
+ }
+ DaemonCommand::CreatePR {
+ task_id,
+ title,
+ body,
+ base_branch,
+ } => {
+ tracing::info!(
+ task_id = %task_id,
+ title = %title,
+ base_branch = %base_branch,
+ "Creating pull request"
+ );
+ self.handle_create_pr(task_id, title, body, base_branch).await?;
+ }
+ DaemonCommand::GetTaskDiff {
+ task_id,
+ } => {
+ tracing::info!(task_id = %task_id, "Getting task diff");
+ self.handle_get_task_diff(task_id).await?;
+ }
+ }
+ Ok(())
+ }
+
+ /// Spawn a new task.
+ #[allow(clippy::too_many_arguments)]
+ pub async fn spawn_task(
+ &self,
+ task_id: Uuid,
+ task_name: String,
+ plan: String,
+ repo_url: Option<String>,
+ base_branch: Option<String>,
+ target_branch: Option<String>,
+ parent_task_id: Option<Uuid>,
+ depth: i32,
+ is_orchestrator: bool,
+ is_supervisor: bool,
+ target_repo_path: Option<String>,
+ completion_action: Option<String>,
+ continue_from_task_id: Option<Uuid>,
+ copy_files: Option<Vec<String>>,
+ contract_id: Option<Uuid>,
+ ) -> TaskResult<()> {
+ tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ===");
+
+ // Check if task already exists - allow re-spawning if in terminal state
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(existing) = tasks.get(&task_id) {
+ if existing.state.is_terminal() {
+ // Task exists but is in terminal state (completed, failed, interrupted)
+ // Remove it so we can re-spawn
+ tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn");
+ tasks.remove(&task_id);
+ } else {
+ // Task is still active, reject
+ tracing::warn!(task_id = %task_id, state = ?existing.state, "Task already exists and is active, rejecting spawn");
+ return Err(TaskError::AlreadyExists(task_id));
+ }
+ }
+ }
+
+ // Acquire semaphore permit
+ tracing::info!(task_id = %task_id, "Acquiring concurrency permit...");
+ let permit = self
+ .semaphore
+ .clone()
+ .try_acquire_owned()
+ .map_err(|_| {
+ tracing::warn!(task_id = %task_id, "Concurrency limit reached, cannot spawn task");
+ TaskError::ConcurrencyLimit
+ })?;
+ tracing::info!(task_id = %task_id, "Concurrency permit acquired");
+
+ // Create task entry
+ tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing");
+ let task = ManagedTask {
+ id: task_id,
+ task_name: task_name.clone(),
+ state: TaskState::Initializing,
+ worktree: None,
+ plan: plan.clone(),
+ repo_source: repo_url.clone(),
+ base_branch: base_branch.clone(),
+ target_branch: target_branch.clone(),
+ parent_task_id,
+ depth,
+ is_orchestrator,
+ is_supervisor,
+ target_repo_path: target_repo_path.clone(),
+ completion_action: completion_action.clone(),
+ continue_from_task_id,
+ copy_files: copy_files.clone(),
+ contract_id,
+ created_at: Instant::now(),
+ started_at: None,
+ completed_at: None,
+ error: None,
+ };
+
+ self.tasks.write().await.insert(task_id, task);
+ tracing::info!(task_id = %task_id, "Task entry created and stored");
+
+ // Notify server of status change
+ tracing::info!(task_id = %task_id, "Notifying server: pending -> initializing");
+ self.send_status_change(task_id, "pending", "initializing").await;
+
+ // Spawn task in background
+ tracing::info!(task_id = %task_id, "Spawning background task runner");
+ let inner = self.clone_inner();
+ tokio::spawn(async move {
+ let _permit = permit; // Hold permit until done
+ tracing::info!(task_id = %task_id, "Background task runner started");
+
+ if let Err(e) = inner.run_task(
+ task_id, task_name, plan, repo_url, base_branch, target_branch,
+ is_orchestrator, is_supervisor, target_repo_path, completion_action,
+ continue_from_task_id, copy_files, contract_id
+ ).await {
+ tracing::error!(task_id = %task_id, error = %e, "Task execution failed");
+ inner.mark_failed(task_id, &e.to_string()).await;
+ }
+ tracing::info!(task_id = %task_id, "Background task runner completed");
+ });
+
+ tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ===");
+ Ok(())
+ }
+
+ /// Clone inner state for spawned tasks.
+ fn clone_inner(&self) -> TaskManagerInner {
+ TaskManagerInner {
+ worktree_manager: self.worktree_manager.clone(),
+ process_manager: self.process_manager.clone(),
+ temp_manager: self.temp_manager.clone(),
+ tasks: self.tasks.clone(),
+ ws_tx: self.ws_tx.clone(),
+ task_inputs: self.task_inputs.clone(),
+ }
+ }
+
+ /// Interrupt a task.
+ pub async fn interrupt_task(&self, task_id: Uuid) -> TaskResult<()> {
+ let mut tasks = self.tasks.write().await;
+ let task = tasks.get_mut(&task_id).ok_or(TaskError::NotFound(task_id))?;
+
+ if task.state.is_terminal() {
+ return Ok(()); // Already done
+ }
+
+ let old_state = task.state;
+ task.state = TaskState::Interrupted;
+ task.completed_at = Some(Instant::now());
+
+ // Notify server
+ drop(tasks);
+ self.send_status_change(task_id, old_state.as_str(), "interrupted").await;
+
+ // Note: The process will be killed when the ClaudeProcess is dropped
+ // Worktrees are kept until explicitly deleted
+
+ Ok(())
+ }
+
+ /// Get list of active task IDs.
+ pub async fn active_task_ids(&self) -> Vec<Uuid> {
+ self.tasks
+ .read()
+ .await
+ .iter()
+ .filter(|(_, t)| t.state.is_active())
+ .map(|(id, _)| *id)
+ .collect()
+ }
+
+ /// Get task state.
+ pub async fn get_task_state(&self, task_id: Uuid) -> Option<TaskState> {
+ self.tasks.read().await.get(&task_id).map(|t| t.state)
+ }
+
+ /// Send status change notification to server.
+ async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) {
+ let msg = DaemonMessage::task_status_change(task_id, old_status, new_status);
+ let _ = self.ws_tx.send(msg).await;
+ }
+
+ // =========================================================================
+ // Merge Handler Methods
+ // =========================================================================
+
+ /// Get worktree path for a task, or return error if not found.
+ /// First checks in-memory tasks, then scans the worktrees directory.
+ async fn get_task_worktree_path(&self, task_id: Uuid) -> Result<std::path::PathBuf, DaemonError> {
+ // First try to get from in-memory tasks
+ {
+ let tasks = self.tasks.read().await;
+ if let Some(task) = tasks.get(&task_id) {
+ if let Some(ref worktree) = task.worktree {
+ return Ok(worktree.path.clone());
+ }
+ }
+ }
+
+ // Task not in memory - scan worktrees directory for matching task ID
+ let short_id = &task_id.to_string()[..8];
+ let worktrees_dir = self.worktree_manager.base_dir();
+
+ if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
+ while let Ok(Some(entry)) = entries.next_entry().await {
+ let name = entry.file_name();
+ let name_str = name.to_string_lossy();
+ if name_str.starts_with(short_id) {
+ let path = entry.path();
+ // Verify it's a valid git directory
+ if path.join(".git").exists() {
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %path.display(),
+ "Found worktree by scanning directory"
+ );
+ return Ok(path);
+ }
+ }
+ }
+ }
+
+ Err(DaemonError::Task(TaskError::SetupFailed(
+ format!("No worktree found for task {}. The worktree may have been cleaned up.", task_id)
+ )))
+ }
+
+ /// Handle ListBranches command.
+ async fn handle_list_branches(&self, task_id: Uuid) -> Result<(), DaemonError> {
+ let worktree_path = self.get_task_worktree_path(task_id).await?;
+
+ match self.worktree_manager.list_task_branches(&worktree_path).await {
+ Ok(branches) => {
+ let branch_infos: Vec<BranchInfo> = branches
+ .into_iter()
+ .map(|b| BranchInfo {
+ name: b.name,
+ task_id: b.task_id,
+ is_merged: b.is_merged,
+ last_commit: b.last_commit,
+ last_commit_message: b.last_commit_message,
+ })
+ .collect();
+
+ let msg = DaemonMessage::BranchList {
+ task_id,
+ branches: branch_infos,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ Err(e) => {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to list branches");
+ let msg = DaemonMessage::MergeResult {
+ task_id,
+ success: false,
+ message: e.to_string(),
+ commit_sha: None,
+ conflicts: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ }
+ Ok(())
+ }
+
+ /// Handle MergeStart command.
+ async fn handle_merge_start(&self, task_id: Uuid, source_branch: String) -> Result<(), DaemonError> {
+ let worktree_path = self.get_task_worktree_path(task_id).await?;
+
+ match self.worktree_manager.merge_branch(&worktree_path, &source_branch).await {
+ Ok(None) => {
+ // Merge succeeded without conflicts
+ let msg = DaemonMessage::MergeResult {
+ task_id,
+ success: true,
+ message: "Merge completed without conflicts".to_string(),
+ commit_sha: None,
+ conflicts: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ Ok(Some(conflicts)) => {
+ // Merge has conflicts
+ let msg = DaemonMessage::MergeResult {
+ task_id,
+ success: false,
+ message: format!("Merge has {} conflicts", conflicts.len()),
+ commit_sha: None,
+ conflicts: Some(conflicts),
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ Err(e) => {
+ let msg = DaemonMessage::MergeResult {
+ task_id,
+ success: false,
+ message: e.to_string(),
+ commit_sha: None,
+ conflicts: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ }
+ Ok(())
+ }
+
+ /// Handle MergeStatus command.
+ async fn handle_merge_status(&self, task_id: Uuid) -> Result<(), DaemonError> {
+ let worktree_path = self.get_task_worktree_path(task_id).await?;
+
+ match self.worktree_manager.get_merge_state(&worktree_path).await {
+ Ok(state) => {
+ let msg = DaemonMessage::MergeStatusResponse {
+ task_id,
+ in_progress: state.in_progress,
+ source_branch: if state.in_progress { Some(state.source_branch) } else { None },
+ conflicted_files: state.conflicted_files,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ Err(e) => {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to get merge status");
+ let msg = DaemonMessage::MergeStatusResponse {
+ task_id,
+ in_progress: false,
+ source_branch: None,
+ conflicted_files: vec![],
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ }
+ Ok(())
+ }
+
+ /// Handle MergeResolve command.
+ async fn handle_merge_resolve(&self, task_id: Uuid, file: String, strategy: String) -> Result<(), DaemonError> {
+ let worktree_path = self.get_task_worktree_path(task_id).await?;
+
+ let resolution = match strategy.to_lowercase().as_str() {
+ "ours" => ConflictResolution::Ours,
+ "theirs" => ConflictResolution::Theirs,
+ _ => {
+ let msg = DaemonMessage::MergeResult {
+ task_id,
+ success: false,
+ message: format!("Invalid strategy '{}', must be 'ours' or 'theirs'", strategy),
+ commit_sha: None,
+ conflicts: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ return Ok(());
+ }
+ };
+
+ match self.worktree_manager.resolve_conflict(&worktree_path, &file, resolution).await {
+ Ok(()) => {
+ let msg = DaemonMessage::MergeResult {
+ task_id,
+ success: true,
+ message: format!("Resolved conflict in {}", file),
+ commit_sha: None,
+ conflicts: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ Err(e) => {
+ let msg = DaemonMessage::MergeResult {
+ task_id,
+ success: false,
+ message: e.to_string(),
+ commit_sha: None,
+ conflicts: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ }
+ Ok(())
+ }
+
+ /// Handle MergeCommit command.
+ async fn handle_merge_commit(&self, task_id: Uuid, message: String) -> Result<(), DaemonError> {
+ let worktree_path = self.get_task_worktree_path(task_id).await?;
+
+ match self.worktree_manager.commit_merge(&worktree_path, &message).await {
+ Ok(commit_sha) => {
+ // Track this merge as completed (extract subtask ID from branch if possible)
+ // For now, we'll track it when MergeSkip is called or based on branch names
+ let msg = DaemonMessage::MergeResult {
+ task_id,
+ success: true,
+ message: "Merge committed successfully".to_string(),
+ commit_sha: Some(commit_sha),
+ conflicts: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ Err(e) => {
+ let msg = DaemonMessage::MergeResult {
+ task_id,
+ success: false,
+ message: e.to_string(),
+ commit_sha: None,
+ conflicts: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ }
+ Ok(())
+ }
+
+ /// Handle MergeAbort command.
+ async fn handle_merge_abort(&self, task_id: Uuid) -> Result<(), DaemonError> {
+ let worktree_path = self.get_task_worktree_path(task_id).await?;
+
+ match self.worktree_manager.abort_merge(&worktree_path).await {
+ Ok(()) => {
+ let msg = DaemonMessage::MergeResult {
+ task_id,
+ success: true,
+ message: "Merge aborted".to_string(),
+ commit_sha: None,
+ conflicts: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ Err(e) => {
+ let msg = DaemonMessage::MergeResult {
+ task_id,
+ success: false,
+ message: e.to_string(),
+ commit_sha: None,
+ conflicts: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ }
+ }
+ Ok(())
+ }
+
+ /// Handle MergeSkip command.
+ async fn handle_merge_skip(&self, task_id: Uuid, subtask_id: Uuid, reason: String) -> Result<(), DaemonError> {
+ // Record that this subtask was skipped
+ {
+ let mut trackers = self.merge_trackers.write().await;
+ let tracker = trackers.entry(task_id).or_insert_with(MergeTracker::default);
+ tracker.skipped_subtasks.insert(subtask_id, reason.clone());
+ }
+
+ let msg = DaemonMessage::MergeResult {
+ task_id,
+ success: true,
+ message: format!("Subtask {} skipped: {}", subtask_id, reason),
+ commit_sha: None,
+ conflicts: None,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
+ /// Handle CheckMergeComplete command.
+ async fn handle_check_merge_complete(&self, task_id: Uuid) -> Result<(), DaemonError> {
+ let worktree_path = self.get_task_worktree_path(task_id).await?;
+
+ // Get all task branches
+ let branches = match self.worktree_manager.list_task_branches(&worktree_path).await {
+ Ok(b) => b,
+ Err(e) => {
+ let msg = DaemonMessage::MergeCompleteCheck {
+ task_id,
+ can_complete: false,
+ unmerged_branches: vec![format!("Error listing branches: {}", e)],
+ merged_count: 0,
+ skipped_count: 0,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ return Ok(());
+ }
+ };
+
+ // Get tracker state
+ let trackers = self.merge_trackers.read().await;
+ let empty_merged: HashSet<Uuid> = HashSet::new();
+ let empty_skipped: HashMap<Uuid, String> = HashMap::new();
+ let tracker = trackers.get(&task_id);
+ let merged_set = tracker.map(|t| &t.merged_subtasks).unwrap_or(&empty_merged);
+ let skipped_set = tracker.map(|t| &t.skipped_subtasks).unwrap_or(&empty_skipped);
+
+ let mut merged_count = 0u32;
+ let mut skipped_count = 0u32;
+ let mut unmerged_branches = Vec::new();
+
+ for branch in &branches {
+ if branch.is_merged {
+ merged_count += 1;
+ } else if let Some(subtask_id) = branch.task_id {
+ if merged_set.contains(&subtask_id) {
+ merged_count += 1;
+ } else if skipped_set.contains_key(&subtask_id) {
+ skipped_count += 1;
+ } else {
+ unmerged_branches.push(branch.name.clone());
+ }
+ } else {
+ // Branch without task ID - check if it's merged
+ unmerged_branches.push(branch.name.clone());
+ }
+ }
+
+ let can_complete = unmerged_branches.is_empty();
+
+ let msg = DaemonMessage::MergeCompleteCheck {
+ task_id,
+ can_complete,
+ unmerged_branches,
+ merged_count,
+ skipped_count,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
+ /// Mark a subtask as merged in the tracker.
+ #[allow(dead_code)]
+ pub async fn mark_subtask_merged(&self, orchestrator_task_id: Uuid, subtask_id: Uuid) {
+ let mut trackers = self.merge_trackers.write().await;
+ let tracker = trackers.entry(orchestrator_task_id).or_insert_with(MergeTracker::default);
+ tracker.merged_subtasks.insert(subtask_id);
+ }
+
+ // =========================================================================
+ // Completion Action Handler Methods
+ // =========================================================================
+
+ /// Handle RetryCompletionAction command.
+ async fn handle_retry_completion_action(
+ &self,
+ task_id: Uuid,
+ task_name: String,
+ action: String,
+ target_repo_path: String,
+ target_branch: Option<String>,
+ ) -> Result<(), DaemonError> {
+ // Get the task's worktree path
+ let worktree_path = self.get_task_worktree_path(task_id).await?;
+
+ // Execute the completion action
+ let inner = self.clone_inner();
+ let result = inner.execute_completion_action(
+ task_id,
+ &task_name,
+ &worktree_path,
+ &action,
+ Some(target_repo_path.as_str()),
+ target_branch.as_deref(),
+ ).await;
+
+ // Send result back to server
+ let msg = match result {
+ Ok(pr_url) => DaemonMessage::CompletionActionResult {
+ task_id,
+ success: true,
+ message: match action.as_str() {
+ "branch" => format!("Branch pushed to {}", target_repo_path),
+ "merge" => format!("Merged into {}", target_branch.as_deref().unwrap_or("main")),
+ "pr" => format!("Pull request created"),
+ _ => format!("Completion action '{}' executed", action),
+ },
+ pr_url,
+ },
+ Err(e) => DaemonMessage::CompletionActionResult {
+ task_id,
+ success: false,
+ message: e,
+ pr_url: None,
+ },
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
+ /// Handle CloneWorktree command.
+ async fn handle_clone_worktree(
+ &self,
+ task_id: Uuid,
+ target_dir: String,
+ ) -> Result<(), DaemonError> {
+ // Get the task's worktree path
+ let worktree_path = self.get_task_worktree_path(task_id).await?;
+
+ // Expand tilde in target path
+ let target_path = crate::daemon::worktree::expand_tilde(&target_dir);
+
+ // Clone the worktree to target directory
+ let result = self.worktree_manager.clone_worktree_to_directory(
+ &worktree_path,
+ &target_path,
+ ).await;
+
+ // Send result back to server
+ let msg = match result {
+ Ok(message) => DaemonMessage::CloneWorktreeResult {
+ task_id,
+ success: true,
+ message,
+ target_dir: Some(target_path.to_string_lossy().to_string()),
+ },
+ Err(e) => DaemonMessage::CloneWorktreeResult {
+ task_id,
+ success: false,
+ message: e.to_string(),
+ target_dir: None,
+ },
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
+ /// Handle CheckTargetExists command.
+ async fn handle_check_target_exists(
+ &self,
+ task_id: Uuid,
+ target_dir: String,
+ ) -> Result<(), DaemonError> {
+ // Expand tilde in target path
+ let target_path = crate::daemon::worktree::expand_tilde(&target_dir);
+
+ // Check if target exists
+ let exists = self.worktree_manager.target_directory_exists(&target_path).await;
+
+ // Send result back to server
+ let msg = DaemonMessage::CheckTargetExistsResult {
+ task_id,
+ exists,
+ target_dir: target_path.to_string_lossy().to_string(),
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
+ /// Handle ReadRepoFile command.
+ ///
+ /// Reads a file from a repository on the daemon's filesystem and sends
+ /// the content back to the server for syncing contract files.
+ async fn handle_read_repo_file(
+ &self,
+ request_id: Uuid,
+ file_path: String,
+ repo_path: String,
+ ) -> Result<(), DaemonError> {
+ // Expand tilde in repo path
+ let repo_path_expanded = crate::daemon::worktree::expand_tilde(&repo_path);
+
+ // Construct full file path
+ let full_path = repo_path_expanded.join(&file_path);
+
+ // Try to read the file
+ let (content, success, error) = match tokio::fs::read_to_string(&full_path).await {
+ Ok(content) => (Some(content), true, None),
+ Err(e) => {
+ tracing::warn!(
+ request_id = %request_id,
+ file_path = %file_path,
+ repo_path = %repo_path,
+ full_path = %full_path.display(),
+ error = %e,
+ "Failed to read repo file"
+ );
+ (None, false, Some(e.to_string()))
+ }
+ };
+
+ // Send result back to server
+ let msg = DaemonMessage::RepoFileContent {
+ request_id,
+ file_path,
+ content,
+ success,
+ error,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
+ /// Handle CreateBranch command - create a new branch in a task's worktree.
+ async fn handle_create_branch(
+ &self,
+ task_id: Uuid,
+ branch_name: String,
+ from_ref: Option<String>,
+ ) -> Result<(), DaemonError> {
+ // Get task's worktree path
+ let worktree_path = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id)
+ .and_then(|t| t.worktree.as_ref())
+ .map(|w| w.path.clone())
+ };
+
+ let (success, message) = if let Some(path) = worktree_path {
+ // Build git checkout command
+ let mut cmd = tokio::process::Command::new("git");
+ cmd.current_dir(&path);
+ cmd.arg("checkout").arg("-b").arg(&branch_name);
+
+ if let Some(ref from) = from_ref {
+ cmd.arg(from);
+ }
+
+ match cmd.output().await {
+ Ok(output) => {
+ if output.status.success() {
+ (true, format!("Branch '{}' created successfully", branch_name))
+ } else {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ (false, format!("Failed to create branch: {}", stderr))
+ }
+ }
+ Err(e) => (false, format!("Failed to execute git: {}", e)),
+ }
+ } else {
+ (false, format!("Task {} not found or has no worktree", task_id))
+ };
+
+ let msg = DaemonMessage::BranchCreated {
+ task_id,
+ success,
+ branch_name,
+ message,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
+ /// Handle MergeTaskToTarget command - merge a task's changes to a target branch.
+ async fn handle_merge_task_to_target(
+ &self,
+ task_id: Uuid,
+ target_branch: Option<String>,
+ squash: bool,
+ ) -> Result<(), DaemonError> {
+ // Get task info
+ let task_info = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id).map(|t| (
+ t.worktree.as_ref().map(|w| w.path.clone()),
+ t.base_branch.clone(),
+ ))
+ };
+
+ let (success, message, commit_sha, conflicts) = match task_info {
+ Some((Some(worktree_path), base)) => {
+ let target = target_branch.unwrap_or_else(|| base.unwrap_or_else(|| "main".to_string()));
+
+ // First, stage and commit any uncommitted changes
+ let add_result = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["add", "-A"])
+ .output()
+ .await;
+
+ if let Err(e) = add_result {
+ (false, format!("Failed to stage changes: {}", e), None, None)
+ } else {
+ // Commit if there are staged changes
+ let commit_result = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["commit", "-m", "Task completion checkpoint", "--allow-empty"])
+ .output()
+ .await;
+
+ if let Err(e) = commit_result {
+ tracing::warn!(task_id = %task_id, error = %e, "Commit failed (may be empty)");
+ }
+
+ // Get current branch name
+ let branch_output = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["rev-parse", "--abbrev-ref", "HEAD"])
+ .output()
+ .await;
+
+ let source_branch = branch_output
+ .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string())
+ .unwrap_or_else(|_| "unknown".to_string());
+
+ // Checkout target branch
+ let checkout = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["checkout", &target])
+ .output()
+ .await;
+
+ match checkout {
+ Ok(output) if output.status.success() => {
+ // Merge the source branch
+ let mut merge_cmd = tokio::process::Command::new("git");
+ merge_cmd.current_dir(&worktree_path);
+ merge_cmd.arg("merge");
+ if squash {
+ merge_cmd.arg("--squash");
+ }
+ merge_cmd.arg(&source_branch);
+ merge_cmd.arg("-m").arg(format!("Merge task {} into {}", task_id, target));
+
+ match merge_cmd.output().await {
+ Ok(output) if output.status.success() => {
+ // Get the commit SHA
+ let sha_output = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["rev-parse", "HEAD"])
+ .output()
+ .await;
+
+ let sha = sha_output
+ .ok()
+ .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string());
+
+ if squash {
+ // For squash merge, we need to commit
+ let _ = tokio::process::Command::new("git")
+ .current_dir(&worktree_path)
+ .args(["commit", "-m", &format!("Squashed merge of task {}", task_id)])
+ .output()
+ .await;
+ }
+
+ (true, format!("Merged {} into {}", source_branch, target), sha, None)
+ }
+ Ok(output) => {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ // Check for merge conflicts
+ if stderr.contains("CONFLICT") {
+ let conflict_files = stderr
+ .lines()
+ .filter(|l| l.contains("CONFLICT"))
+ .map(|l| l.to_string())
+ .collect::<Vec<_>>();
+ (false, "Merge conflicts detected".to_string(), None, Some(conflict_files))
+ } else {
+ (false, format!("Merge failed: {}", stderr), None, None)
+ }
+ }
+ Err(e) => (false, format!("Failed to merge: {}", e), None, None),
+ }
+ }
+ Ok(output) => {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ (false, format!("Failed to checkout target branch: {}", stderr), None, None)
+ }
+ Err(e) => (false, format!("Failed to checkout: {}", e), None, None),
+ }
+ }
+ }
+ Some((None, _)) => (false, format!("Task {} has no worktree", task_id), None, None),
+ None => (false, format!("Task {} not found", task_id), None, None),
+ };
+
+ let msg = DaemonMessage::MergeToTargetResult {
+ task_id,
+ success,
+ message,
+ commit_sha,
+ conflicts,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
+ /// Handle CreatePR command - create a pull request for a task's changes.
+ async fn handle_create_pr(
+ &self,
+ task_id: Uuid,
+ title: String,
+ body: Option<String>,
+ base_branch: String,
+ ) -> Result<(), DaemonError> {
+ // Get task's worktree path
+ let worktree_path = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id)
+ .and_then(|t| t.worktree.as_ref())
+ .map(|w| w.path.clone())
+ };
+
+ let (success, message, pr_url, pr_number) = if let Some(path) = worktree_path {
+ // Push the current branch first
+ let push_result = tokio::process::Command::new("git")
+ .current_dir(&path)
+ .args(["push", "-u", "origin", "HEAD"])
+ .output()
+ .await;
+
+ if let Err(e) = push_result {
+ (false, format!("Failed to push branch: {}", e), None, None)
+ } else {
+ // Create PR using gh CLI
+ let mut pr_cmd = tokio::process::Command::new("gh");
+ pr_cmd.current_dir(&path);
+ pr_cmd.args(["pr", "create", "--title", &title, "--base", &base_branch]);
+
+ if let Some(ref body_text) = body {
+ pr_cmd.args(["--body", body_text]);
+ } else {
+ pr_cmd.args(["--body", ""]);
+ }
+
+ match pr_cmd.output().await {
+ Ok(output) if output.status.success() => {
+ let stdout = String::from_utf8_lossy(&output.stdout);
+ // gh pr create outputs the PR URL
+ let url = stdout.lines().last().map(|s| s.trim().to_string());
+ // Extract PR number from URL
+ let number = url.as_ref().and_then(|u| {
+ u.split('/').last().and_then(|n| n.parse::<i32>().ok())
+ });
+ (true, "Pull request created".to_string(), url, number)
+ }
+ Ok(output) => {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ (false, format!("Failed to create PR: {}", stderr), None, None)
+ }
+ Err(e) => (false, format!("Failed to run gh: {}", e), None, None),
+ }
+ }
+ } else {
+ (false, format!("Task {} not found or has no worktree", task_id), None, None)
+ };
+
+ let msg = DaemonMessage::PRCreated {
+ task_id,
+ success,
+ message,
+ pr_url,
+ pr_number,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+
+ /// Handle GetTaskDiff command - get the diff for a task's changes.
+ async fn handle_get_task_diff(
+ &self,
+ task_id: Uuid,
+ ) -> Result<(), DaemonError> {
+ // Get task's worktree path
+ let worktree_path = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id)
+ .and_then(|t| t.worktree.as_ref())
+ .map(|w| w.path.clone())
+ };
+
+ let (success, diff, error) = if let Some(path) = worktree_path {
+ // Get diff of all changes (staged and unstaged)
+ let diff_result = tokio::process::Command::new("git")
+ .current_dir(&path)
+ .args(["diff", "HEAD"])
+ .output()
+ .await;
+
+ match diff_result {
+ Ok(output) if output.status.success() => {
+ let diff_text = String::from_utf8_lossy(&output.stdout).to_string();
+ if diff_text.is_empty() {
+ // No uncommitted changes, show diff from base
+ let base_diff = tokio::process::Command::new("git")
+ .current_dir(&path)
+ .args(["log", "-p", "--reverse", "HEAD~10..HEAD", "--"])
+ .output()
+ .await;
+
+ match base_diff {
+ Ok(o) => (true, Some(String::from_utf8_lossy(&o.stdout).to_string()), None),
+ Err(e) => (false, None, Some(format!("Failed to get diff: {}", e))),
+ }
+ } else {
+ (true, Some(diff_text), None)
+ }
+ }
+ Ok(output) => {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ (false, None, Some(format!("Git diff failed: {}", stderr)))
+ }
+ Err(e) => (false, None, Some(format!("Failed to run git: {}", e))),
+ }
+ } else {
+ (false, None, Some(format!("Task {} not found or has no worktree", task_id)))
+ };
+
+ let msg = DaemonMessage::TaskDiff {
+ task_id,
+ success,
+ diff,
+ error,
+ };
+ let _ = self.ws_tx.send(msg).await;
+ Ok(())
+ }
+}
+
+/// Inner state for spawned tasks (cloneable).
+struct TaskManagerInner {
+ worktree_manager: Arc<WorktreeManager>,
+ process_manager: Arc<ProcessManager>,
+ temp_manager: Arc<TempManager>,
+ tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>,
+ ws_tx: mpsc::Sender<DaemonMessage>,
+ task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>,
+}
+
+impl TaskManagerInner {
+ /// Run a task to completion.
+ #[allow(clippy::too_many_arguments)]
+ async fn run_task(
+ &self,
+ task_id: Uuid,
+ task_name: String,
+ plan: String,
+ repo_source: Option<String>,
+ base_branch: Option<String>,
+ target_branch: Option<String>,
+ is_orchestrator: bool,
+ is_supervisor: bool,
+ target_repo_path: Option<String>,
+ completion_action: Option<String>,
+ continue_from_task_id: Option<Uuid>,
+ copy_files: Option<Vec<String>>,
+ contract_id: Option<Uuid>,
+ ) -> Result<(), DaemonError> {
+ tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, "=== RUN_TASK START ===");
+
+ // Determine working directory
+ let working_dir = if let Some(ref source) = repo_source {
+ if is_new_repo_request(source) {
+ // Explicit new repo request: new:// or new://project-name
+ tracing::info!(
+ task_id = %task_id,
+ source = %source,
+ "Creating new git repository"
+ );
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Initializing new git repository...\n"),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ let worktree_info = self.worktree_manager
+ .init_new_repo(task_id, source)
+ .await
+ .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?;
+
+ tracing::info!(
+ task_id = %task_id,
+ path = %worktree_info.path.display(),
+ "New repository created"
+ );
+
+ // Store worktree info
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.worktree = Some(worktree_info.clone());
+ }
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Repository ready at {}\n", worktree_info.path.display()),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ worktree_info.path
+ } else {
+ // Send progress message
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Setting up worktree from {}...\n", source),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // Ensure source repo exists (clone if URL, verify if path)
+ let source_repo = self.worktree_manager.ensure_repo(source).await
+ .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?;
+
+ // Detect or use provided base branch
+ let branch = if let Some(ref b) = base_branch {
+ b.clone()
+ } else {
+ self.worktree_manager.detect_default_branch(&source_repo).await
+ .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
+ };
+
+ tracing::info!(
+ task_id = %task_id,
+ source = %source,
+ branch = %branch,
+ continue_from_task_id = ?continue_from_task_id,
+ "Setting up worktree"
+ );
+
+ // Create worktree - either from scratch or copying from another task
+ let task_name = format!("task-{}", &task_id.to_string()[..8]);
+ let worktree_info = if let Some(from_task_id) = continue_from_task_id {
+ // Find the source task's worktree path
+ let source_worktree = self.find_worktree_for_task(from_task_id).await
+ .map_err(|e| DaemonError::Task(TaskError::SetupFailed(
+ format!("Cannot continue from task {}: {}", from_task_id, e)
+ )))?;
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Continuing from task {} worktree...\n", &from_task_id.to_string()[..8]),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // Create worktree by copying from source task
+ self.worktree_manager
+ .create_worktree_from_task(&source_worktree, task_id, &task_name)
+ .await
+ .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
+ } else {
+ // Create fresh worktree from repo
+ self.worktree_manager
+ .create_worktree(&source_repo, task_id, &task_name, &branch)
+ .await
+ .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?
+ };
+
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %worktree_info.path.display(),
+ branch = %worktree_info.branch,
+ continued_from = ?continue_from_task_id,
+ "Worktree created"
+ );
+
+ // Store worktree info
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.worktree = Some(worktree_info.clone());
+ }
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Worktree ready at {}\n", worktree_info.path.display()),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ worktree_info.path
+ }
+ } else {
+ // No repo specified - use managed temp directory in ~/.makima/temp/
+ tracing::info!(task_id = %task_id, "Creating managed temp directory (no repo)");
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Creating temporary working directory...\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ let temp_dir = self.temp_manager.create_task_dir(task_id).await?;
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Working directory ready at {}\n", temp_dir.display()),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ temp_dir
+ };
+
+ // Copy files from parent task's worktree if specified
+ if let Some(ref files) = copy_files {
+ if !files.is_empty() {
+ // Get the parent task ID to find its worktree
+ let parent_task_id = {
+ let tasks = self.tasks.read().await;
+ tasks.get(&task_id).and_then(|t| t.parent_task_id)
+ };
+
+ if let Some(parent_id) = parent_task_id {
+ match self.find_worktree_for_task(parent_id).await {
+ Ok(parent_worktree) => {
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Copying {} files from orchestrator...\n", files.len()),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ for file_path in files {
+ let source = parent_worktree.join(file_path);
+ let dest = working_dir.join(file_path);
+
+ // Create parent directories if needed
+ if let Some(parent) = dest.parent() {
+ if let Err(e) = tokio::fs::create_dir_all(parent).await {
+ tracing::warn!(
+ task_id = %task_id,
+ file = %file_path,
+ error = %e,
+ "Failed to create parent directory for file"
+ );
+ continue;
+ }
+ }
+
+ // Copy the file
+ match tokio::fs::copy(&source, &dest).await {
+ Ok(_) => {
+ tracing::info!(
+ task_id = %task_id,
+ source = %source.display(),
+ dest = %dest.display(),
+ "Copied file from orchestrator"
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ source = %source.display(),
+ dest = %dest.display(),
+ error = %e,
+ "Failed to copy file from orchestrator"
+ );
+ // Notify but don't fail - the file might be optional
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Warning: Could not copy {}: {}\n", file_path, e),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ }
+ }
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Files copied from orchestrator.\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ }
+ Err(e) => {
+ tracing::warn!(
+ task_id = %task_id,
+ parent_id = %parent_id,
+ error = %e,
+ "Could not find parent task worktree for file copying"
+ );
+ }
+ }
+ } else {
+ tracing::warn!(
+ task_id = %task_id,
+ "copy_files specified but no parent_task_id"
+ );
+ }
+ }
+ }
+
+ // Update state to Starting
+ tracing::info!(task_id = %task_id, "Updating state: Initializing -> Starting");
+ self.update_state(task_id, TaskState::Starting).await;
+ self.send_status_change(task_id, "initializing", "starting").await;
+
+ // Check Claude is available
+ match self.process_manager.check_claude_available().await {
+ Ok(version) => {
+ tracing::info!(task_id = %task_id, version = %version, "Claude Code available");
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Claude Code {} ready\n", version),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ }
+ Err(e) => {
+ let err_msg = format!("Claude Code not available: {}", e);
+ tracing::error!(task_id = %task_id, error = %err_msg);
+ return Err(DaemonError::Task(TaskError::SetupFailed(err_msg)));
+ }
+ }
+
+ // Set up supervisor, orchestrator, or subtask mode
+ let (extra_env, full_plan, system_prompt) = if is_supervisor {
+ // Supervisor mode: long-running contract orchestrator
+ tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up supervisor mode");
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Setting up supervisor environment...\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // Generate tool key for API access
+ let tool_key = generate_tool_key();
+ tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for supervisor");
+
+ // Register tool key with server
+ let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone());
+ if self.ws_tx.send(register_msg).await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to register tool key");
+ } else {
+ tracing::info!(task_id = %task_id, "Tool key registration message sent to server");
+ }
+
+ // Set up environment variables for makima CLI
+ let mut env = HashMap::new();
+ // TODO: Make API URL configurable
+ env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string());
+ env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone());
+ env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string());
+ // Supervisor needs contract ID for its tools
+ if let Some(cid) = contract_id {
+ env.insert("MAKIMA_CONTRACT_ID".to_string(), cid.to_string());
+ }
+
+ tracing::info!(
+ task_id = %task_id,
+ api_url = "http://localhost:8080",
+ tool_key_preview = &tool_key[..8.min(tool_key.len())],
+ "Set supervisor environment variables"
+ );
+
+ // For supervisor, pass instructions as SYSTEM PROMPT (not user message)
+ // This ensures Claude treats them as behavioral constraints
+ let supervisor_user_plan = format!(
+ "Contract goal:\n{}",
+ plan
+ );
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Supervisor environment ready (makima CLI available)\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // Return system prompt separately - it will be passed via --system-prompt flag
+ (Some(env), supervisor_user_plan, Some(SUPERVISOR_SYSTEM_PROMPT.to_string()))
+ } else if is_orchestrator {
+ tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up orchestrator mode");
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Setting up orchestrator environment...\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // Generate tool key for API access
+ let tool_key = generate_tool_key();
+ tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for orchestrator");
+
+ // Register tool key with server
+ let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone());
+ if self.ws_tx.send(register_msg).await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to register tool key");
+ } else {
+ tracing::info!(task_id = %task_id, "Tool key registration message sent to server");
+ }
+
+ // Set up environment variables for makima CLI
+ let mut env = HashMap::new();
+ // TODO: Make API URL configurable
+ env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string());
+ env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone());
+ env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string());
+
+ tracing::info!(
+ task_id = %task_id,
+ api_url = "http://localhost:8080",
+ tool_key_preview = &tool_key[..8.min(tool_key.len())],
+ "Set orchestrator environment variables"
+ );
+
+ // For orchestrator, pass instructions as SYSTEM PROMPT
+ let orchestrator_user_plan = format!(
+ "Your task:\n{}",
+ plan
+ );
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Orchestrator environment ready (makima CLI available)\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ (Some(env), orchestrator_user_plan, Some(ORCHESTRATOR_SYSTEM_PROMPT.to_string()))
+ } else {
+ tracing::info!(task_id = %task_id, "Running as regular subtask (not orchestrator)");
+ // For subtasks, pass worktree isolation instructions as system prompt
+ let subtask_user_plan = format!(
+ "Your task:\n{}",
+ plan
+ );
+ (None, subtask_user_plan, Some(SUBTASK_SYSTEM_PROMPT.to_string()))
+ };
+
+ // Add contract environment if task has contract_id (skip for supervisors - they already have it)
+ let (extra_env, full_plan, system_prompt) = if let Some(cid) = contract_id {
+ if is_supervisor {
+ // Supervisors already have contract ID and API access set up
+ tracing::info!(task_id = %task_id, contract_id = %cid, "Supervisor already has contract integration");
+ (extra_env, full_plan, system_prompt)
+ } else {
+ tracing::info!(task_id = %task_id, contract_id = %cid, "Setting up contract integration");
+
+ // Set up environment variables for makima CLI
+ let mut env = extra_env.unwrap_or_default();
+ env.insert("MAKIMA_CONTRACT_ID".to_string(), cid.to_string());
+
+ // If not already an orchestrator, we need API access for makima CLI
+ if !is_orchestrator {
+ // Generate tool key for API access
+ let tool_key = generate_tool_key();
+ tracing::info!(task_id = %task_id, "Generated tool key for contract access");
+
+ // Register tool key with server
+ let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone());
+ if self.ws_tx.send(register_msg).await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to register contract tool key");
+ }
+
+ env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string());
+ env.insert("MAKIMA_API_KEY".to_string(), tool_key);
+ env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string());
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Contract integration ready (makima CLI available)\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ // Prepend contract integration prompt to the plan so the task knows to use makima CLI
+ let contract_plan = format!(
+ "{}{}",
+ CONTRACT_INTEGRATION_PROMPT,
+ full_plan
+ );
+
+ (Some(env), contract_plan, system_prompt)
+ }
+ } else {
+ (extra_env, full_plan, system_prompt)
+ };
+
+ // Spawn Claude process
+ let plan_bytes = full_plan.len();
+ let plan_chars = full_plan.chars().count();
+ // Rough token estimate: ~4 chars per token for English
+ let estimated_tokens = plan_chars / 4;
+
+ tracing::info!(
+ task_id = %task_id,
+ working_dir = %working_dir.display(),
+ is_orchestrator = is_orchestrator,
+ plan_bytes = plan_bytes,
+ plan_chars = plan_chars,
+ estimated_tokens = estimated_tokens,
+ "Spawning Claude process"
+ );
+
+ // Warn if plan is very large (Claude's context is typically 100k-200k tokens)
+ if estimated_tokens > 50_000 {
+ tracing::warn!(task_id = %task_id, estimated_tokens = estimated_tokens, "Plan is very large - may hit context limits!");
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Warning: Plan is very large (~{} tokens). This may cause issues.\n", estimated_tokens),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ }
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ if is_orchestrator {
+ format!("Starting Claude Code (orchestrator mode, ~{} tokens)...\n", estimated_tokens)
+ } else {
+ format!("Starting Claude Code (~{} tokens)...\n", estimated_tokens)
+ },
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), "Calling process_manager.spawn()...");
+ let mut process = self.process_manager
+ .spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref())
+ .await
+ .map_err(|e| {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process");
+ DaemonError::Task(TaskError::SetupFailed(e.to_string()))
+ })?;
+ tracing::info!(task_id = %task_id, "Claude process spawned successfully");
+
+ // Set up input channel for this task so we can send messages to its stdin
+ tracing::debug!(task_id = %task_id, "Setting up input channel...");
+ let (input_tx, mut input_rx) = mpsc::channel::<String>(100);
+ tracing::debug!(task_id = %task_id, "Acquiring task_inputs write lock...");
+ self.task_inputs.write().await.insert(task_id, input_tx);
+ tracing::debug!(task_id = %task_id, "Input channel registered");
+
+ // Get stdin handle for input forwarding and completion signaling
+ let stdin_handle = process.stdin_handle();
+ let stdin_handle_for_completion = stdin_handle.clone();
+
+ tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)");
+ tokio::spawn(async move {
+ tracing::info!(task_id = %task_id, "Stdin forwarder task started, waiting for messages...");
+ while let Some(msg) = input_rx.recv().await {
+ tracing::info!(task_id = %task_id, msg_len = msg.len(), msg_preview = %if msg.len() > 50 { &msg[..50] } else { &msg }, "Received message from input channel");
+
+ // Format as JSON user message for stream-json input protocol
+ let json_msg = ClaudeInputMessage::user(&msg);
+ let json_line = match json_msg.to_json_line() {
+ Ok(line) => line,
+ Err(e) => {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to serialize input message");
+ continue;
+ }
+ };
+
+ tracing::debug!(task_id = %task_id, json_line = %json_line.trim(), "Formatted JSON line for stdin");
+
+ let mut stdin_guard = stdin_handle.lock().await;
+ if let Some(ref mut stdin) = *stdin_guard {
+ tracing::debug!(task_id = %task_id, "Acquired stdin lock, writing...");
+ if stdin.write_all(json_line.as_bytes()).await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to write to stdin, breaking");
+ break;
+ }
+ if stdin.flush().await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to flush stdin, breaking");
+ break;
+ }
+ tracing::info!(task_id = %task_id, json_len = json_line.len(), "Successfully wrote user message to Claude stdin");
+ } else {
+ tracing::warn!(task_id = %task_id, "Stdin is None (already closed), cannot send message");
+ break;
+ }
+ }
+ tracing::info!(task_id = %task_id, "Stdin forwarder task ended (channel closed or stdin unavailable)");
+ });
+
+ // Update state to Running
+ {
+ tracing::debug!(task_id = %task_id, "Acquiring tasks write lock for Running state update");
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.state = TaskState::Running;
+ task.started_at = Some(Instant::now());
+ }
+ tracing::debug!(task_id = %task_id, "Released tasks write lock");
+ }
+ tracing::info!(task_id = %task_id, "Updating state: Starting -> Running");
+ self.send_status_change(task_id, "starting", "running").await;
+ tracing::debug!(task_id = %task_id, "Sent status change notification");
+
+ // Stream output with startup timeout check
+ tracing::info!(task_id = %task_id, "Starting output stream - waiting for Claude output...");
+ tracing::debug!(task_id = %task_id, "Output will be forwarded via WebSocket to server");
+ let ws_tx = self.ws_tx.clone();
+
+ // For auth error detection
+ let claude_command = self.process_manager.claude_command().to_string();
+ let daemon_hostname = hostname::get().ok().and_then(|h| h.into_string().ok());
+ let mut auth_error_handled = false;
+
+ let mut output_count = 0u64;
+ let mut output_bytes = 0usize;
+ let startup_timeout = tokio::time::Duration::from_secs(30);
+ let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5));
+ startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+ let startup_deadline = tokio::time::Instant::now() + startup_timeout;
+
+ loop {
+ tokio::select! {
+ maybe_line = process.next_output() => {
+ match maybe_line {
+ Some(line) => {
+ output_count += 1;
+ output_bytes += line.content.len();
+
+ if output_count == 1 {
+ tracing::info!(task_id = %task_id, "Received first output line from Claude");
+ }
+ if output_count % 100 == 0 {
+ tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress");
+ }
+
+ // Log output details for debugging
+ tracing::trace!(
+ task_id = %task_id,
+ line_num = output_count,
+ content_len = line.content.len(),
+ is_stdout = line.is_stdout,
+ json_type = ?line.json_type,
+ "Forwarding output to WebSocket"
+ );
+
+ // Check if this is a "result" message indicating task completion
+ // With --input-format=stream-json, Claude waits for more input after completion
+ // We close stdin to signal EOF and let the process exit
+ if line.json_type.as_deref() == Some("result") {
+ tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion");
+ let mut stdin_guard = stdin_handle_for_completion.lock().await;
+ if let Some(mut stdin) = stdin_guard.take() {
+ let _ = stdin.shutdown().await;
+ }
+ }
+
+ // Check for OAuth auth error before sending output
+ let content_for_auth_check = line.content.clone();
+
+ let msg = DaemonMessage::task_output(task_id, line.content, false);
+ if ws_tx.send(msg).await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to send output, channel closed");
+ break;
+ }
+
+ // Detect OAuth token expiration and trigger remote login flow
+ if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check) {
+ auth_error_handled = true;
+ tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow");
+
+ // Spawn claude setup-token to get login URL
+ if let Some(login_url) = get_oauth_login_url(&claude_command).await {
+ tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL");
+ let auth_msg = DaemonMessage::AuthenticationRequired {
+ task_id: Some(task_id),
+ login_url,
+ hostname: daemon_hostname.clone(),
+ };
+ if ws_tx.send(auth_msg).await.is_err() {
+ tracing::warn!(task_id = %task_id, "Failed to send auth required message");
+ }
+ } else {
+ tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token");
+ let fallback_msg = DaemonMessage::task_output(
+ task_id,
+ format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n",
+ daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()),
+ false,
+ );
+ let _ = ws_tx.send(fallback_msg).await;
+ }
+ }
+ }
+ None => {
+ tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended");
+ break;
+ }
+ }
+ }
+ _ = startup_check.tick(), if output_count == 0 => {
+ // Check if process is still alive
+ match process.try_wait() {
+ Ok(Some(exit_code)) => {
+ tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!");
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Error: Claude process exited unexpectedly with code {}\n", exit_code),
+ false,
+ );
+ let _ = ws_tx.send(msg).await;
+ break;
+ }
+ Ok(None) => {
+ // Still running but no output
+ if tokio::time::Instant::now() > startup_deadline {
+ tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck");
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(),
+ false,
+ );
+ let _ = ws_tx.send(msg).await;
+ } else {
+ tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output...");
+ }
+ }
+ Err(e) => {
+ tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status");
+ }
+ }
+ }
+ }
+ }
+
+ // Wait for process to exit
+ let exit_code = process.wait().await.unwrap_or(-1);
+
+ // Clean up input channel for this task
+ self.task_inputs.write().await.remove(&task_id);
+ tracing::debug!(task_id = %task_id, "Removed task input channel");
+
+ // Update state based on exit code
+ let success = exit_code == 0;
+ let new_state = if success {
+ TaskState::Completed
+ } else {
+ TaskState::Failed
+ };
+
+ tracing::info!(
+ task_id = %task_id,
+ exit_code = exit_code,
+ success = success,
+ new_state = ?new_state,
+ "Claude process exited, updating task state"
+ );
+
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.state = new_state;
+ task.completed_at = Some(Instant::now());
+ if !success {
+ task.error = Some(format!("Process exited with code {}", exit_code));
+ }
+ }
+ }
+
+ // Execute completion action if task succeeded
+ let completion_result = if success {
+ if let Some(ref action) = completion_action {
+ if action != "none" {
+ self.execute_completion_action(
+ task_id,
+ &task_name,
+ &working_dir,
+ action,
+ target_repo_path.as_deref(),
+ target_branch.as_deref(),
+ ).await
+ } else {
+ Ok(None)
+ }
+ } else {
+ Ok(None)
+ }
+ } else {
+ Ok(None)
+ };
+
+ // Log completion action result
+ match &completion_result {
+ Ok(Some(pr_url)) => {
+ tracing::info!(task_id = %task_id, pr_url = %pr_url, "Completion action created PR");
+ }
+ Ok(None) => {}
+ Err(e) => {
+ tracing::warn!(task_id = %task_id, error = %e, "Completion action failed (task still marked as done)");
+ }
+ }
+
+ // Notify server - but NOT for supervisors which should never complete
+ if is_supervisor {
+ tracing::info!(
+ task_id = %task_id,
+ exit_code = exit_code,
+ "Supervisor Claude process exited - NOT marking as complete"
+ );
+ // Update local state to reflect it's paused/waiting for input
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.state = TaskState::Running; // Keep it as running, not completed
+ task.completed_at = None;
+ }
+ }
+ // Send a status message to let the frontend know supervisor is ready for more input
+ let msg = DaemonMessage::task_output(
+ task_id,
+ "\n[Supervisor ready for next instruction]\n".to_string(),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ } else {
+ let error = if success {
+ None
+ } else {
+ Some(format!("Exit code: {}", exit_code))
+ };
+ tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion");
+ let msg = DaemonMessage::task_complete(task_id, success, error);
+ let _ = self.ws_tx.send(msg).await;
+ }
+
+ // Note: Worktrees are kept until explicitly deleted (per user preference)
+ // This allows inspection, PR creation, etc.
+
+ tracing::info!(task_id = %task_id, "=== RUN_TASK END ===");
+ Ok(())
+ }
+
+ /// Execute the completion action for a task.
+ async fn execute_completion_action(
+ &self,
+ task_id: Uuid,
+ task_name: &str,
+ worktree_path: &std::path::Path,
+ action: &str,
+ target_repo_path: Option<&str>,
+ target_branch: Option<&str>,
+ ) -> Result<Option<String>, String> {
+ let target_repo = match target_repo_path {
+ Some(path) => crate::daemon::worktree::expand_tilde(path),
+ None => {
+ tracing::warn!(task_id = %task_id, "No target_repo_path configured, skipping completion action");
+ return Ok(None);
+ }
+ };
+
+ if !target_repo.exists() {
+ return Err(format!("Target repo not found: {} (expanded from {:?})", target_repo.display(), target_repo_path));
+ }
+
+ // Get the branch name: makima/{task-name-with-dashes}-{short-id}
+ let branch_name = format!(
+ "makima/{}-{}",
+ crate::daemon::worktree::sanitize_name(task_name),
+ crate::daemon::worktree::short_uuid(task_id)
+ );
+
+ // Determine target branch - use provided value or detect default branch of target repo
+ let target_branch = match target_branch {
+ Some(branch) => branch.to_string(),
+ None => {
+ // Detect default branch (main, master, develop, etc.)
+ self.worktree_manager
+ .detect_default_branch(&target_repo)
+ .await
+ .unwrap_or_else(|_| "master".to_string())
+ }
+ };
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Executing completion action: {}...\n", action),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+
+ match action {
+ "branch" => {
+ // Just push the branch to target repo
+ self.worktree_manager
+ .push_to_target_repo(worktree_path, &target_repo, &branch_name, task_name)
+ .await
+ .map_err(|e| e.to_string())?;
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Branch '{}' pushed to {}\n", branch_name, target_repo.display()),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ Ok(None)
+ }
+ "merge" => {
+ // Push and merge into target branch
+ let commit_sha = self.worktree_manager
+ .merge_to_target(worktree_path, &target_repo, &branch_name, &target_branch, task_name)
+ .await
+ .map_err(|e| e.to_string())?;
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Branch merged into {} (commit: {})\n", target_branch, commit_sha),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ Ok(None)
+ }
+ "pr" => {
+ // Push and create PR
+ let title = task_name.to_string();
+ let body = format!(
+ "Automated PR from makima task.\n\nTask ID: `{}`",
+ task_id
+ );
+ let pr_url = self.worktree_manager
+ .create_pull_request(
+ worktree_path,
+ &target_repo,
+ &branch_name,
+ &target_branch,
+ &title,
+ &body,
+ )
+ .await
+ .map_err(|e| e.to_string())?;
+
+ let msg = DaemonMessage::task_output(
+ task_id,
+ format!("Pull request created: {}\n", pr_url),
+ false,
+ );
+ let _ = self.ws_tx.send(msg).await;
+ Ok(Some(pr_url))
+ }
+ _ => {
+ tracing::warn!(task_id = %task_id, action = %action, "Unknown completion action");
+ Ok(None)
+ }
+ }
+ }
+
+ /// Find worktree path for a task ID.
+ /// First checks in-memory tasks, then scans the worktrees directory.
+ async fn find_worktree_for_task(&self, task_id: Uuid) -> Result<PathBuf, String> {
+ // First try to get from in-memory tasks
+ {
+ let tasks = self.tasks.read().await;
+ if let Some(task) = tasks.get(&task_id) {
+ if let Some(ref worktree) = task.worktree {
+ return Ok(worktree.path.clone());
+ }
+ }
+ }
+
+ // Task not in memory - scan worktrees directory for matching task ID
+ let short_id = &task_id.to_string()[..8];
+ let worktrees_dir = self.worktree_manager.base_dir();
+
+ if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await {
+ while let Ok(Some(entry)) = entries.next_entry().await {
+ let name = entry.file_name();
+ let name_str = name.to_string_lossy();
+ if name_str.starts_with(short_id) {
+ let path = entry.path();
+ // Verify it's a valid git directory
+ if path.join(".git").exists() {
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %path.display(),
+ "Found worktree by scanning directory"
+ );
+ return Ok(path);
+ }
+ }
+ }
+ }
+
+ Err(format!(
+ "No worktree found for task {}. The worktree may have been cleaned up.",
+ task_id
+ ))
+ }
+
+ async fn update_state(&self, task_id: Uuid, state: TaskState) {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.state = state;
+ }
+ }
+
+ async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) {
+ let msg = DaemonMessage::task_status_change(task_id, old_status, new_status);
+ let _ = self.ws_tx.send(msg).await;
+ }
+
+ /// Mark task as failed.
+ async fn mark_failed(&self, task_id: Uuid, error: &str) {
+ {
+ let mut tasks = self.tasks.write().await;
+ if let Some(task) = tasks.get_mut(&task_id) {
+ task.state = TaskState::Failed;
+ task.error = Some(error.to_string());
+ task.completed_at = Some(Instant::now());
+ }
+ }
+
+ // Notify server
+ let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string()));
+ let _ = self.ws_tx.send(msg).await;
+ }
+}
+
+impl Clone for TaskManagerInner {
+ fn clone(&self) -> Self {
+ Self {
+ worktree_manager: self.worktree_manager.clone(),
+ process_manager: self.process_manager.clone(),
+ temp_manager: self.temp_manager.clone(),
+ tasks: self.tasks.clone(),
+ ws_tx: self.ws_tx.clone(),
+ task_inputs: self.task_inputs.clone(),
+ }
+ }
+}
diff --git a/makima/src/daemon/task/mod.rs b/makima/src/daemon/task/mod.rs
new file mode 100644
index 0000000..29c261e
--- /dev/null
+++ b/makima/src/daemon/task/mod.rs
@@ -0,0 +1,7 @@
+//! Task management and execution.
+
+pub mod manager;
+pub mod state;
+
+pub use manager::{ManagedTask, TaskConfig, TaskManager};
+pub use state::TaskState;
diff --git a/makima/src/daemon/task/state.rs b/makima/src/daemon/task/state.rs
new file mode 100644
index 0000000..ca5fc01
--- /dev/null
+++ b/makima/src/daemon/task/state.rs
@@ -0,0 +1,161 @@
+//! Task state machine.
+
+use std::fmt;
+
+/// Task execution state.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum TaskState {
+ /// Task received, preparing overlay.
+ Initializing,
+ /// Overlay ready, starting container.
+ Starting,
+ /// Container running.
+ Running,
+ /// Container paused.
+ Paused,
+ /// Waiting for sibling or resource.
+ Blocked,
+ /// Task completed successfully.
+ Completed,
+ /// Task failed with error.
+ Failed,
+ /// Task interrupted by user.
+ Interrupted,
+}
+
+impl TaskState {
+ /// Check if a state transition is valid.
+ pub fn can_transition_to(&self, target: TaskState) -> bool {
+ use TaskState::*;
+
+ matches!(
+ (self, target),
+ // From Initializing
+ (Initializing, Starting)
+ | (Initializing, Failed)
+ | (Initializing, Interrupted)
+ // From Starting
+ | (Starting, Running)
+ | (Starting, Failed)
+ | (Starting, Interrupted)
+ // From Running
+ | (Running, Paused)
+ | (Running, Blocked)
+ | (Running, Completed)
+ | (Running, Failed)
+ | (Running, Interrupted)
+ // From Paused
+ | (Paused, Running)
+ | (Paused, Interrupted)
+ | (Paused, Failed)
+ // From Blocked
+ | (Blocked, Running)
+ | (Blocked, Failed)
+ | (Blocked, Interrupted)
+ )
+ }
+
+ /// Check if this state is terminal (no more transitions possible).
+ pub fn is_terminal(&self) -> bool {
+ matches!(
+ self,
+ TaskState::Completed | TaskState::Failed | TaskState::Interrupted
+ )
+ }
+
+ /// Check if the task is currently active (running or paused).
+ pub fn is_active(&self) -> bool {
+ matches!(
+ self,
+ TaskState::Initializing
+ | TaskState::Starting
+ | TaskState::Running
+ | TaskState::Paused
+ | TaskState::Blocked
+ )
+ }
+
+ /// Check if the task is running.
+ pub fn is_running(&self) -> bool {
+ matches!(self, TaskState::Running)
+ }
+
+ /// Convert to string for protocol messages.
+ pub fn as_str(&self) -> &'static str {
+ match self {
+ TaskState::Initializing => "initializing",
+ TaskState::Starting => "starting",
+ TaskState::Running => "running",
+ TaskState::Paused => "paused",
+ TaskState::Blocked => "blocked",
+ TaskState::Completed => "done",
+ TaskState::Failed => "failed",
+ TaskState::Interrupted => "interrupted",
+ }
+ }
+
+ /// Parse from string.
+ pub fn from_str(s: &str) -> Option<Self> {
+ match s.to_lowercase().as_str() {
+ "initializing" => Some(TaskState::Initializing),
+ "starting" => Some(TaskState::Starting),
+ "running" => Some(TaskState::Running),
+ "paused" => Some(TaskState::Paused),
+ "blocked" => Some(TaskState::Blocked),
+ "done" | "completed" => Some(TaskState::Completed),
+ "failed" => Some(TaskState::Failed),
+ "interrupted" => Some(TaskState::Interrupted),
+ _ => None,
+ }
+ }
+}
+
+impl fmt::Display for TaskState {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "{}", self.as_str())
+ }
+}
+
+impl Default for TaskState {
+ fn default() -> Self {
+ TaskState::Initializing
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::daemon::*;
+
+ #[test]
+ fn test_valid_transitions() {
+ use TaskState::*;
+
+ // Valid transitions
+ assert!(Initializing.can_transition_to(Starting));
+ assert!(Starting.can_transition_to(Running));
+ assert!(Running.can_transition_to(Completed));
+ assert!(Running.can_transition_to(Paused));
+ assert!(Paused.can_transition_to(Running));
+
+ // Invalid transitions
+ assert!(!Completed.can_transition_to(Running));
+ assert!(!Failed.can_transition_to(Running));
+ assert!(!Running.can_transition_to(Initializing));
+ }
+
+ #[test]
+ fn test_terminal_states() {
+ assert!(TaskState::Completed.is_terminal());
+ assert!(TaskState::Failed.is_terminal());
+ assert!(TaskState::Interrupted.is_terminal());
+ assert!(!TaskState::Running.is_terminal());
+ assert!(!TaskState::Paused.is_terminal());
+ }
+
+ #[test]
+ fn test_parse() {
+ assert_eq!(TaskState::from_str("running"), Some(TaskState::Running));
+ assert_eq!(TaskState::from_str("done"), Some(TaskState::Completed));
+ assert_eq!(TaskState::from_str("invalid"), None);
+ }
+}
diff --git a/makima/src/daemon/temp.rs b/makima/src/daemon/temp.rs
new file mode 100644
index 0000000..42d4a28
--- /dev/null
+++ b/makima/src/daemon/temp.rs
@@ -0,0 +1,224 @@
+//! Managed temporary directory for tasks without repositories.
+//!
+//! Tasks that don't have a repository URL and aren't subtasks (which inherit
+//! from parent) use a managed temp directory in ~/.makima/temp/. The directory
+//! is automatically cleaned up when it exceeds a size limit.
+
+use std::path::PathBuf;
+
+use tokio::fs;
+use uuid::Uuid;
+
+/// Maximum size of the temp directory before cleanup (5GB).
+const MAX_TEMP_SIZE_BYTES: u64 = 5 * 1024 * 1024 * 1024;
+
+/// Manages temporary directories for tasks without repositories.
+pub struct TempManager {
+ /// Base directory for temp task directories (~/.makima/temp/).
+ temp_dir: PathBuf,
+}
+
+impl TempManager {
+ /// Create a new TempManager.
+ pub fn new() -> Self {
+ let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
+ Self {
+ temp_dir: home.join(".makima").join("temp"),
+ }
+ }
+
+ /// Create a new TempManager with a custom base directory.
+ #[allow(dead_code)]
+ pub fn with_base_dir(base_dir: PathBuf) -> Self {
+ Self { temp_dir: base_dir }
+ }
+
+ /// Get the base temp directory path.
+ pub fn temp_dir(&self) -> &PathBuf {
+ &self.temp_dir
+ }
+
+ /// Create a temp directory for a task.
+ ///
+ /// This creates a directory at ~/.makima/temp/task-{id}/ and triggers
+ /// cleanup if the total size exceeds the limit.
+ pub async fn create_task_dir(&self, task_id: Uuid) -> Result<PathBuf, std::io::Error> {
+ // Ensure base directory exists
+ fs::create_dir_all(&self.temp_dir).await?;
+
+ // Check size and cleanup if needed
+ if let Err(e) = self.cleanup_if_needed().await {
+ tracing::warn!("Temp directory cleanup failed: {}", e);
+ // Continue anyway, cleanup is best-effort
+ }
+
+ // Create task-specific directory
+ let task_dir = self.temp_dir.join(format!("task-{}", task_id));
+ fs::create_dir_all(&task_dir).await?;
+
+ tracing::info!(
+ task_id = %task_id,
+ path = %task_dir.display(),
+ "Created temp directory for task"
+ );
+
+ Ok(task_dir)
+ }
+
+ /// Calculate total size of temp directory recursively.
+ async fn get_total_size(&self) -> Result<u64, std::io::Error> {
+ if !self.temp_dir.exists() {
+ return Ok(0);
+ }
+
+ let mut total = 0u64;
+ let mut stack = vec![self.temp_dir.clone()];
+
+ while let Some(dir) = stack.pop() {
+ let mut entries = match fs::read_dir(&dir).await {
+ Ok(e) => e,
+ Err(_) => continue,
+ };
+
+ while let Ok(Some(entry)) = entries.next_entry().await {
+ let metadata = match entry.metadata().await {
+ Ok(m) => m,
+ Err(_) => continue,
+ };
+
+ if metadata.is_dir() {
+ stack.push(entry.path());
+ } else {
+ total += metadata.len();
+ }
+ }
+ }
+
+ Ok(total)
+ }
+
+ /// Remove oldest directories if total size exceeds limit.
+ async fn cleanup_if_needed(&self) -> Result<(), std::io::Error> {
+ let size = self.get_total_size().await?;
+ if size <= MAX_TEMP_SIZE_BYTES {
+ return Ok(());
+ }
+
+ tracing::info!(
+ current_size_mb = size / 1024 / 1024,
+ limit_mb = MAX_TEMP_SIZE_BYTES / 1024 / 1024,
+ "Temp directory exceeds size limit, starting cleanup"
+ );
+
+ // Get all task dirs with modification times
+ let mut dirs: Vec<(PathBuf, std::time::SystemTime, u64)> = vec![];
+ let mut entries = fs::read_dir(&self.temp_dir).await?;
+
+ while let Ok(Some(entry)) = entries.next_entry().await {
+ let path = entry.path();
+ if !path.is_dir() {
+ continue;
+ }
+
+ let metadata = match entry.metadata().await {
+ Ok(m) => m,
+ Err(_) => continue,
+ };
+
+ let modified = metadata.modified().unwrap_or(std::time::UNIX_EPOCH);
+ let dir_size = self.get_dir_size(&path).await.unwrap_or(0);
+ dirs.push((path, modified, dir_size));
+ }
+
+ // Sort by oldest first
+ dirs.sort_by(|a, b| a.1.cmp(&b.1));
+
+ // Remove oldest until under limit
+ let mut current_size = size;
+ for (path, _, dir_size) in dirs {
+ if current_size <= MAX_TEMP_SIZE_BYTES {
+ break;
+ }
+
+ tracing::info!(
+ path = %path.display(),
+ size_mb = dir_size / 1024 / 1024,
+ "Removing old temp directory"
+ );
+
+ if let Err(e) = fs::remove_dir_all(&path).await {
+ tracing::warn!(path = %path.display(), error = %e, "Failed to remove temp directory");
+ continue;
+ }
+
+ current_size = current_size.saturating_sub(dir_size);
+ }
+
+ tracing::info!(
+ new_size_mb = current_size / 1024 / 1024,
+ "Temp directory cleanup complete"
+ );
+
+ Ok(())
+ }
+
+ /// Calculate size of a directory recursively.
+ async fn get_dir_size(&self, path: &PathBuf) -> Result<u64, std::io::Error> {
+ let mut total = 0u64;
+ let mut stack = vec![path.clone()];
+
+ while let Some(dir) = stack.pop() {
+ let mut entries = match fs::read_dir(&dir).await {
+ Ok(e) => e,
+ Err(_) => continue,
+ };
+
+ while let Ok(Some(entry)) = entries.next_entry().await {
+ let metadata = match entry.metadata().await {
+ Ok(m) => m,
+ Err(_) => continue,
+ };
+
+ if metadata.is_dir() {
+ stack.push(entry.path());
+ } else {
+ total += metadata.len();
+ }
+ }
+ }
+
+ Ok(total)
+ }
+
+ /// Remove a specific task's temp directory.
+ #[allow(dead_code)]
+ pub async fn remove_task_dir(&self, task_id: Uuid) -> Result<(), std::io::Error> {
+ let task_dir = self.temp_dir.join(format!("task-{}", task_id));
+ if task_dir.exists() {
+ fs::remove_dir_all(&task_dir).await?;
+ tracing::info!(
+ task_id = %task_id,
+ path = %task_dir.display(),
+ "Removed temp directory for task"
+ );
+ }
+ Ok(())
+ }
+}
+
+impl Default for TempManager {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::daemon::*;
+
+ #[test]
+ fn test_temp_manager_default_dir() {
+ let manager = TempManager::new();
+ assert!(manager.temp_dir().ends_with(".makima/temp"));
+ }
+}
diff --git a/makima/src/daemon/worktree/manager.rs b/makima/src/daemon/worktree/manager.rs
new file mode 100644
index 0000000..9af5dcb
--- /dev/null
+++ b/makima/src/daemon/worktree/manager.rs
@@ -0,0 +1,1623 @@
+//! Worktree manager implementation.
+
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use std::sync::LazyLock;
+
+use tokio::process::Command;
+use tokio::sync::Mutex;
+use uuid::Uuid;
+
+/// Errors that can occur during worktree operations.
+#[derive(Debug, thiserror::Error)]
+pub enum WorktreeError {
+ #[error("Git command failed: {0}")]
+ GitCommand(String),
+
+ #[error("Repository not found: {0}")]
+ RepoNotFound(String),
+
+ #[error("Failed to create directory: {0}")]
+ CreateDir(#[from] std::io::Error),
+
+ #[error("Invalid repository path: {0}")]
+ InvalidPath(String),
+
+ #[error("Worktree already exists: {0}")]
+ AlreadyExists(String),
+
+ #[error("Clone failed: {0}")]
+ CloneFailed(String),
+
+ #[error("Merge in progress")]
+ MergeInProgress,
+
+ #[error("No merge in progress")]
+ NoMergeInProgress,
+
+ #[error("Merge has conflicts: {0}")]
+ MergeConflicts(String),
+}
+
+/// Strategy for resolving a merge conflict.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ConflictResolution {
+ /// Use our version (the branch being merged into).
+ Ours,
+ /// Use their version (the branch being merged).
+ Theirs,
+}
+
+/// State of an in-progress merge.
+#[derive(Debug, Clone)]
+pub struct MergeState {
+ /// The branch being merged.
+ pub source_branch: String,
+ /// Files with unresolved conflicts.
+ pub conflicted_files: Vec<String>,
+ /// Whether a merge is currently in progress.
+ pub in_progress: bool,
+}
+
+/// Information about a task branch.
+#[derive(Debug, Clone)]
+pub struct TaskBranchInfo {
+ /// Full branch name.
+ pub name: String,
+ /// Task ID extracted from branch name (if parseable).
+ pub task_id: Option<Uuid>,
+ /// Whether this branch has been merged into the current branch.
+ pub is_merged: bool,
+ /// Short SHA of the last commit.
+ pub last_commit: String,
+ /// Subject line of the last commit.
+ pub last_commit_message: String,
+}
+
+/// Information about a created worktree.
+#[derive(Debug, Clone)]
+pub struct WorktreeInfo {
+ /// Path to the worktree directory.
+ pub path: PathBuf,
+ /// Git branch name for this worktree.
+ pub branch: String,
+ /// Source repository path.
+ pub source_repo: PathBuf,
+}
+
+/// Manages git worktrees for task isolation.
+pub struct WorktreeManager {
+ /// Base directory for all worktrees (~/.makima/worktrees).
+ base_dir: PathBuf,
+ /// Base directory for cloned repos (~/.makima/repos).
+ repos_dir: PathBuf,
+ /// Branch prefix for task branches.
+ branch_prefix: String,
+}
+
+/// Per-worktree locks to prevent concurrent creation issues.
+static WORKTREE_LOCKS: LazyLock<Mutex<HashMap<String, std::sync::Arc<tokio::sync::Mutex<()>>>>> =
+ LazyLock::new(|| Mutex::new(HashMap::new()));
+
+impl WorktreeManager {
+ /// Create a new WorktreeManager with the given base directory.
+ pub fn new(base_dir: PathBuf) -> Self {
+ let repos_dir = base_dir.parent()
+ .map(|p| p.join("repos"))
+ .unwrap_or_else(|| base_dir.join("repos"));
+
+ Self {
+ base_dir,
+ repos_dir,
+ branch_prefix: "makima/task-".to_string(),
+ }
+ }
+
+ /// Get the default worktree base directory (~/.makima/worktrees).
+ pub fn default_base_dir() -> PathBuf {
+ dirs::home_dir()
+ .unwrap_or_else(|| PathBuf::from("."))
+ .join(".makima")
+ .join("worktrees")
+ }
+
+ /// Get the base directory for worktrees.
+ pub fn base_dir(&self) -> &Path {
+ &self.base_dir
+ }
+
+ /// Detect the default branch of a repository.
+ /// Tries to find HEAD's target, falling back to common branch names.
+ pub async fn detect_default_branch(&self, repo_path: &Path) -> Result<String, WorktreeError> {
+ // Try to get the branch that HEAD points to
+ let output = Command::new("git")
+ .args(["symbolic-ref", "refs/remotes/origin/HEAD", "--short"])
+ .current_dir(repo_path)
+ .output()
+ .await?;
+
+ if output.status.success() {
+ let branch = String::from_utf8_lossy(&output.stdout).trim().to_string();
+ // Remove "origin/" prefix if present
+ let branch = branch.strip_prefix("origin/").unwrap_or(&branch).to_string();
+ if !branch.is_empty() {
+ return Ok(branch);
+ }
+ }
+
+ // Try common branch names
+ for branch in ["main", "master", "develop", "trunk"] {
+ let output = Command::new("git")
+ .args(["rev-parse", "--verify", &format!("refs/heads/{}", branch)])
+ .current_dir(repo_path)
+ .output()
+ .await?;
+
+ if output.status.success() {
+ return Ok(branch.to_string());
+ }
+ }
+
+ // Fall back to getting the current branch
+ let output = Command::new("git")
+ .args(["rev-parse", "--abbrev-ref", "HEAD"])
+ .current_dir(repo_path)
+ .output()
+ .await?;
+
+ if output.status.success() {
+ let branch = String::from_utf8_lossy(&output.stdout).trim().to_string();
+ if !branch.is_empty() && branch != "HEAD" {
+ return Ok(branch);
+ }
+ }
+
+ Err(WorktreeError::GitCommand(
+ "Could not detect default branch".to_string(),
+ ))
+ }
+
+ /// Ensure the source repository exists locally and is up-to-date.
+ /// If repo_source is a URL, clone it. If it's a path, verify it exists.
+ /// For both cases, fetch latest changes from remote if available.
+ pub async fn ensure_repo(&self, repo_source: &str) -> Result<PathBuf, WorktreeError> {
+ // Check if it's a URL (simple heuristic)
+ if repo_source.starts_with("http://")
+ || repo_source.starts_with("https://")
+ || repo_source.starts_with("git@")
+ || repo_source.starts_with("ssh://")
+ {
+ self.clone_or_fetch_repo(repo_source).await
+ } else {
+ // Treat as local path - expand tilde if present
+ let path = expand_tilde(repo_source);
+ if !path.exists() {
+ return Err(WorktreeError::RepoNotFound(repo_source.to_string()));
+ }
+ // Verify it's a git repo
+ let git_dir = path.join(".git");
+ if !git_dir.exists() {
+ return Err(WorktreeError::InvalidPath(format!(
+ "{} is not a git repository",
+ repo_source
+ )));
+ }
+
+ // Fetch latest changes from remote if configured
+ tracing::info!("Fetching latest changes for local repo: {}", repo_source);
+ let output = Command::new("git")
+ .args(["fetch", "--all", "--prune"])
+ .current_dir(&path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ // Don't fail - repo might not have a remote configured
+ tracing::debug!("Git fetch for local repo (may not have remote): {}", stderr);
+ } else {
+ tracing::info!("Fetched latest changes for {}", repo_source);
+ }
+
+ Ok(path)
+ }
+ }
+
+ /// Clone a repository or fetch if already cloned.
+ async fn clone_or_fetch_repo(&self, url: &str) -> Result<PathBuf, WorktreeError> {
+ // Extract repo name from URL
+ let repo_name = extract_repo_name(url);
+ let repo_path = self.repos_dir.join(&repo_name);
+
+ // Create repos directory if needed
+ tokio::fs::create_dir_all(&self.repos_dir).await?;
+
+ if repo_path.exists() {
+ // Fetch latest changes
+ tracing::info!("Fetching updates for existing repo: {}", repo_name);
+ let output = Command::new("git")
+ .args(["fetch", "--all", "--prune"])
+ .current_dir(&repo_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ tracing::warn!("Git fetch warning: {}", stderr);
+ // Don't fail on fetch errors, repo might still be usable
+ }
+ } else {
+ // Clone the repository
+ tracing::info!("Cloning repository: {} -> {}", url, repo_path.display());
+ let output = Command::new("git")
+ .args(["clone", "--bare", url])
+ .arg(&repo_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::CloneFailed(stderr.to_string()));
+ }
+ }
+
+ Ok(repo_path)
+ }
+
+ /// Create a worktree for a task.
+ ///
+ /// This creates a unique directory with a git worktree checked out to a new branch.
+ pub async fn create_worktree(
+ &self,
+ source_repo: &Path,
+ task_id: Uuid,
+ task_name: &str,
+ base_branch: &str,
+ ) -> Result<WorktreeInfo, WorktreeError> {
+ // Generate unique directory name and branch
+ let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name));
+ let worktree_path = self.base_dir.join(&dir_name);
+ // Branch name: makima/{task-name-with-dashes}-{short-id}
+ let branch_name = format!("{}{}-{}", self.branch_prefix, sanitize_name(task_name), short_uuid(task_id));
+
+ // Acquire lock for this worktree path
+ let lock = {
+ let mut locks = WORKTREE_LOCKS.lock().await;
+ locks
+ .entry(worktree_path.to_string_lossy().to_string())
+ .or_insert_with(|| std::sync::Arc::new(tokio::sync::Mutex::new(())))
+ .clone()
+ };
+ let _guard = lock.lock().await;
+
+ // Check if worktree already exists - reuse it if so
+ if worktree_path.exists() {
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ "Worktree already exists, reusing"
+ );
+
+ // Verify it's a valid git directory
+ let git_dir = worktree_path.join(".git");
+ if git_dir.exists() {
+ // Get the current branch name
+ let output = Command::new("git")
+ .args(["rev-parse", "--abbrev-ref", "HEAD"])
+ .current_dir(&worktree_path)
+ .output()
+ .await?;
+
+ let current_branch = if output.status.success() {
+ String::from_utf8_lossy(&output.stdout).trim().to_string()
+ } else {
+ branch_name.clone()
+ };
+
+ return Ok(WorktreeInfo {
+ path: worktree_path,
+ branch: current_branch,
+ source_repo: source_repo.to_path_buf(),
+ });
+ } else {
+ // Directory exists but isn't a git worktree - remove and recreate
+ tracing::warn!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ "Directory exists but is not a git worktree, removing"
+ );
+ tokio::fs::remove_dir_all(&worktree_path).await?;
+ }
+ }
+
+ // Create base directory
+ tokio::fs::create_dir_all(&self.base_dir).await?;
+
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ branch = %branch_name,
+ base_branch = %base_branch,
+ "Creating worktree from local branch"
+ );
+
+ // Create the worktree with a new branch based on the local base_branch
+ let output = Command::new("git")
+ .args([
+ "worktree",
+ "add",
+ "-b",
+ &branch_name,
+ ])
+ .arg(&worktree_path)
+ .arg(base_branch)
+ .current_dir(source_repo)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to create worktree: {}",
+ stderr
+ )));
+ }
+
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ "Worktree created successfully"
+ );
+
+ Ok(WorktreeInfo {
+ path: worktree_path,
+ branch: branch_name,
+ source_repo: source_repo.to_path_buf(),
+ })
+ }
+
+ /// Create a worktree for a task by copying from another task's worktree.
+ ///
+ /// This allows sequential subtasks where one continues from another's work,
+ /// including uncommitted changes.
+ pub async fn create_worktree_from_task(
+ &self,
+ source_worktree: &Path,
+ task_id: Uuid,
+ task_name: &str,
+ ) -> Result<WorktreeInfo, WorktreeError> {
+ // Verify source worktree exists
+ if !source_worktree.exists() {
+ return Err(WorktreeError::RepoNotFound(format!(
+ "Source worktree not found: {}",
+ source_worktree.display()
+ )));
+ }
+
+ // Get the source repo from the source worktree
+ let source_repo = self.get_worktree_source(source_worktree).await?;
+
+ // Get the base branch from source worktree's current HEAD
+ let output = Command::new("git")
+ .args(["rev-parse", "HEAD"])
+ .current_dir(source_worktree)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ return Err(WorktreeError::GitCommand(
+ "Failed to get source worktree HEAD".to_string(),
+ ));
+ }
+ let source_commit = String::from_utf8_lossy(&output.stdout).trim().to_string();
+
+ // Generate unique directory name and branch for new worktree
+ let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name));
+ let worktree_path = self.base_dir.join(&dir_name);
+ let branch_name = format!("{}{}", self.branch_prefix, task_id);
+
+ // Acquire lock for this worktree path
+ let lock = {
+ let mut locks = WORKTREE_LOCKS.lock().await;
+ locks
+ .entry(worktree_path.to_string_lossy().to_string())
+ .or_insert_with(|| std::sync::Arc::new(tokio::sync::Mutex::new(())))
+ .clone()
+ };
+ let _guard = lock.lock().await;
+
+ // Remove existing worktree if present
+ if worktree_path.exists() {
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ "Removing existing worktree before creating from source"
+ );
+ tokio::fs::remove_dir_all(&worktree_path).await?;
+ }
+
+ // Create base directory
+ tokio::fs::create_dir_all(&self.base_dir).await?;
+
+ tracing::info!(
+ task_id = %task_id,
+ source_worktree = %source_worktree.display(),
+ worktree_path = %worktree_path.display(),
+ branch = %branch_name,
+ source_commit = %source_commit,
+ "Creating worktree from source task"
+ );
+
+ // Create a new worktree based on the source commit
+ let output = Command::new("git")
+ .args([
+ "worktree",
+ "add",
+ "-b",
+ &branch_name,
+ ])
+ .arg(&worktree_path)
+ .arg(&source_commit)
+ .current_dir(&source_repo)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to create worktree: {}",
+ stderr
+ )));
+ }
+
+ // Now copy uncommitted changes from source worktree
+ // Use rsync to copy all files except .git
+ let output = Command::new("rsync")
+ .args([
+ "-a",
+ "--exclude", ".git",
+ "--exclude", ".makima",
+ &format!("{}/", source_worktree.display()),
+ &format!("{}/", worktree_path.display()),
+ ])
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ tracing::warn!(
+ task_id = %task_id,
+ "rsync warning (continuing anyway): {}",
+ stderr
+ );
+ }
+
+ tracing::info!(
+ task_id = %task_id,
+ worktree_path = %worktree_path.display(),
+ "Worktree created from source task successfully"
+ );
+
+ Ok(WorktreeInfo {
+ path: worktree_path,
+ branch: branch_name,
+ source_repo: source_repo.to_path_buf(),
+ })
+ }
+
+ /// Remove a worktree and optionally its branch.
+ pub async fn remove_worktree(
+ &self,
+ worktree_path: &Path,
+ delete_branch: bool,
+ ) -> Result<(), WorktreeError> {
+ if !worktree_path.exists() {
+ return Ok(()); // Already gone
+ }
+
+ // Get the branch name before removing
+ let branch_name = if delete_branch {
+ self.get_worktree_branch(worktree_path).await.ok()
+ } else {
+ None
+ };
+
+ // Find the source repo from worktree
+ let source_repo = self.get_worktree_source(worktree_path).await?;
+
+ tracing::info!(
+ worktree_path = %worktree_path.display(),
+ delete_branch = delete_branch,
+ "Removing worktree"
+ );
+
+ // Remove the worktree
+ let output = Command::new("git")
+ .args(["worktree", "remove", "--force"])
+ .arg(worktree_path)
+ .current_dir(&source_repo)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ // Try force removal of directory if git worktree remove fails
+ if worktree_path.exists() {
+ tokio::fs::remove_dir_all(worktree_path).await?;
+ }
+ tracing::warn!("Git worktree remove warning: {}", stderr);
+ }
+
+ // Prune worktree references
+ let _ = Command::new("git")
+ .args(["worktree", "prune"])
+ .current_dir(&source_repo)
+ .output()
+ .await;
+
+ // Delete the branch if requested
+ if let Some(branch) = branch_name {
+ let output = Command::new("git")
+ .args(["branch", "-D", &branch])
+ .current_dir(&source_repo)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ tracing::warn!("Failed to delete branch {}: {}", branch, stderr);
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Get the branch name of a worktree.
+ async fn get_worktree_branch(&self, worktree_path: &Path) -> Result<String, WorktreeError> {
+ let output = Command::new("git")
+ .args(["rev-parse", "--abbrev-ref", "HEAD"])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to get branch: {}",
+ stderr
+ )));
+ }
+
+ Ok(String::from_utf8_lossy(&output.stdout).trim().to_string())
+ }
+
+ /// Get the source repository path for a worktree.
+ async fn get_worktree_source(&self, worktree_path: &Path) -> Result<PathBuf, WorktreeError> {
+ // Read the .git file in the worktree which contains the path to the main repo
+ let git_file = worktree_path.join(".git");
+
+ if git_file.is_file() {
+ let content = tokio::fs::read_to_string(&git_file).await?;
+ // Format: "gitdir: /path/to/repo/.git/worktrees/name"
+ if let Some(gitdir) = content.strip_prefix("gitdir: ") {
+ let gitdir = gitdir.trim();
+ // Navigate from worktrees/name back to the main repo
+ let path = PathBuf::from(gitdir);
+ if let Some(worktrees_dir) = path.parent() {
+ if let Some(git_dir) = worktrees_dir.parent() {
+ if let Some(repo_dir) = git_dir.parent() {
+ return Ok(repo_dir.to_path_buf());
+ }
+ }
+ }
+ }
+ }
+
+ // Fallback: try to find it in our repos directory
+ Err(WorktreeError::InvalidPath(format!(
+ "Could not determine source repo for worktree: {}",
+ worktree_path.display()
+ )))
+ }
+
+ /// List all worktrees in the base directory.
+ pub async fn list_worktrees(&self) -> Result<Vec<PathBuf>, WorktreeError> {
+ let mut worktrees = Vec::new();
+
+ if !self.base_dir.exists() {
+ return Ok(worktrees);
+ }
+
+ let mut entries = tokio::fs::read_dir(&self.base_dir).await?;
+ while let Some(entry) = entries.next_entry().await? {
+ let path = entry.path();
+ if path.is_dir() && path.join(".git").exists() {
+ worktrees.push(path);
+ }
+ }
+
+ Ok(worktrees)
+ }
+
+ /// Initialize a new git repository for a task.
+ ///
+ /// This creates a fresh git repo (not a worktree) for tasks that don't need
+ /// an existing codebase. Use this when `repository_url` is `new://` or `new://project-name`.
+ pub async fn init_new_repo(
+ &self,
+ task_id: Uuid,
+ repo_source: &str,
+ ) -> Result<WorktreeInfo, WorktreeError> {
+ let project_name = extract_new_repo_name(repo_source);
+ let dir_name = match project_name {
+ Some(name) => format!("{}-{}", short_uuid(task_id), sanitize_name(name)),
+ None => format!("{}-new", short_uuid(task_id)),
+ };
+ let repo_path = self.repos_dir.join(&dir_name);
+
+ tracing::info!(
+ task_id = %task_id,
+ path = %repo_path.display(),
+ project_name = ?project_name,
+ "Initializing new git repository"
+ );
+
+ // Create directory
+ tokio::fs::create_dir_all(&repo_path).await?;
+
+ // git init
+ let output = Command::new("git")
+ .args(["init"])
+ .current_dir(&repo_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to init repository: {}",
+ stderr
+ )));
+ }
+
+ // Configure git user (needed for commits)
+ let _ = Command::new("git")
+ .args(["config", "user.email", "makima@localhost"])
+ .current_dir(&repo_path)
+ .output()
+ .await;
+ let _ = Command::new("git")
+ .args(["config", "user.name", "Makima"])
+ .current_dir(&repo_path)
+ .output()
+ .await;
+
+ // Initial commit (required for worktrees to work later if needed)
+ let output = Command::new("git")
+ .args(["commit", "--allow-empty", "-m", "Initial commit"])
+ .current_dir(&repo_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to create initial commit: {}",
+ stderr
+ )));
+ }
+
+ tracing::info!(
+ task_id = %task_id,
+ path = %repo_path.display(),
+ "New git repository initialized"
+ );
+
+ Ok(WorktreeInfo {
+ path: repo_path.clone(),
+ branch: "main".to_string(),
+ source_repo: repo_path,
+ })
+ }
+
+ // ========== Merge Operations ==========
+
+ /// List all task branches in a repository.
+ ///
+ /// Returns branches matching the pattern `makima/task-*`.
+ pub async fn list_task_branches(
+ &self,
+ repo_path: &Path,
+ ) -> Result<Vec<TaskBranchInfo>, WorktreeError> {
+ // Get all branches matching our prefix
+ let output = Command::new("git")
+ .args([
+ "branch",
+ "--list",
+ &format!("{}*", self.branch_prefix),
+ "--format=%(refname:short)|%(objectname:short)|%(subject)",
+ ])
+ .current_dir(repo_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to list branches: {}",
+ stderr
+ )));
+ }
+
+ // Get list of merged branches
+ let merged_output = Command::new("git")
+ .args(["branch", "--merged", "HEAD", "--format=%(refname:short)"])
+ .current_dir(repo_path)
+ .output()
+ .await?;
+
+ let merged_branches: std::collections::HashSet<String> = if merged_output.status.success() {
+ String::from_utf8_lossy(&merged_output.stdout)
+ .lines()
+ .map(|s| s.trim().to_string())
+ .collect()
+ } else {
+ std::collections::HashSet::new()
+ };
+
+ let stdout = String::from_utf8_lossy(&output.stdout);
+ let mut branches = Vec::new();
+
+ for line in stdout.lines() {
+ let parts: Vec<&str> = line.split('|').collect();
+ if parts.len() >= 3 {
+ let name = parts[0].trim().to_string();
+ let last_commit = parts[1].trim().to_string();
+ let last_commit_message = parts[2].trim().to_string();
+
+ // Try to extract task ID from branch name
+ let task_id = name
+ .strip_prefix(&self.branch_prefix)
+ .and_then(|s| Uuid::parse_str(s).ok());
+
+ let is_merged = merged_branches.contains(&name);
+
+ branches.push(TaskBranchInfo {
+ name,
+ task_id,
+ is_merged,
+ last_commit,
+ last_commit_message,
+ });
+ }
+ }
+
+ Ok(branches)
+ }
+
+ /// Start a merge of a branch into the current worktree.
+ ///
+ /// Uses `--no-commit` to allow conflict resolution before committing.
+ /// Returns Ok(None) if merge succeeds without conflicts, or Ok(Some(files))
+ /// with the list of conflicted files.
+ pub async fn merge_branch(
+ &self,
+ worktree_path: &Path,
+ source_branch: &str,
+ ) -> Result<Option<Vec<String>>, WorktreeError> {
+ // Check if there's already a merge in progress
+ if self.is_merge_in_progress(worktree_path).await? {
+ return Err(WorktreeError::MergeInProgress);
+ }
+
+ tracing::info!(
+ worktree = %worktree_path.display(),
+ source_branch = %source_branch,
+ "Starting merge"
+ );
+
+ // Attempt the merge with --no-commit --no-ff
+ let output = Command::new("git")
+ .args(["merge", "--no-commit", "--no-ff", source_branch])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if output.status.success() {
+ tracing::info!("Merge completed without conflicts");
+ return Ok(None);
+ }
+
+ // Check if there are conflicts
+ let conflicts = self.get_conflicted_files(worktree_path).await?;
+ if !conflicts.is_empty() {
+ tracing::info!(
+ conflicts = ?conflicts,
+ "Merge has conflicts"
+ );
+ return Ok(Some(conflicts));
+ }
+
+ // Other error
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ Err(WorktreeError::GitCommand(format!(
+ "Merge failed: {}",
+ stderr
+ )))
+ }
+
+ /// Check if a merge is currently in progress.
+ pub async fn is_merge_in_progress(&self, worktree_path: &Path) -> Result<bool, WorktreeError> {
+ // Check for MERGE_HEAD file
+ let merge_head = worktree_path.join(".git").join("MERGE_HEAD");
+ if merge_head.exists() {
+ return Ok(true);
+ }
+
+ // Also check in .git file (for worktrees)
+ let git_file = worktree_path.join(".git");
+ if git_file.is_file() {
+ if let Ok(content) = tokio::fs::read_to_string(&git_file).await {
+ if let Some(gitdir) = content.strip_prefix("gitdir: ") {
+ let gitdir = PathBuf::from(gitdir.trim());
+ let merge_head = gitdir.join("MERGE_HEAD");
+ if merge_head.exists() {
+ return Ok(true);
+ }
+ }
+ }
+ }
+
+ Ok(false)
+ }
+
+ /// Get the list of files with unresolved conflicts.
+ pub async fn get_conflicted_files(
+ &self,
+ worktree_path: &Path,
+ ) -> Result<Vec<String>, WorktreeError> {
+ let output = Command::new("git")
+ .args(["diff", "--name-only", "--diff-filter=U"])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ // No conflicts or not in merge state
+ return Ok(Vec::new());
+ }
+
+ let stdout = String::from_utf8_lossy(&output.stdout);
+ let files: Vec<String> = stdout
+ .lines()
+ .map(|s| s.trim().to_string())
+ .filter(|s| !s.is_empty())
+ .collect();
+
+ Ok(files)
+ }
+
+ /// Get the current merge state.
+ pub async fn get_merge_state(
+ &self,
+ worktree_path: &Path,
+ ) -> Result<MergeState, WorktreeError> {
+ let in_progress = self.is_merge_in_progress(worktree_path).await?;
+
+ if !in_progress {
+ return Ok(MergeState {
+ source_branch: String::new(),
+ conflicted_files: Vec::new(),
+ in_progress: false,
+ });
+ }
+
+ // Get the branch being merged from MERGE_HEAD
+ let source_branch = self.get_merge_source_branch(worktree_path).await?;
+ let conflicted_files = self.get_conflicted_files(worktree_path).await?;
+
+ Ok(MergeState {
+ source_branch,
+ conflicted_files,
+ in_progress: true,
+ })
+ }
+
+ /// Get the branch name being merged (from MERGE_HEAD).
+ async fn get_merge_source_branch(&self, worktree_path: &Path) -> Result<String, WorktreeError> {
+ // Get MERGE_HEAD commit
+ let output = Command::new("git")
+ .args(["rev-parse", "MERGE_HEAD"])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ return Ok("unknown".to_string());
+ }
+
+ let commit = String::from_utf8_lossy(&output.stdout).trim().to_string();
+
+ // Try to find branch name for this commit
+ let output = Command::new("git")
+ .args(["name-rev", "--name-only", &commit])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if output.status.success() {
+ let name = String::from_utf8_lossy(&output.stdout).trim().to_string();
+ // Clean up the name (remove ~N suffixes, etc.)
+ let name = name.split('~').next().unwrap_or(&name);
+ let name = name.split('^').next().unwrap_or(name);
+ return Ok(name.to_string());
+ }
+
+ Ok(commit[..8.min(commit.len())].to_string())
+ }
+
+ /// Resolve a conflict in a specific file.
+ pub async fn resolve_conflict(
+ &self,
+ worktree_path: &Path,
+ file_path: &str,
+ resolution: ConflictResolution,
+ ) -> Result<(), WorktreeError> {
+ if !self.is_merge_in_progress(worktree_path).await? {
+ return Err(WorktreeError::NoMergeInProgress);
+ }
+
+ let strategy = match resolution {
+ ConflictResolution::Ours => "--ours",
+ ConflictResolution::Theirs => "--theirs",
+ };
+
+ tracing::info!(
+ worktree = %worktree_path.display(),
+ file = %file_path,
+ strategy = %strategy,
+ "Resolving conflict"
+ );
+
+ // Checkout the chosen version
+ let output = Command::new("git")
+ .args(["checkout", strategy, "--", file_path])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to resolve conflict: {}",
+ stderr
+ )));
+ }
+
+ // Stage the resolved file
+ let output = Command::new("git")
+ .args(["add", file_path])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to stage resolved file: {}",
+ stderr
+ )));
+ }
+
+ Ok(())
+ }
+
+ /// Abort the current merge.
+ pub async fn abort_merge(&self, worktree_path: &Path) -> Result<(), WorktreeError> {
+ if !self.is_merge_in_progress(worktree_path).await? {
+ return Err(WorktreeError::NoMergeInProgress);
+ }
+
+ tracing::info!(
+ worktree = %worktree_path.display(),
+ "Aborting merge"
+ );
+
+ let output = Command::new("git")
+ .args(["merge", "--abort"])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to abort merge: {}",
+ stderr
+ )));
+ }
+
+ Ok(())
+ }
+
+ /// Commit the current merge.
+ pub async fn commit_merge(
+ &self,
+ worktree_path: &Path,
+ message: &str,
+ ) -> Result<String, WorktreeError> {
+ // Check for remaining conflicts
+ let conflicts = self.get_conflicted_files(worktree_path).await?;
+ if !conflicts.is_empty() {
+ return Err(WorktreeError::MergeConflicts(conflicts.join(", ")));
+ }
+
+ tracing::info!(
+ worktree = %worktree_path.display(),
+ message = %message,
+ "Committing merge"
+ );
+
+ let output = Command::new("git")
+ .args(["commit", "-m", message])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to commit merge: {}",
+ stderr
+ )));
+ }
+
+ // Get the new commit SHA
+ let output = Command::new("git")
+ .args(["rev-parse", "HEAD"])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if output.status.success() {
+ let sha = String::from_utf8_lossy(&output.stdout).trim().to_string();
+ return Ok(sha);
+ }
+
+ Ok("unknown".to_string())
+ }
+
+ // ========== Completion Action Operations ==========
+
+ /// Push task branch from worktree to an external target repository.
+ ///
+ /// This stages and commits any uncommitted changes, then pushes to the target repo.
+ pub async fn push_to_target_repo(
+ &self,
+ worktree_path: &Path,
+ target_repo: &Path,
+ branch_name: &str,
+ task_name: &str,
+ ) -> Result<(), WorktreeError> {
+ tracing::info!(
+ worktree = %worktree_path.display(),
+ target_repo = %target_repo.display(),
+ branch = %branch_name,
+ "Pushing branch to target repository"
+ );
+
+ // First, stage all changes (including new files)
+ let output = Command::new("git")
+ .args(["add", "-A"])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to stage changes: {}",
+ stderr
+ )));
+ }
+
+ // Check if there are staged changes to commit
+ let output = Command::new("git")
+ .args(["diff", "--cached", "--quiet"])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ // Exit code 1 means there are staged changes
+ if !output.status.success() {
+ tracing::info!("Committing staged changes before push");
+
+ let commit_message = format!("feat: {}", task_name);
+ let output = Command::new("git")
+ .args(["commit", "-m", &commit_message])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to commit changes: {}",
+ stderr
+ )));
+ }
+ }
+
+ // Ensure there are commits to push
+ let output = Command::new("git")
+ .args(["log", "--oneline", "-1"])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ return Err(WorktreeError::GitCommand(
+ "No commits in worktree".to_string(),
+ ));
+ }
+
+ // Add target repo as a remote in the worktree (if not already)
+ let remote_name = "target";
+ let target_path_str = target_repo.to_string_lossy();
+
+ // Remove existing remote if any (ignore errors)
+ let _ = Command::new("git")
+ .args(["remote", "remove", remote_name])
+ .current_dir(worktree_path)
+ .output()
+ .await;
+
+ // Add the target as a remote
+ let output = Command::new("git")
+ .args(["remote", "add", remote_name, &target_path_str])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to add remote: {}",
+ stderr
+ )));
+ }
+
+ // Push the branch to the target
+ let output = Command::new("git")
+ .args(["push", "-u", remote_name, &format!("HEAD:{}", branch_name)])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to push to target: {}",
+ stderr
+ )));
+ }
+
+ tracing::info!(
+ branch = %branch_name,
+ target_repo = %target_repo.display(),
+ "Branch pushed successfully"
+ );
+
+ // Detach HEAD in the worktree to release the branch
+ // This allows the branch to be checked out in the target repo
+ let output = Command::new("git")
+ .args(["checkout", "--detach", "HEAD"])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ // Non-fatal: log but don't fail the push
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ tracing::warn!(
+ "Failed to detach HEAD in worktree (branch may not be checkable in target): {}",
+ stderr
+ );
+ } else {
+ tracing::info!("Detached HEAD in worktree to release branch");
+ }
+
+ Ok(())
+ }
+
+ /// Merge a branch into the target branch in the target repository.
+ ///
+ /// This pushes the branch first (if needed), then performs a merge in the target repo.
+ pub async fn merge_to_target(
+ &self,
+ worktree_path: &Path,
+ target_repo: &Path,
+ source_branch: &str,
+ target_branch: &str,
+ task_name: &str,
+ ) -> Result<String, WorktreeError> {
+ tracing::info!(
+ worktree = %worktree_path.display(),
+ target_repo = %target_repo.display(),
+ source_branch = %source_branch,
+ target_branch = %target_branch,
+ "Merging branch to target"
+ );
+
+ // First, push the branch to target repo
+ self.push_to_target_repo(worktree_path, target_repo, source_branch, task_name)
+ .await?;
+
+ // In target repo, checkout the target branch
+ let output = Command::new("git")
+ .args(["checkout", target_branch])
+ .current_dir(target_repo)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to checkout target branch: {}",
+ stderr
+ )));
+ }
+
+ // Pull latest changes first
+ let _ = Command::new("git")
+ .args(["pull", "--ff-only"])
+ .current_dir(target_repo)
+ .output()
+ .await;
+
+ // Merge the source branch
+ let merge_message = format!("feat: {}", task_name);
+ let output = Command::new("git")
+ .args(["merge", "--no-ff", source_branch, "-m", &merge_message])
+ .current_dir(target_repo)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+
+ // Check if it's a conflict
+ let conflicts = self.get_conflicted_files(target_repo).await?;
+ if !conflicts.is_empty() {
+ // Abort the merge
+ let _ = Command::new("git")
+ .args(["merge", "--abort"])
+ .current_dir(target_repo)
+ .output()
+ .await;
+
+ return Err(WorktreeError::MergeConflicts(format!(
+ "Merge conflicts in: {}. Consider creating a PR instead.",
+ conflicts.join(", ")
+ )));
+ }
+
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to merge: {}",
+ stderr
+ )));
+ }
+
+ // Get the merge commit SHA
+ let output = Command::new("git")
+ .args(["rev-parse", "HEAD"])
+ .current_dir(target_repo)
+ .output()
+ .await?;
+
+ let commit_sha = if output.status.success() {
+ String::from_utf8_lossy(&output.stdout).trim().to_string()
+ } else {
+ "unknown".to_string()
+ };
+
+ tracing::info!(
+ commit_sha = %commit_sha,
+ "Merge completed successfully"
+ );
+
+ Ok(commit_sha)
+ }
+
+ /// Create a GitHub pull request using the gh CLI.
+ ///
+ /// This pushes the branch first, then creates a PR.
+ pub async fn create_pull_request(
+ &self,
+ worktree_path: &Path,
+ target_repo: &Path,
+ source_branch: &str,
+ target_branch: &str,
+ title: &str,
+ body: &str,
+ ) -> Result<String, WorktreeError> {
+ tracing::info!(
+ worktree = %worktree_path.display(),
+ target_repo = %target_repo.display(),
+ source_branch = %source_branch,
+ target_branch = %target_branch,
+ title = %title,
+ "Creating pull request"
+ );
+
+ // First, push the branch to the target repo's remote
+ // For PRs, we need to push to origin (the GitHub remote)
+
+ // Get the worktree's current branch
+ let output = Command::new("git")
+ .args(["rev-parse", "--abbrev-ref", "HEAD"])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ let current_branch = if output.status.success() {
+ String::from_utf8_lossy(&output.stdout).trim().to_string()
+ } else {
+ source_branch.to_string()
+ };
+
+ // Push to the target repo's origin
+ // First, check if target_repo has an origin remote
+ let output = Command::new("git")
+ .args(["remote", "get-url", "origin"])
+ .current_dir(target_repo)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ return Err(WorktreeError::GitCommand(
+ "Target repository has no origin remote configured".to_string(),
+ ));
+ }
+
+ let origin_url = String::from_utf8_lossy(&output.stdout).trim().to_string();
+
+ // Push the branch from worktree to the remote
+ // First add the remote to worktree
+ let _ = Command::new("git")
+ .args(["remote", "remove", "pr-origin"])
+ .current_dir(worktree_path)
+ .output()
+ .await;
+
+ let output = Command::new("git")
+ .args(["remote", "add", "pr-origin", &origin_url])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to add remote: {}",
+ stderr
+ )));
+ }
+
+ // Push to the remote
+ let output = Command::new("git")
+ .args(["push", "-u", "pr-origin", &format!("{}:{}", current_branch, source_branch)])
+ .current_dir(worktree_path)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to push branch: {}",
+ stderr
+ )));
+ }
+
+ // Create PR using gh CLI in the target repo
+ let output = Command::new("gh")
+ .args([
+ "pr",
+ "create",
+ "--title", title,
+ "--body", body,
+ "--head", source_branch,
+ "--base", target_branch,
+ ])
+ .current_dir(target_repo)
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::GitCommand(format!(
+ "Failed to create PR: {}",
+ stderr
+ )));
+ }
+
+ // The gh CLI outputs the PR URL
+ let pr_url = String::from_utf8_lossy(&output.stdout).trim().to_string();
+
+ tracing::info!(
+ pr_url = %pr_url,
+ "Pull request created successfully"
+ );
+
+ Ok(pr_url)
+ }
+
+ /// Clone/copy the worktree contents to a target directory.
+ ///
+ /// This creates a new git repository at the target path with the same contents
+ /// as the worktree. Returns (success, message).
+ pub async fn clone_worktree_to_directory(
+ &self,
+ worktree_path: &Path,
+ target_dir: &Path,
+ ) -> Result<String, WorktreeError> {
+ tracing::info!(
+ worktree = %worktree_path.display(),
+ target = %target_dir.display(),
+ "Cloning worktree to target directory"
+ );
+
+ // Check if target directory already exists
+ if target_dir.exists() {
+ return Err(WorktreeError::AlreadyExists(format!(
+ "Target directory already exists: {}",
+ target_dir.display()
+ )));
+ }
+
+ // Get parent directory to ensure it exists
+ if let Some(parent) = target_dir.parent() {
+ if !parent.exists() {
+ tokio::fs::create_dir_all(parent).await?;
+ }
+ }
+
+ // Use git clone --local to efficiently copy the repository
+ // This is more efficient than cp -r for git repos
+ let output = Command::new("git")
+ .args([
+ "clone",
+ "--local",
+ "--no-hardlinks",
+ &worktree_path.to_string_lossy(),
+ &target_dir.to_string_lossy(),
+ ])
+ .output()
+ .await?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ return Err(WorktreeError::CloneFailed(format!(
+ "Failed to clone worktree: {}",
+ stderr
+ )));
+ }
+
+ // Remove the 'origin' remote that points back to the worktree
+ let _ = Command::new("git")
+ .args(["remote", "remove", "origin"])
+ .current_dir(target_dir)
+ .output()
+ .await;
+
+ tracing::info!(
+ target = %target_dir.display(),
+ "Worktree cloned successfully"
+ );
+
+ Ok(format!("Cloned to {}", target_dir.display()))
+ }
+
+ /// Check if a target directory exists.
+ pub async fn target_directory_exists(&self, target_dir: &Path) -> bool {
+ target_dir.exists()
+ }
+}
+
+/// Check if repo_source is a "new repo" request.
+///
+/// Accepts `new://` or `new://project-name` to create a fresh git repository.
+pub fn is_new_repo_request(source: &str) -> bool {
+ source == "new" || source == "new://" || source.starts_with("new://")
+}
+
+/// Extract optional project name from new:// URL.
+fn extract_new_repo_name(source: &str) -> Option<&str> {
+ source.strip_prefix("new://").filter(|s| !s.is_empty())
+}
+
+/// Extract repository name from URL.
+fn extract_repo_name(url: &str) -> String {
+ // Handle various URL formats:
+ // https://github.com/user/repo.git -> repo
+ // git@github.com:user/repo.git -> repo
+ // https://github.com/user/repo -> repo
+
+ let url = url.trim_end_matches('/');
+ let url = url.trim_end_matches(".git");
+
+ url.rsplit('/')
+ .next()
+ .or_else(|| url.rsplit(':').next())
+ .unwrap_or("repo")
+ .to_string()
+}
+
+/// Create a short UUID string for directory naming.
+pub fn short_uuid(id: Uuid) -> String {
+ id.to_string()[..8].to_string()
+}
+
+/// Expand tilde (~) in path to home directory.
+pub fn expand_tilde(path: &str) -> PathBuf {
+ if let Some(rest) = path.strip_prefix("~/") {
+ if let Some(home) = dirs::home_dir() {
+ return home.join(rest);
+ }
+ } else if path == "~" {
+ if let Some(home) = dirs::home_dir() {
+ return home;
+ }
+ }
+ PathBuf::from(path)
+}
+
+/// Sanitize a name for use in directory/branch names.
+pub fn sanitize_name(name: &str) -> String {
+ name.chars()
+ .map(|c| {
+ if c.is_alphanumeric() || c == '-' || c == '_' {
+ c.to_ascii_lowercase()
+ } else {
+ '-'
+ }
+ })
+ .collect::<String>()
+ .chars()
+ .take(50) // Limit length
+ .collect()
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::daemon::*;
+
+ #[test]
+ fn test_extract_repo_name() {
+ assert_eq!(
+ extract_repo_name("https://github.com/user/repo.git"),
+ "repo"
+ );
+ assert_eq!(
+ extract_repo_name("https://github.com/user/repo"),
+ "repo"
+ );
+ assert_eq!(
+ extract_repo_name("git@github.com:user/repo.git"),
+ "repo"
+ );
+ }
+
+ #[test]
+ fn test_sanitize_name() {
+ assert_eq!(sanitize_name("Hello World!"), "hello-world-");
+ assert_eq!(sanitize_name("test_name-123"), "test_name-123");
+ assert_eq!(sanitize_name("A".repeat(100).as_str()).len(), 50);
+ }
+
+ #[test]
+ fn test_short_uuid() {
+ let id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap();
+ assert_eq!(short_uuid(id), "550e8400");
+ }
+}
diff --git a/makima/src/daemon/worktree/mod.rs b/makima/src/daemon/worktree/mod.rs
new file mode 100644
index 0000000..eb9f031
--- /dev/null
+++ b/makima/src/daemon/worktree/mod.rs
@@ -0,0 +1,11 @@
+//! Git worktree management for task isolation.
+//!
+//! Each task gets a unique git worktree with its own branch,
+//! providing isolation without the overhead of Docker containers.
+
+mod manager;
+
+pub use manager::{
+ expand_tilde, is_new_repo_request, sanitize_name, short_uuid, ConflictResolution, MergeState,
+ TaskBranchInfo, WorktreeError, WorktreeInfo, WorktreeManager,
+};
diff --git a/makima/src/daemon/ws/client.rs b/makima/src/daemon/ws/client.rs
new file mode 100644
index 0000000..67594a2
--- /dev/null
+++ b/makima/src/daemon/ws/client.rs
@@ -0,0 +1,290 @@
+//! WebSocket client for connecting to the makima server.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use backoff::backoff::Backoff;
+use backoff::ExponentialBackoff;
+use futures::{SinkExt, StreamExt};
+use tokio::sync::{mpsc, RwLock};
+use tokio_tungstenite::{connect_async, tungstenite::{client::IntoClientRequest, Message}};
+use uuid::Uuid;
+
+use super::protocol::{DaemonCommand, DaemonMessage};
+use crate::daemon::config::ServerConfig;
+use crate::daemon::error::{DaemonError, Result};
+
+/// WebSocket client state.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ConnectionState {
+ /// Not connected to server.
+ Disconnected,
+ /// Currently connecting.
+ Connecting,
+ /// Connected and authenticated.
+ Connected,
+ /// Connection failed, will retry.
+ Reconnecting,
+ /// Permanently failed (e.g., auth failure).
+ Failed,
+}
+
+/// WebSocket client for daemon-server communication.
+pub struct WsClient {
+ config: ServerConfig,
+ machine_id: String,
+ hostname: String,
+ max_concurrent_tasks: i32,
+ state: Arc<RwLock<ConnectionState>>,
+ daemon_id: Arc<RwLock<Option<Uuid>>>,
+ /// Channel to receive messages to send to server.
+ outgoing_rx: mpsc::Receiver<DaemonMessage>,
+ /// Sender for outgoing messages (clone this to send messages).
+ outgoing_tx: mpsc::Sender<DaemonMessage>,
+ /// Channel to send received commands to the task manager.
+ incoming_tx: mpsc::Sender<DaemonCommand>,
+}
+
+impl WsClient {
+ /// Create a new WebSocket client.
+ pub fn new(
+ config: ServerConfig,
+ machine_id: String,
+ hostname: String,
+ max_concurrent_tasks: i32,
+ incoming_tx: mpsc::Sender<DaemonCommand>,
+ ) -> Self {
+ let (outgoing_tx, outgoing_rx) = mpsc::channel(256);
+
+ Self {
+ config,
+ machine_id,
+ hostname,
+ max_concurrent_tasks,
+ state: Arc::new(RwLock::new(ConnectionState::Disconnected)),
+ daemon_id: Arc::new(RwLock::new(None)),
+ outgoing_rx,
+ outgoing_tx,
+ incoming_tx,
+ }
+ }
+
+ /// Get a sender for outgoing messages.
+ pub fn sender(&self) -> mpsc::Sender<DaemonMessage> {
+ self.outgoing_tx.clone()
+ }
+
+ /// Get current connection state.
+ pub async fn state(&self) -> ConnectionState {
+ *self.state.read().await
+ }
+
+ /// Get daemon ID if authenticated.
+ pub async fn daemon_id(&self) -> Option<Uuid> {
+ *self.daemon_id.read().await
+ }
+
+ /// Run the WebSocket client with automatic reconnection.
+ pub async fn run(&mut self) -> Result<()> {
+ let mut backoff = ExponentialBackoff {
+ initial_interval: Duration::from_secs(self.config.reconnect_interval_secs),
+ max_interval: Duration::from_secs(60),
+ max_elapsed_time: if self.config.max_reconnect_attempts > 0 {
+ Some(Duration::from_secs(
+ self.config.reconnect_interval_secs * self.config.max_reconnect_attempts as u64 * 10,
+ ))
+ } else {
+ None // Infinite retries
+ },
+ ..Default::default()
+ };
+
+ loop {
+ *self.state.write().await = ConnectionState::Connecting;
+ tracing::info!("Connecting to server: {}", self.config.url);
+
+ match self.connect_and_run().await {
+ Ok(()) => {
+ // Clean shutdown
+ tracing::info!("WebSocket connection closed cleanly");
+ break;
+ }
+ Err(DaemonError::AuthFailed(msg)) => {
+ tracing::error!("Authentication failed: {}", msg);
+ *self.state.write().await = ConnectionState::Failed;
+ return Err(DaemonError::AuthFailed(msg));
+ }
+ Err(e) => {
+ tracing::warn!("Connection error: {}", e);
+ *self.state.write().await = ConnectionState::Reconnecting;
+
+ if let Some(delay) = backoff.next_backoff() {
+ tracing::info!("Reconnecting in {:?}...", delay);
+ tokio::time::sleep(delay).await;
+ } else {
+ tracing::error!("Max reconnection attempts reached");
+ *self.state.write().await = ConnectionState::Failed;
+ return Err(DaemonError::ConnectionLost);
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Connect to server and run the message loop.
+ async fn connect_and_run(&mut self) -> Result<()> {
+ // Build WebSocket URL
+ let ws_url = format!("{}/api/v1/mesh/daemons/connect", self.config.url);
+ tracing::debug!("Connecting to WebSocket: {}", ws_url);
+
+ // Build request with API key header
+ let mut request = ws_url.into_client_request()?;
+ request.headers_mut().insert(
+ "x-makima-api-key",
+ self.config.api_key.parse().map_err(|_| {
+ DaemonError::AuthFailed("Invalid API key format".into())
+ })?,
+ );
+
+ // Connect with API key in headers
+ let (ws_stream, _response) = connect_async(request).await?;
+ let (mut write, mut read) = ws_stream.split();
+
+ // Send daemon info after connection (server authenticated us via header)
+ let info_msg = DaemonMessage::authenticate(
+ &self.config.api_key,
+ &self.machine_id,
+ &self.hostname,
+ self.max_concurrent_tasks,
+ );
+ let info_json = serde_json::to_string(&info_msg)?;
+ write.send(Message::Text(info_json)).await?;
+
+ // Wait for authentication response
+ let auth_response = read
+ .next()
+ .await
+ .ok_or(DaemonError::ConnectionLost)??;
+
+ let auth_text = match auth_response {
+ Message::Text(text) => text,
+ Message::Close(_) => return Err(DaemonError::ConnectionLost),
+ _ => return Err(DaemonError::AuthFailed("Unexpected response type".into())),
+ };
+
+ let command: DaemonCommand = serde_json::from_str(&auth_text)?;
+ match command {
+ DaemonCommand::Authenticated { daemon_id } => {
+ tracing::info!("Authenticated with daemon ID: {}", daemon_id);
+ *self.daemon_id.write().await = Some(daemon_id);
+ *self.state.write().await = ConnectionState::Connected;
+
+ // Send daemon directories info to server
+ let working_directory = std::env::current_dir()
+ .map(|p| p.to_string_lossy().to_string())
+ .unwrap_or_else(|_| ".".to_string());
+ let home_directory = dirs::home_dir()
+ .map(|h| h.join(".makima").join("home"))
+ .unwrap_or_else(|| std::path::PathBuf::from("~/.makima/home"));
+ // Create home directory if it doesn't exist
+ if let Err(e) = std::fs::create_dir_all(&home_directory) {
+ tracing::warn!("Failed to create home directory {:?}: {}", home_directory, e);
+ }
+ let home_directory_str = home_directory.to_string_lossy().to_string();
+ let worktrees_directory = dirs::home_dir()
+ .map(|h| h.join(".makima").join("worktrees").to_string_lossy().to_string())
+ .unwrap_or_else(|| "~/.makima/worktrees".to_string());
+
+ let dirs_msg = DaemonMessage::DaemonDirectories {
+ working_directory,
+ home_directory: home_directory_str,
+ worktrees_directory,
+ };
+ let dirs_json = serde_json::to_string(&dirs_msg)?;
+ write.send(Message::Text(dirs_json)).await?;
+ tracing::info!("Sent daemon directories info to server");
+ }
+ DaemonCommand::Error { code, message } => {
+ return Err(DaemonError::AuthFailed(format!("{}: {}", code, message)));
+ }
+ _ => {
+ return Err(DaemonError::AuthFailed(
+ "Unexpected response to authentication".into(),
+ ));
+ }
+ }
+
+ // Start main message loop
+ let heartbeat_interval = Duration::from_secs(self.config.heartbeat_interval_secs);
+ let mut heartbeat_timer = tokio::time::interval(heartbeat_interval);
+
+ loop {
+ tokio::select! {
+ // Handle incoming server commands
+ msg = read.next() => {
+ match msg {
+ Some(Ok(Message::Text(text))) => {
+ tracing::info!("Received WebSocket message: {} bytes", text.len());
+ match serde_json::from_str::<DaemonCommand>(&text) {
+ Ok(command) => {
+ tracing::info!("Parsed command: {:?}", command);
+ tracing::info!("Sending command to task manager channel...");
+ if self.incoming_tx.send(command).await.is_err() {
+ tracing::warn!("Command receiver dropped, shutting down");
+ break;
+ }
+ tracing::info!("Command sent to task manager successfully");
+ }
+ Err(e) => {
+ tracing::warn!("Failed to parse server message: {}", e);
+ tracing::debug!("Raw message: {}", text);
+ }
+ }
+ }
+ Some(Ok(Message::Ping(data))) => {
+ write.send(Message::Pong(data)).await?;
+ }
+ Some(Ok(Message::Close(_))) | None => {
+ tracing::info!("Server closed connection");
+ return Err(DaemonError::ConnectionLost);
+ }
+ Some(Err(e)) => {
+ tracing::warn!("WebSocket error: {}", e);
+ return Err(e.into());
+ }
+ _ => {}
+ }
+ }
+
+ // Handle outgoing messages
+ msg = self.outgoing_rx.recv() => {
+ match msg {
+ Some(message) => {
+ let json = serde_json::to_string(&message)?;
+ tracing::trace!("Sending message: {}", json);
+ write.send(Message::Text(json)).await?;
+ }
+ None => {
+ // Sender dropped, shutdown
+ tracing::info!("Outgoing channel closed, shutting down");
+ break;
+ }
+ }
+ }
+
+ // Send heartbeat
+ _ = heartbeat_timer.tick() => {
+ // Get active task IDs from task manager
+ // For now, send empty list - will be connected to task manager
+ let heartbeat = DaemonMessage::heartbeat(vec![]);
+ let json = serde_json::to_string(&heartbeat)?;
+ write.send(Message::Text(json)).await?;
+ }
+ }
+ }
+
+ Ok(())
+ }
+}
diff --git a/makima/src/daemon/ws/mod.rs b/makima/src/daemon/ws/mod.rs
new file mode 100644
index 0000000..5a0e9d1
--- /dev/null
+++ b/makima/src/daemon/ws/mod.rs
@@ -0,0 +1,7 @@
+//! WebSocket client and protocol types for daemon-server communication.
+
+pub mod client;
+pub mod protocol;
+
+pub use client::{ConnectionState, WsClient};
+pub use protocol::{BranchInfo, DaemonCommand, DaemonMessage};
diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs
new file mode 100644
index 0000000..e86a577
--- /dev/null
+++ b/makima/src/daemon/ws/protocol.rs
@@ -0,0 +1,658 @@
+//! Protocol types for daemon-server communication.
+//!
+//! These types mirror the server's protocol exactly for compatibility.
+
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+/// Message from daemon to server.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "camelCase")]
+pub enum DaemonMessage {
+ /// Authentication request (first message required).
+ Authenticate {
+ #[serde(rename = "apiKey")]
+ api_key: String,
+ #[serde(rename = "machineId")]
+ machine_id: String,
+ hostname: String,
+ #[serde(rename = "maxConcurrentTasks")]
+ max_concurrent_tasks: i32,
+ },
+
+ /// Periodic heartbeat with current status.
+ Heartbeat {
+ #[serde(rename = "activeTasks")]
+ active_tasks: Vec<Uuid>,
+ },
+
+ /// Task output streaming (stdout/stderr from Claude Code).
+ TaskOutput {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ output: String,
+ #[serde(rename = "isPartial")]
+ is_partial: bool,
+ },
+
+ /// Task status change notification.
+ TaskStatusChange {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "oldStatus")]
+ old_status: String,
+ #[serde(rename = "newStatus")]
+ new_status: String,
+ },
+
+ /// Task progress update with summary.
+ TaskProgress {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ summary: String,
+ },
+
+ /// Task completion notification.
+ TaskComplete {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ error: Option<String>,
+ },
+
+ /// Register a tool key for orchestrator API access.
+ RegisterToolKey {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// The API key for this orchestrator to use when calling mesh endpoints.
+ key: String,
+ },
+
+ /// Revoke a tool key when task completes.
+ RevokeToolKey {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ },
+
+ /// Authentication required - OAuth token expired, provides login URL.
+ AuthenticationRequired {
+ /// Task ID that triggered the auth error (if any).
+ #[serde(rename = "taskId")]
+ task_id: Option<Uuid>,
+ /// OAuth login URL for remote authentication.
+ #[serde(rename = "loginUrl")]
+ login_url: String,
+ /// Hostname of the daemon requiring auth.
+ hostname: Option<String>,
+ },
+
+ // =========================================================================
+ // Merge Response Messages (sent by daemon after processing merge commands)
+ // =========================================================================
+
+ /// Response to ListBranches command.
+ BranchList {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ branches: Vec<BranchInfo>,
+ },
+
+ /// Response to MergeStatus command.
+ MergeStatusResponse {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "inProgress")]
+ in_progress: bool,
+ #[serde(rename = "sourceBranch")]
+ source_branch: Option<String>,
+ #[serde(rename = "conflictedFiles")]
+ conflicted_files: Vec<String>,
+ },
+
+ /// Response to merge operations (MergeStart, MergeResolve, MergeCommit, MergeAbort).
+ MergeResult {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ message: String,
+ #[serde(rename = "commitSha")]
+ commit_sha: Option<String>,
+ /// Present only when conflicts occurred.
+ conflicts: Option<Vec<String>>,
+ },
+
+ /// Response to CheckMergeComplete command.
+ MergeCompleteCheck {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "canComplete")]
+ can_complete: bool,
+ #[serde(rename = "unmergedBranches")]
+ unmerged_branches: Vec<String>,
+ #[serde(rename = "mergedCount")]
+ merged_count: u32,
+ #[serde(rename = "skippedCount")]
+ skipped_count: u32,
+ },
+
+ // =========================================================================
+ // Completion Action Response Messages
+ // =========================================================================
+
+ /// Response to RetryCompletionAction command.
+ CompletionActionResult {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ message: String,
+ /// PR URL if action was "pr" and successful.
+ #[serde(rename = "prUrl")]
+ pr_url: Option<String>,
+ },
+
+ /// Report daemon's available directories for task output.
+ DaemonDirectories {
+ /// Current working directory of the daemon.
+ #[serde(rename = "workingDirectory")]
+ working_directory: String,
+ /// Path to ~/.makima/home directory (for cloning completed work).
+ #[serde(rename = "homeDirectory")]
+ home_directory: String,
+ /// Path to worktrees directory (~/.makima/worktrees).
+ #[serde(rename = "worktreesDirectory")]
+ worktrees_directory: String,
+ },
+
+ /// Response to CloneWorktree command.
+ CloneWorktreeResult {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ message: String,
+ /// The path where the worktree was cloned.
+ #[serde(rename = "targetDir")]
+ target_dir: Option<String>,
+ },
+
+ /// Response to CheckTargetExists command.
+ CheckTargetExistsResult {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// Whether the target directory exists.
+ exists: bool,
+ /// The path that was checked.
+ #[serde(rename = "targetDir")]
+ target_dir: String,
+ },
+
+ // =========================================================================
+ // Contract File Response Messages
+ // =========================================================================
+
+ /// Response to ReadRepoFile command.
+ RepoFileContent {
+ /// Request ID from the original command.
+ #[serde(rename = "requestId")]
+ request_id: Uuid,
+ /// Path to the file that was read.
+ #[serde(rename = "filePath")]
+ file_path: String,
+ /// File content (None if error occurred).
+ content: Option<String>,
+ /// Whether the operation succeeded.
+ success: bool,
+ /// Error message if operation failed.
+ error: Option<String>,
+ },
+
+ // =========================================================================
+ // Supervisor Git Response Messages
+ // =========================================================================
+
+ /// Response to CreateBranch command.
+ BranchCreated {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ #[serde(rename = "branchName")]
+ branch_name: String,
+ message: String,
+ },
+
+ /// Response to MergeTaskToTarget command.
+ MergeToTargetResult {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ message: String,
+ #[serde(rename = "commitSha")]
+ commit_sha: Option<String>,
+ conflicts: Option<Vec<String>>,
+ },
+
+ /// Response to CreatePR command.
+ PRCreated {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ message: String,
+ #[serde(rename = "prUrl")]
+ pr_url: Option<String>,
+ #[serde(rename = "prNumber")]
+ pr_number: Option<i32>,
+ },
+
+ /// Response to GetTaskDiff command.
+ TaskDiff {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ success: bool,
+ diff: Option<String>,
+ error: Option<String>,
+ },
+}
+
+/// Information about a branch (used in BranchList message).
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct BranchInfo {
+ /// Full branch name.
+ pub name: String,
+ /// Task ID extracted from branch name (if parseable).
+ #[serde(rename = "taskId")]
+ pub task_id: Option<Uuid>,
+ /// Whether this branch has been merged.
+ #[serde(rename = "isMerged")]
+ pub is_merged: bool,
+ /// Short SHA of the last commit.
+ #[serde(rename = "lastCommit")]
+ pub last_commit: String,
+ /// Subject line of the last commit.
+ #[serde(rename = "lastCommitMessage")]
+ pub last_commit_message: String,
+}
+
+/// Command from server to daemon.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "camelCase")]
+pub enum DaemonCommand {
+ /// Confirm successful authentication.
+ Authenticated {
+ #[serde(rename = "daemonId")]
+ daemon_id: Uuid,
+ },
+
+ /// Spawn a new task in a container.
+ SpawnTask {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// Human-readable task name (used for commit messages).
+ #[serde(rename = "taskName")]
+ task_name: String,
+ plan: String,
+ #[serde(rename = "repoUrl")]
+ repo_url: Option<String>,
+ #[serde(rename = "baseBranch")]
+ base_branch: Option<String>,
+ /// Target branch to merge into (used for completion actions).
+ #[serde(rename = "targetBranch")]
+ target_branch: Option<String>,
+ /// Parent task ID if this is a subtask.
+ #[serde(rename = "parentTaskId")]
+ parent_task_id: Option<Uuid>,
+ /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask).
+ depth: i32,
+ /// Whether this task should run as an orchestrator (true if depth==0 and has subtasks).
+ #[serde(rename = "isOrchestrator")]
+ is_orchestrator: bool,
+ /// Path to user's local repository (outside ~/.makima) for completion actions.
+ #[serde(rename = "targetRepoPath")]
+ target_repo_path: Option<String>,
+ /// Action on completion: "none", "branch", "merge", "pr".
+ #[serde(rename = "completionAction")]
+ completion_action: Option<String>,
+ /// Task ID to continue from (copy worktree from this task).
+ #[serde(rename = "continueFromTaskId")]
+ continue_from_task_id: Option<Uuid>,
+ /// Files to copy from parent task's worktree.
+ #[serde(rename = "copyFiles")]
+ copy_files: Option<Vec<String>>,
+ /// Contract ID if this task is associated with a contract.
+ #[serde(rename = "contractId")]
+ contract_id: Option<Uuid>,
+ /// Whether this task is a supervisor (long-running contract orchestrator).
+ #[serde(rename = "isSupervisor", default)]
+ is_supervisor: bool,
+ },
+
+ /// Pause a running task.
+ PauseTask {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ },
+
+ /// Resume a paused task.
+ ResumeTask {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ },
+
+ /// Interrupt a task (gracefully or forced).
+ InterruptTask {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ graceful: bool,
+ },
+
+ /// Send a message to a running task.
+ SendMessage {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ message: String,
+ },
+
+ /// Inject context about sibling task progress.
+ InjectSiblingContext {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "siblingTaskId")]
+ sibling_task_id: Uuid,
+ #[serde(rename = "siblingName")]
+ sibling_name: String,
+ #[serde(rename = "siblingStatus")]
+ sibling_status: String,
+ #[serde(rename = "progressSummary")]
+ progress_summary: Option<String>,
+ #[serde(rename = "changedFiles")]
+ changed_files: Vec<String>,
+ },
+
+ // =========================================================================
+ // Merge Commands (for orchestrators to merge subtask branches)
+ // =========================================================================
+
+ /// List all subtask branches for a task.
+ ListBranches {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ },
+
+ /// Start merging a subtask branch.
+ MergeStart {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "sourceBranch")]
+ source_branch: String,
+ },
+
+ /// Get current merge status.
+ MergeStatus {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ },
+
+ /// Resolve a merge conflict.
+ MergeResolve {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ file: String,
+ /// "ours" or "theirs"
+ strategy: String,
+ },
+
+ /// Commit the current merge.
+ MergeCommit {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ message: String,
+ },
+
+ /// Abort the current merge.
+ MergeAbort {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ },
+
+ /// Skip merging a subtask branch (mark as intentionally not merged).
+ MergeSkip {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "subtaskId")]
+ subtask_id: Uuid,
+ reason: String,
+ },
+
+ /// Check if all subtask branches have been merged or skipped (completion gate).
+ CheckMergeComplete {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ },
+
+ // =========================================================================
+ // Completion Action Commands
+ // =========================================================================
+
+ /// Retry a completion action for a completed task.
+ RetryCompletionAction {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// Human-readable task name (used for commit messages).
+ #[serde(rename = "taskName")]
+ task_name: String,
+ /// The action to execute: "branch", "merge", or "pr".
+ action: String,
+ /// Path to the target repository.
+ #[serde(rename = "targetRepoPath")]
+ target_repo_path: String,
+ /// Target branch to merge into (for merge/pr actions).
+ #[serde(rename = "targetBranch")]
+ target_branch: Option<String>,
+ },
+
+ /// Clone worktree to a target directory.
+ CloneWorktree {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// Path to the target directory.
+ #[serde(rename = "targetDir")]
+ target_dir: String,
+ },
+
+ /// Check if a target directory exists.
+ CheckTargetExists {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// Path to check.
+ #[serde(rename = "targetDir")]
+ target_dir: String,
+ },
+
+ // =========================================================================
+ // Contract File Commands
+ // =========================================================================
+
+ /// Read a file from a repository linked to a contract.
+ ReadRepoFile {
+ /// Request ID for correlating response.
+ #[serde(rename = "requestId")]
+ request_id: Uuid,
+ /// Contract ID (used for logging/context).
+ #[serde(rename = "contractId")]
+ contract_id: Uuid,
+ /// Path to the file within the repository.
+ #[serde(rename = "filePath")]
+ file_path: String,
+ /// Full repository path on daemon's filesystem.
+ #[serde(rename = "repoPath")]
+ repo_path: String,
+ },
+
+ // =========================================================================
+ // Supervisor Git Commands
+ // =========================================================================
+
+ /// Create a new branch in the supervisor's worktree.
+ CreateBranch {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ #[serde(rename = "branchName")]
+ branch_name: String,
+ /// Optional reference to create branch from (task_id or SHA).
+ #[serde(rename = "fromRef")]
+ from_ref: Option<String>,
+ },
+
+ /// Merge a task's changes to a target branch.
+ MergeTaskToTarget {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ /// Target branch to merge into (default: task's base branch).
+ #[serde(rename = "targetBranch")]
+ target_branch: Option<String>,
+ /// Whether to squash commits.
+ squash: bool,
+ },
+
+ /// Create a pull request for a task's changes.
+ CreatePR {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ title: String,
+ body: Option<String>,
+ /// Base branch for the PR (default: main).
+ #[serde(rename = "baseBranch")]
+ base_branch: String,
+ },
+
+ /// Get the diff for a task's changes.
+ GetTaskDiff {
+ #[serde(rename = "taskId")]
+ task_id: Uuid,
+ },
+
+ /// Error response.
+ Error {
+ code: String,
+ message: String,
+ },
+}
+
+impl DaemonMessage {
+ /// Create an authentication message.
+ pub fn authenticate(
+ api_key: &str,
+ machine_id: &str,
+ hostname: &str,
+ max_concurrent_tasks: i32,
+ ) -> Self {
+ Self::Authenticate {
+ api_key: api_key.to_string(),
+ machine_id: machine_id.to_string(),
+ hostname: hostname.to_string(),
+ max_concurrent_tasks,
+ }
+ }
+
+ /// Create a heartbeat message.
+ pub fn heartbeat(active_tasks: Vec<Uuid>) -> Self {
+ Self::Heartbeat { active_tasks }
+ }
+
+ /// Create a task output message.
+ pub fn task_output(task_id: Uuid, output: String, is_partial: bool) -> Self {
+ Self::TaskOutput {
+ task_id,
+ output,
+ is_partial,
+ }
+ }
+
+ /// Create a task status change message.
+ pub fn task_status_change(task_id: Uuid, old_status: &str, new_status: &str) -> Self {
+ Self::TaskStatusChange {
+ task_id,
+ old_status: old_status.to_string(),
+ new_status: new_status.to_string(),
+ }
+ }
+
+ /// Create a task progress message.
+ pub fn task_progress(task_id: Uuid, summary: String) -> Self {
+ Self::TaskProgress { task_id, summary }
+ }
+
+ /// Create a task complete message.
+ pub fn task_complete(task_id: Uuid, success: bool, error: Option<String>) -> Self {
+ Self::TaskComplete {
+ task_id,
+ success,
+ error,
+ }
+ }
+
+ /// Create a register tool key message.
+ pub fn register_tool_key(task_id: Uuid, key: String) -> Self {
+ Self::RegisterToolKey { task_id, key }
+ }
+
+ /// Create a revoke tool key message.
+ pub fn revoke_tool_key(task_id: Uuid) -> Self {
+ Self::RevokeToolKey { task_id }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::daemon::*;
+
+ #[test]
+ fn test_daemon_message_serialization() {
+ let msg = DaemonMessage::authenticate("key123", "machine-abc", "worker-1", 4);
+ let json = serde_json::to_string(&msg).unwrap();
+ assert!(json.contains("\"type\":\"authenticate\""));
+ assert!(json.contains("\"apiKey\":\"key123\""));
+ assert!(json.contains("\"machineId\":\"machine-abc\""));
+ }
+
+ #[test]
+ fn test_daemon_command_deserialization() {
+ let json = r#"{"type":"spawnTask","taskId":"550e8400-e29b-41d4-a716-446655440000","plan":"Build the feature","repoUrl":"https://github.com/test/repo","baseBranch":"main","parentTaskId":null,"depth":0,"isOrchestrator":false}"#;
+ let cmd: DaemonCommand = serde_json::from_str(json).unwrap();
+ match cmd {
+ DaemonCommand::SpawnTask {
+ plan,
+ repo_url,
+ base_branch,
+ parent_task_id,
+ depth,
+ is_orchestrator,
+ ..
+ } => {
+ assert_eq!(plan, "Build the feature");
+ assert_eq!(repo_url, Some("https://github.com/test/repo".to_string()));
+ assert_eq!(base_branch, Some("main".to_string()));
+ assert_eq!(parent_task_id, None);
+ assert_eq!(depth, 0);
+ assert!(!is_orchestrator);
+ }
+ _ => panic!("Expected SpawnTask"),
+ }
+ }
+
+ #[test]
+ fn test_orchestrator_spawn_deserialization() {
+ let json = r#"{"type":"spawnTask","taskId":"550e8400-e29b-41d4-a716-446655440000","plan":"Coordinate subtasks","repoUrl":"https://github.com/test/repo","baseBranch":"main","parentTaskId":null,"depth":0,"isOrchestrator":true}"#;
+ let cmd: DaemonCommand = serde_json::from_str(json).unwrap();
+ match cmd {
+ DaemonCommand::SpawnTask {
+ is_orchestrator,
+ depth,
+ ..
+ } => {
+ assert!(is_orchestrator);
+ assert_eq!(depth, 0);
+ }
+ _ => panic!("Expected SpawnTask"),
+ }
+ }
+}