diff options
| author | soryu <soryu@soryu.co> | 2026-01-11 05:52:14 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-15 00:21:16 +0000 |
| commit | 87044a747b47bd83249d61a45842c7f7b2eae56d (patch) | |
| tree | ef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/src/daemon | |
| parent | 077820c4167c168072d217a1b01df840463a12a8 (diff) | |
| download | soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip | |
Contract system
Diffstat (limited to 'makima/src/daemon')
26 files changed, 8737 insertions, 0 deletions
diff --git a/makima/src/daemon/api/client.rs b/makima/src/daemon/api/client.rs new file mode 100644 index 0000000..b27d606 --- /dev/null +++ b/makima/src/daemon/api/client.rs @@ -0,0 +1,129 @@ +//! Base HTTP client for makima API. + +use reqwest::Client; +use serde::{de::DeserializeOwned, Serialize}; +use thiserror::Error; + +/// API client errors. +#[derive(Error, Debug)] +pub enum ApiError { + #[error("HTTP request failed: {0}")] + Request(#[from] reqwest::Error), + + #[error("API error (HTTP {status}): {message}")] + Api { status: u16, message: String }, + + #[error("Failed to parse response: {0}")] + Parse(String), +} + +/// HTTP client for makima API. +pub struct ApiClient { + client: Client, + base_url: String, + api_key: String, +} + +impl ApiClient { + /// Create a new API client. + pub fn new(base_url: String, api_key: String) -> Result<Self, ApiError> { + let client = Client::builder() + .build()?; + + Ok(Self { + client, + base_url: base_url.trim_end_matches('/').to_string(), + api_key, + }) + } + + /// Make a GET request. + pub async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, ApiError> { + let url = format!("{}{}", self.base_url, path); + let response = self.client + .get(&url) + .header("X-Makima-Tool-Key", &self.api_key) + .send() + .await?; + + self.handle_response(response).await + } + + /// Make a POST request with JSON body. + pub async fn post<T: DeserializeOwned, B: Serialize>( + &self, + path: &str, + body: &B, + ) -> Result<T, ApiError> { + let url = format!("{}{}", self.base_url, path); + let response = self.client + .post(&url) + .header("X-Makima-Tool-Key", &self.api_key) + .header("Content-Type", "application/json") + .json(body) + .send() + .await?; + + self.handle_response(response).await + } + + /// Make a POST request without body. + pub async fn post_empty<T: DeserializeOwned>(&self, path: &str) -> Result<T, ApiError> { + let url = format!("{}{}", self.base_url, path); + let response = self.client + .post(&url) + .header("X-Makima-Tool-Key", &self.api_key) + .send() + .await?; + + self.handle_response(response).await + } + + /// Make a PUT request with JSON body. + pub async fn put<T: DeserializeOwned, B: Serialize>( + &self, + path: &str, + body: &B, + ) -> Result<T, ApiError> { + let url = format!("{}{}", self.base_url, path); + let response = self.client + .put(&url) + .header("X-Makima-Tool-Key", &self.api_key) + .header("Content-Type", "application/json") + .json(body) + .send() + .await?; + + self.handle_response(response).await + } + + /// Handle API response. + async fn handle_response<T: DeserializeOwned>( + &self, + response: reqwest::Response, + ) -> Result<T, ApiError> { + let status = response.status(); + let status_code = status.as_u16(); + + if !status.is_success() { + let body = response.text().await.unwrap_or_default(); + return Err(ApiError::Api { + status: status_code, + message: body, + }); + } + + let body = response.text().await?; + + // Handle empty responses + if body.is_empty() || body == "null" { + // Try to parse empty/null as the target type + serde_json::from_str::<T>("null") + .or_else(|_| serde_json::from_str::<T>("{}")) + .map_err(|e| ApiError::Parse(e.to_string())) + } else { + serde_json::from_str::<T>(&body) + .map_err(|e| ApiError::Parse(format!("{}: {}", e, body))) + } + } +} diff --git a/makima/src/daemon/api/contract.rs b/makima/src/daemon/api/contract.rs new file mode 100644 index 0000000..aac6b94 --- /dev/null +++ b/makima/src/daemon/api/contract.rs @@ -0,0 +1,161 @@ +//! Contract API methods. + +use serde::Serialize; +use uuid::Uuid; + +use super::client::{ApiClient, ApiError}; +use super::supervisor::JsonValue; + +// Request types + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ReportRequest { + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub task_id: Option<Uuid>, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CompletionActionRequest { + pub lines_added: i32, + pub lines_removed: i32, + pub has_code_changes: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub task_id: Option<Uuid>, + #[serde(skip_serializing_if = "Option::is_none")] + pub files_modified: Option<Vec<String>>, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateFileRequest { + pub content: String, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateFileRequest { + pub name: String, + pub content: String, +} + +impl ApiClient { + /// Get contract status. + pub async fn contract_status(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> { + self.get(&format!("/api/v1/contracts/{}/daemon/status", contract_id)) + .await + } + + /// Get phase checklist. + pub async fn contract_checklist(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> { + self.get(&format!("/api/v1/contracts/{}/daemon/checklist", contract_id)) + .await + } + + /// Get contract goals. + pub async fn contract_goals(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> { + self.get(&format!("/api/v1/contracts/{}/daemon/goals", contract_id)) + .await + } + + /// List contract files. + pub async fn contract_files(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> { + self.get(&format!("/api/v1/contracts/{}/daemon/files", contract_id)) + .await + } + + /// Get a specific file. + pub async fn contract_file( + &self, + contract_id: Uuid, + file_id: Uuid, + ) -> Result<JsonValue, ApiError> { + self.get(&format!( + "/api/v1/contracts/{}/daemon/files/{}", + contract_id, file_id + )) + .await + } + + /// Report progress. + pub async fn contract_report( + &self, + contract_id: Uuid, + message: &str, + task_id: Option<Uuid>, + ) -> Result<JsonValue, ApiError> { + let req = ReportRequest { + message: message.to_string(), + task_id, + }; + self.post(&format!("/api/v1/contracts/{}/daemon/report", contract_id), &req) + .await + } + + /// Get suggested action. + pub async fn contract_suggest_action(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> { + self.post_empty(&format!( + "/api/v1/contracts/{}/daemon/suggest-action", + contract_id + )) + .await + } + + /// Get completion action recommendation. + pub async fn contract_completion_action( + &self, + contract_id: Uuid, + task_id: Option<Uuid>, + files_modified: Option<Vec<String>>, + lines_added: i32, + lines_removed: i32, + has_code_changes: bool, + ) -> Result<JsonValue, ApiError> { + let req = CompletionActionRequest { + task_id, + files_modified, + lines_added, + lines_removed, + has_code_changes, + }; + self.post( + &format!("/api/v1/contracts/{}/daemon/completion-action", contract_id), + &req, + ) + .await + } + + /// Update a file. + pub async fn contract_update_file( + &self, + contract_id: Uuid, + file_id: Uuid, + content: &str, + ) -> Result<JsonValue, ApiError> { + let req = UpdateFileRequest { + content: content.to_string(), + }; + self.put( + &format!("/api/v1/contracts/{}/daemon/files/{}", contract_id, file_id), + &req, + ) + .await + } + + /// Create a new file. + pub async fn contract_create_file( + &self, + contract_id: Uuid, + name: &str, + content: &str, + ) -> Result<JsonValue, ApiError> { + let req = CreateFileRequest { + name: name.to_string(), + content: content.to_string(), + }; + self.post(&format!("/api/v1/contracts/{}/daemon/files", contract_id), &req) + .await + } +} diff --git a/makima/src/daemon/api/mod.rs b/makima/src/daemon/api/mod.rs new file mode 100644 index 0000000..0c05fb4 --- /dev/null +++ b/makima/src/daemon/api/mod.rs @@ -0,0 +1,7 @@ +//! HTTP API client for makima CLI commands. + +pub mod client; +pub mod contract; +pub mod supervisor; + +pub use client::ApiClient; diff --git a/makima/src/daemon/api/supervisor.rs b/makima/src/daemon/api/supervisor.rs new file mode 100644 index 0000000..b691cc4 --- /dev/null +++ b/makima/src/daemon/api/supervisor.rs @@ -0,0 +1,186 @@ +//! Supervisor API methods. + +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use super::client::{ApiClient, ApiError}; + +// Request/Response types + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SpawnTaskRequest { + pub name: String, + pub plan: String, + pub contract_id: Uuid, + #[serde(skip_serializing_if = "Option::is_none")] + pub parent_task_id: Option<Uuid>, + #[serde(skip_serializing_if = "Option::is_none")] + pub checkpoint_sha: Option<String>, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct WaitRequest { + pub timeout_seconds: i32, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ReadFileRequest { + pub file_path: String, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateBranchRequest { + pub branch_name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub from_ref: Option<String>, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct MergeRequest { + pub squash: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub target_branch: Option<String>, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CreatePrRequest { + pub task_id: Uuid, + pub title: String, + pub body: String, + pub base_branch: String, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CheckpointRequest { + pub message: String, +} + +// Generic response type for JSON output +#[derive(Deserialize, Serialize)] +pub struct JsonValue(pub serde_json::Value); + +impl ApiClient { + /// Get all tasks in a contract. + pub async fn supervisor_tasks(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> { + self.get(&format!("/api/v1/mesh/supervisor/contracts/{}/tasks", contract_id)) + .await + } + + /// Get task tree structure. + pub async fn supervisor_tree(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> { + self.get(&format!("/api/v1/mesh/supervisor/contracts/{}/tree", contract_id)) + .await + } + + /// Spawn a new task. + pub async fn supervisor_spawn(&self, req: SpawnTaskRequest) -> Result<JsonValue, ApiError> { + self.post("/api/v1/mesh/supervisor/tasks", &req).await + } + + /// Wait for a task to complete. + pub async fn supervisor_wait( + &self, + task_id: Uuid, + timeout_seconds: i32, + ) -> Result<JsonValue, ApiError> { + let req = WaitRequest { timeout_seconds }; + self.post(&format!("/api/v1/mesh/supervisor/tasks/{}/wait", task_id), &req) + .await + } + + /// Read a file from a task's worktree. + pub async fn supervisor_read_file( + &self, + task_id: Uuid, + file_path: &str, + ) -> Result<JsonValue, ApiError> { + let req = ReadFileRequest { + file_path: file_path.to_string(), + }; + self.post(&format!("/api/v1/mesh/supervisor/tasks/{}/read-file", task_id), &req) + .await + } + + /// Create a new branch. + pub async fn supervisor_branch( + &self, + branch_name: &str, + from_ref: Option<String>, + ) -> Result<JsonValue, ApiError> { + let req = CreateBranchRequest { + branch_name: branch_name.to_string(), + from_ref, + }; + self.post("/api/v1/mesh/supervisor/branches", &req).await + } + + /// Merge a task's changes. + pub async fn supervisor_merge( + &self, + task_id: Uuid, + target_branch: Option<String>, + squash: bool, + ) -> Result<JsonValue, ApiError> { + let req = MergeRequest { + squash, + target_branch, + }; + self.post(&format!("/api/v1/mesh/supervisor/tasks/{}/merge", task_id), &req) + .await + } + + /// Create a pull request. + pub async fn supervisor_pr( + &self, + task_id: Uuid, + title: &str, + body: &str, + base_branch: &str, + ) -> Result<JsonValue, ApiError> { + let req = CreatePrRequest { + task_id, + title: title.to_string(), + body: body.to_string(), + base_branch: base_branch.to_string(), + }; + self.post("/api/v1/mesh/supervisor/pr", &req).await + } + + /// Get task diff. + pub async fn supervisor_diff(&self, task_id: Uuid) -> Result<JsonValue, ApiError> { + self.get(&format!("/api/v1/mesh/supervisor/tasks/{}/diff", task_id)) + .await + } + + /// Create a checkpoint. + pub async fn supervisor_checkpoint( + &self, + task_id: Uuid, + message: &str, + ) -> Result<JsonValue, ApiError> { + let req = CheckpointRequest { + message: message.to_string(), + }; + self.post(&format!("/api/v1/mesh/tasks/{}/checkpoint", task_id), &req) + .await + } + + /// List checkpoints. + pub async fn supervisor_checkpoints(&self, task_id: Uuid) -> Result<JsonValue, ApiError> { + self.get(&format!("/api/v1/mesh/tasks/{}/checkpoints", task_id)) + .await + } + + /// Get contract status. + pub async fn supervisor_status(&self, contract_id: Uuid) -> Result<JsonValue, ApiError> { + self.get(&format!("/api/v1/contracts/{}/daemon/status", contract_id)) + .await + } +} diff --git a/makima/src/daemon/cli/contract.rs b/makima/src/daemon/cli/contract.rs new file mode 100644 index 0000000..5fef5ec --- /dev/null +++ b/makima/src/daemon/cli/contract.rs @@ -0,0 +1,87 @@ +//! Contract subcommand - task-contract interaction commands. + +use clap::Args; +use uuid::Uuid; + +/// Common arguments for contract commands. +#[derive(Args, Debug, Clone)] +pub struct ContractArgs { + /// API URL + #[arg(long, env = "MAKIMA_API_URL", default_value = "http://localhost:8080", global = true)] + pub api_url: String, + + /// API key for authentication + #[arg(long, env = "MAKIMA_API_KEY", global = true)] + pub api_key: String, + + /// Current task ID (optional) + #[arg(long, env = "MAKIMA_TASK_ID", global = true)] + pub task_id: Option<Uuid>, + + /// Contract ID + #[arg(long, env = "MAKIMA_CONTRACT_ID", global = true)] + pub contract_id: Uuid, +} + +/// Arguments for file command (get specific file). +#[derive(Args, Debug)] +pub struct FileArgs { + #[command(flatten)] + pub common: ContractArgs, + + /// File ID to retrieve + pub file_id: Uuid, +} + +/// Arguments for report command. +#[derive(Args, Debug)] +pub struct ReportArgs { + #[command(flatten)] + pub common: ContractArgs, + + /// Progress message + pub message: String, +} + +/// Arguments for completion-action command. +#[derive(Args, Debug)] +pub struct CompletionActionArgs { + #[command(flatten)] + pub common: ContractArgs, + + /// Comma-separated list of modified files + #[arg(long)] + pub files: Option<String>, + + /// Number of lines added + #[arg(long, default_value = "0")] + pub lines_added: i32, + + /// Number of lines removed + #[arg(long, default_value = "0")] + pub lines_removed: i32, + + /// Whether there are code changes + #[arg(long)] + pub code: bool, +} + +/// Arguments for update-file command. +#[derive(Args, Debug)] +pub struct UpdateFileArgs { + #[command(flatten)] + pub common: ContractArgs, + + /// File ID to update + pub file_id: Uuid, +} + +/// Arguments for create-file command. +#[derive(Args, Debug)] +pub struct CreateFileArgs { + #[command(flatten)] + pub common: ContractArgs, + + /// Name of the new file + pub name: String, +} diff --git a/makima/src/daemon/cli/daemon.rs b/makima/src/daemon/cli/daemon.rs new file mode 100644 index 0000000..de4cff4 --- /dev/null +++ b/makima/src/daemon/cli/daemon.rs @@ -0,0 +1,36 @@ +//! Daemon subcommand - connect to server and manage tasks. + +use clap::Args; +use std::path::PathBuf; + +/// Run the makima daemon (connect to server and manage tasks). +#[derive(Args, Debug)] +pub struct DaemonArgs { + /// Path to custom config file + #[arg(short, long)] + pub config: Option<PathBuf>, + + /// Directory where repositories are cloned + #[arg(long, env = "MAKIMA_DAEMON_REPOS_DIR")] + pub repos_dir: Option<PathBuf>, + + /// Directory where worktrees are created + #[arg(long, env = "MAKIMA_DAEMON_WORKTREES_DIR")] + pub worktrees_dir: Option<PathBuf>, + + /// WebSocket server URL to connect to + #[arg(long, env = "MAKIMA_DAEMON_SERVER_URL")] + pub server_url: Option<String>, + + /// API key for server authentication + #[arg(long, env = "MAKIMA_DAEMON_SERVER_APIKEY")] + pub api_key: Option<String>, + + /// Maximum number of concurrent tasks + #[arg(long)] + pub max_tasks: Option<u32>, + + /// Log level (trace, debug, info, warn, error) + #[arg(short, long, default_value = "info")] + pub log_level: String, +} diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs new file mode 100644 index 0000000..24c19c6 --- /dev/null +++ b/makima/src/daemon/cli/mod.rs @@ -0,0 +1,120 @@ +//! Command-line interface for the makima CLI. + +pub mod contract; +pub mod daemon; +pub mod server; +pub mod supervisor; + +use clap::{Parser, Subcommand}; + +pub use contract::ContractArgs; +pub use daemon::DaemonArgs; +pub use server::ServerArgs; +pub use supervisor::SupervisorArgs; + +/// Makima - unified CLI for server, daemon, and task management. +#[derive(Parser, Debug)] +#[command(name = "makima")] +#[command(version, about = "Makima CLI - server, daemon, and task management", long_about = None)] +pub struct Cli { + #[command(subcommand)] + pub command: Commands, +} + +#[derive(Subcommand, Debug)] +pub enum Commands { + /// Run the makima server + Server(ServerArgs), + + /// Run the daemon (connect to server, manage tasks) + Daemon(DaemonArgs), + + /// Supervisor commands for contract orchestration + #[command(subcommand)] + Supervisor(SupervisorCommand), + + /// Contract commands for task-contract interaction + #[command(subcommand)] + Contract(ContractCommand), +} + +/// Supervisor subcommands for contract orchestration. +#[derive(Subcommand, Debug)] +pub enum SupervisorCommand { + /// List all tasks in the contract + Tasks(SupervisorArgs), + + /// Get the task tree structure + Tree(SupervisorArgs), + + /// Create and start a new task + Spawn(supervisor::SpawnArgs), + + /// Wait for a task to complete + Wait(supervisor::WaitArgs), + + /// Read a file from a task's worktree + ReadFile(supervisor::ReadFileArgs), + + /// Create a git branch + Branch(supervisor::BranchArgs), + + /// Merge a task's changes to a branch + Merge(supervisor::MergeArgs), + + /// Create a pull request + Pr(supervisor::PrArgs), + + /// View task diff + Diff(supervisor::DiffArgs), + + /// Create a checkpoint + Checkpoint(supervisor::CheckpointArgs), + + /// List checkpoints + Checkpoints(SupervisorArgs), + + /// Get contract status + Status(SupervisorArgs), +} + +/// Contract subcommands for task-contract interaction. +#[derive(Subcommand, Debug)] +pub enum ContractCommand { + /// Get contract status + Status(ContractArgs), + + /// Get the phase checklist + Checklist(ContractArgs), + + /// Get contract goals + Goals(ContractArgs), + + /// List contract files + Files(ContractArgs), + + /// Get a specific file's content + File(contract::FileArgs), + + /// Report progress on the contract + Report(contract::ReportArgs), + + /// Get suggested next action + SuggestAction(ContractArgs), + + /// Get completion recommendation + CompletionAction(contract::CompletionActionArgs), + + /// Update a file (reads content from stdin) + UpdateFile(contract::UpdateFileArgs), + + /// Create a new file (reads content from stdin) + CreateFile(contract::CreateFileArgs), +} + +impl Cli { + /// Parse command-line arguments + pub fn parse_args() -> Self { + Self::parse() + } +} diff --git a/makima/src/daemon/cli/server.rs b/makima/src/daemon/cli/server.rs new file mode 100644 index 0000000..371a912 --- /dev/null +++ b/makima/src/daemon/cli/server.rs @@ -0,0 +1,43 @@ +//! Server subcommand - run the makima server. + +use clap::Args; + +/// Run the makima server. +#[derive(Args, Debug)] +pub struct ServerArgs { + /// Server port + #[arg(long, env = "PORT", default_value = "8080")] + pub port: u16, + + /// Path to parakeet model directory + #[arg( + long, + env = "PARAKEET_MODEL_DIR", + default_value = "models/parakeet-tdt-0.6b-v3" + )] + pub parakeet_model_dir: String, + + /// Path to parakeet EOU model directory + #[arg( + long, + env = "PARAKEET_EOU_DIR", + default_value = "models/realtime_eou_120m-v1-onnx" + )] + pub parakeet_eou_dir: String, + + /// Path to sortformer model + #[arg( + long, + env = "SORTFORMER_MODEL_PATH", + default_value = "models/diarization/diar_streaming_sortformer_4spk-v2.1.onnx" + )] + pub sortformer_model_path: String, + + /// PostgreSQL connection URI + #[arg(long, env = "POSTGRES_CONNECTION_URI")] + pub database_url: Option<String>, + + /// Log level (trace, debug, info, warn, error) + #[arg(short, long, default_value = "info")] + pub log_level: String, +} diff --git a/makima/src/daemon/cli/supervisor.rs b/makima/src/daemon/cli/supervisor.rs new file mode 100644 index 0000000..00c7ff4 --- /dev/null +++ b/makima/src/daemon/cli/supervisor.rs @@ -0,0 +1,146 @@ +//! Supervisor subcommand - contract orchestration commands. + +use clap::Args; +use uuid::Uuid; + +/// Common arguments for supervisor commands. +#[derive(Args, Debug, Clone)] +pub struct SupervisorArgs { + /// API URL + #[arg(long, env = "MAKIMA_API_URL", default_value = "http://localhost:8080", global = true)] + pub api_url: String, + + /// API key for authentication + #[arg(long, env = "MAKIMA_API_KEY", global = true)] + pub api_key: String, + + /// Current task ID (optional) + #[arg(long, env = "MAKIMA_TASK_ID", global = true)] + pub task_id: Option<Uuid>, + + /// Contract ID + #[arg(long, env = "MAKIMA_CONTRACT_ID", global = true)] + pub contract_id: Uuid, +} + +/// Arguments for spawn command. +#[derive(Args, Debug)] +pub struct SpawnArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Name of the task + pub name: String, + + /// Plan/description for the task + pub plan: String, + + /// Parent task ID to branch from + #[arg(long)] + pub parent: Option<Uuid>, + + /// Checkpoint SHA to start from + #[arg(long)] + pub checkpoint: Option<String>, +} + +/// Arguments for wait command. +#[derive(Args, Debug)] +pub struct WaitArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to wait for + pub task_id: Uuid, + + /// Timeout in seconds + #[arg(default_value = "300")] + pub timeout: i32, +} + +/// Arguments for read-file command. +#[derive(Args, Debug)] +pub struct ReadFileArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to read from + pub task_id: Uuid, + + /// File path to read + pub file_path: String, +} + +/// Arguments for branch command. +#[derive(Args, Debug)] +pub struct BranchArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Branch name to create + pub name: String, + + /// Reference (task ID or SHA) to branch from + #[arg(long)] + pub from: Option<String>, +} + +/// Arguments for merge command. +#[derive(Args, Debug)] +pub struct MergeArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to merge + pub task_id: Uuid, + + /// Target branch to merge into + #[arg(long)] + pub to: Option<String>, + + /// Squash commits on merge + #[arg(long)] + pub squash: bool, +} + +/// Arguments for pr command. +#[derive(Args, Debug)] +pub struct PrArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to create PR for + pub task_id: Uuid, + + /// PR title + #[arg(long)] + pub title: String, + + /// PR body/description + #[arg(long)] + pub body: Option<String>, + + /// Base branch (default: main) + #[arg(long, default_value = "main")] + pub base: String, +} + +/// Arguments for diff command. +#[derive(Args, Debug)] +pub struct DiffArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Task ID to get diff for + pub task_id: Uuid, +} + +/// Arguments for checkpoint command. +#[derive(Args, Debug)] +pub struct CheckpointArgs { + #[command(flatten)] + pub common: SupervisorArgs, + + /// Checkpoint message + pub message: String, +} diff --git a/makima/src/daemon/config.rs b/makima/src/daemon/config.rs new file mode 100644 index 0000000..866ee70 --- /dev/null +++ b/makima/src/daemon/config.rs @@ -0,0 +1,555 @@ +//! Configuration management for the makima daemon. + +use config::{Config, Environment, File}; +use serde::Deserialize; +use std::collections::HashMap; +use std::path::PathBuf; + +/// Root daemon configuration. +#[derive(Debug, Clone, Deserialize)] +pub struct DaemonConfig { + /// Server connection settings. + #[serde(default)] + pub server: ServerConfig, + + /// Worktree settings. + #[serde(default)] + pub worktree: WorktreeConfig, + + /// Process settings. + #[serde(default)] + pub process: ProcessConfig, + + /// Local database settings. + #[serde(default)] + pub local_db: LocalDbConfig, + + /// Logging settings. + #[serde(default)] + pub logging: LoggingConfig, + + /// Repositories to auto-clone on startup. + #[serde(default)] + pub repos: ReposConfig, +} + +/// Server connection configuration. +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct ServerConfig { + /// WebSocket URL of makima server (e.g., ws://localhost:8080 or wss://makima.example.com). + /// Defaults to wss://api.makima.jp. + #[serde(default = "default_server_url")] + pub url: String, + + /// API key for authentication. + #[serde(default, alias = "apikey")] + pub api_key: String, + + /// Heartbeat interval in seconds. + #[serde(default = "default_heartbeat_interval", alias = "heartbeatintervalsecs")] + pub heartbeat_interval_secs: u64, + + /// Reconnect interval in seconds after connection loss. + #[serde(default = "default_reconnect_interval", alias = "reconnectintervalsecs")] + pub reconnect_interval_secs: u64, + + /// Maximum reconnect attempts before giving up (0 = infinite). + #[serde(default, alias = "maxreconnectattempts")] + pub max_reconnect_attempts: u32, +} + +fn default_heartbeat_interval() -> u64 { + 30 +} + +fn default_reconnect_interval() -> u64 { + 5 +} + +fn default_server_url() -> String { + "wss://api.makima.jp".to_string() +} + +impl Default for ServerConfig { + fn default() -> Self { + Self { + url: default_server_url(), + api_key: String::new(), + heartbeat_interval_secs: default_heartbeat_interval(), + reconnect_interval_secs: default_reconnect_interval(), + max_reconnect_attempts: 0, + } + } +} + +/// Worktree configuration for task isolation. +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct WorktreeConfig { + /// Base directory for worktrees (~/.makima/worktrees). + #[serde(default = "default_worktree_base_dir", alias = "basedir")] + pub base_dir: PathBuf, + + /// Base directory for cloned repositories (~/.makima/repos). + #[serde(default = "default_repos_base_dir", alias = "reposdir")] + pub repos_dir: PathBuf, + + /// Branch prefix for task branches. + #[serde(default = "default_branch_prefix", alias = "branchprefix")] + pub branch_prefix: String, + + /// Clean up worktrees on daemon start. + #[serde(default, alias = "cleanuponstart")] + pub cleanup_on_start: bool, + + /// Default target repository path for pushing completed branches. + /// Used when task.target_repo_path is not set. + #[serde(default, alias = "defaulttargetrepo")] + pub default_target_repo: Option<PathBuf>, +} + +fn default_worktree_base_dir() -> PathBuf { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".makima") + .join("worktrees") +} + +fn default_repos_base_dir() -> PathBuf { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".makima") + .join("repos") +} + +fn default_branch_prefix() -> String { + "makima/task-".to_string() +} + +impl Default for WorktreeConfig { + fn default() -> Self { + Self { + base_dir: default_worktree_base_dir(), + repos_dir: default_repos_base_dir(), + branch_prefix: default_branch_prefix(), + cleanup_on_start: false, + default_target_repo: None, + } + } +} + +/// Process configuration for Claude Code subprocess execution. +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct ProcessConfig { + /// Path or command for Claude Code CLI. + #[serde(default = "default_claude_command", alias = "claudecommand")] + pub claude_command: String, + + /// Additional arguments to pass to Claude Code. + /// These are added after the default arguments. + #[serde(default, alias = "claudeargs")] + pub claude_args: Vec<String>, + + /// Arguments to pass before the default arguments. + /// Useful for overriding defaults. + #[serde(default, alias = "claudepreargs")] + pub claude_pre_args: Vec<String>, + + /// Skip the --dangerously-skip-permissions flag (default: false). + /// Set to true if you want to use Claude's permission system. + #[serde(default, alias = "enablepermissions")] + pub enable_permissions: bool, + + /// Skip the --verbose flag (default: false). + #[serde(default, alias = "disableverbose")] + pub disable_verbose: bool, + + /// Maximum concurrent tasks. + #[serde(default = "default_max_tasks", alias = "maxconcurrenttasks")] + pub max_concurrent_tasks: u32, + + /// Default timeout for tasks in seconds (0 = no timeout). + #[serde(default, alias = "defaulttimeoutsecs")] + pub default_timeout_secs: u64, + + /// Additional environment variables to pass to Claude Code. + #[serde(default, alias = "envvars")] + pub env_vars: HashMap<String, String>, +} + +fn default_claude_command() -> String { + "claude".to_string() +} + +fn default_max_tasks() -> u32 { + 4 +} + +impl Default for ProcessConfig { + fn default() -> Self { + Self { + claude_command: default_claude_command(), + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + max_concurrent_tasks: default_max_tasks(), + default_timeout_secs: 0, + env_vars: HashMap::new(), + } + } +} + +/// Local database configuration. +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct LocalDbConfig { + /// Path to local SQLite database. + #[serde(default = "default_db_path")] + pub path: PathBuf, +} + +impl Default for LocalDbConfig { + fn default() -> Self { + Self { + path: default_db_path(), + } + } +} + +fn default_db_path() -> PathBuf { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".makima") + .join("daemon.db") +} + +/// Logging configuration. +#[derive(Debug, Clone, Deserialize, Default)] +pub struct LoggingConfig { + /// Log level: "trace", "debug", "info", "warn", "error". + #[serde(default = "default_log_level")] + pub level: String, + + /// Log format: "pretty" or "json". + #[serde(default = "default_log_format")] + pub format: String, +} + +fn default_log_level() -> String { + "info".to_string() +} + +fn default_log_format() -> String { + "pretty".to_string() +} + +/// Repository auto-clone configuration. +#[derive(Debug, Clone, Deserialize, Default)] +pub struct ReposConfig { + /// Directory to clone repositories into (default: ~/.makima/home). + #[serde(default = "default_home_dir")] + pub home_dir: PathBuf, + + /// List of repositories to auto-clone on startup. + /// Each entry can be a URL (e.g., "https://github.com/user/repo.git") + /// or a shorthand (e.g., "github:user/repo"). + #[serde(default, alias = "autoclone")] + pub auto_clone: Vec<RepoEntry>, +} + +/// A repository entry for auto-cloning. +#[derive(Debug, Clone, Deserialize)] +#[serde(untagged)] +pub enum RepoEntry { + /// Simple URL string. + Url(String), + /// Detailed configuration. + Config { + /// Repository URL. + url: String, + /// Custom directory name (defaults to repo name from URL). + #[serde(default)] + name: Option<String>, + /// Branch to checkout after cloning (defaults to default branch). + #[serde(default)] + branch: Option<String>, + /// Whether to do a shallow clone (default: false). + #[serde(default)] + shallow: bool, + }, +} + +impl RepoEntry { + /// Get the URL for this repo entry. + pub fn url(&self) -> &str { + match self { + RepoEntry::Url(url) => url, + RepoEntry::Config { url, .. } => url, + } + } + + /// Get the custom name, if any. + pub fn name(&self) -> Option<&str> { + match self { + RepoEntry::Url(_) => None, + RepoEntry::Config { name, .. } => name.as_deref(), + } + } + + /// Get the branch to checkout, if any. + pub fn branch(&self) -> Option<&str> { + match self { + RepoEntry::Url(_) => None, + RepoEntry::Config { branch, .. } => branch.as_deref(), + } + } + + /// Whether to do a shallow clone. + pub fn shallow(&self) -> bool { + match self { + RepoEntry::Url(_) => false, + RepoEntry::Config { shallow, .. } => *shallow, + } + } + + /// Get the directory name to use (either custom name or derived from URL). + pub fn dir_name(&self) -> Option<String> { + if let Some(name) = self.name() { + return Some(name.to_string()); + } + + // Derive from URL + let url = self.url(); + + // Handle shorthand formats + let url = if url.starts_with("github:") { + url.strip_prefix("github:").unwrap_or(url) + } else if url.starts_with("gitlab:") { + url.strip_prefix("gitlab:").unwrap_or(url) + } else { + url + }; + + // Extract repo name from URL + url.trim_end_matches('/') + .trim_end_matches(".git") + .rsplit('/') + .next() + .map(|s| s.to_string()) + } + + /// Expand the URL (e.g., convert shorthand to full URL). + pub fn expanded_url(&self) -> String { + let url = self.url(); + + if url.starts_with("github:") { + format!("https://github.com/{}.git", url.strip_prefix("github:").unwrap_or("")) + } else if url.starts_with("gitlab:") { + format!("https://gitlab.com/{}.git", url.strip_prefix("gitlab:").unwrap_or("")) + } else { + url.to_string() + } + } +} + +fn default_home_dir() -> PathBuf { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".makima") + .join("home") +} + +impl DaemonConfig { + /// Load configuration from files and environment variables. + /// + /// Configuration sources (in order of precedence): + /// 1. Environment variables (MAKIMA_API_KEY, MAKIMA_DAEMON_SERVER_URL, etc.) + /// 2. ./makima-daemon.toml (current directory) + /// 3. ~/.config/makima-daemon/config.toml + /// 4. /etc/makima-daemon/config.toml (Linux only) + /// + /// Environment variable examples: + /// - MAKIMA_API_KEY=your-api-key (preferred) + /// - MAKIMA_DAEMON_SERVER_URL=ws://localhost:8080 + /// - MAKIMA_DAEMON_PROCESS_MAXCONCURRENTTASKS=4 + pub fn load() -> Result<Self, config::ConfigError> { + Self::load_from_path(None) + } + + /// Load configuration from a specific path plus standard sources. + fn load_from_path(config_path: Option<&std::path::Path>) -> Result<Self, config::ConfigError> { + let mut builder = Config::builder(); + + // System-wide config (Linux only) + #[cfg(target_os = "linux")] + { + builder = builder.add_source( + File::with_name("/etc/makima-daemon/config").required(false), + ); + } + + // User config + if let Some(config_dir) = dirs::config_dir() { + let user_config = config_dir.join("makima-daemon").join("config"); + builder = builder.add_source( + File::with_name(user_config.to_str().unwrap_or("")).required(false), + ); + } + + // Local config + builder = builder.add_source(File::with_name("makima-daemon").required(false)); + + // Custom config file (if provided) + if let Some(path) = config_path { + builder = builder.add_source( + File::with_name(path.to_str().unwrap_or("")).required(true), + ); + } + + // Environment variables with underscore separator for nesting + // e.g., MAKIMA_DAEMON_SERVER_URL -> server.url + // MAKIMA_DAEMON_SERVER_APIKEY -> server.api_key + builder = builder.add_source( + Environment::with_prefix("MAKIMA_DAEMON") + .separator("_") + .try_parsing(true), + ); + + let config = builder.build()?; + let mut config: DaemonConfig = config.try_deserialize()?; + + // Check for MAKIMA_API_KEY environment variable (preferred over MAKIMA_DAEMON_SERVER_APIKEY) + if let Ok(api_key) = std::env::var("MAKIMA_API_KEY") { + config.server.api_key = api_key; + } + + // Validate required fields (don't validate here - let load_with_cli do final validation) + Ok(config) + } + + /// Validate that required configuration fields are set. + pub fn validate(&self) -> Result<(), config::ConfigError> { + if self.server.api_key.is_empty() { + return Err(config::ConfigError::Message( + "API key is required. Set via MAKIMA_API_KEY, config file, or --api-key".to_string() + )); + } + Ok(()) + } + + /// Load configuration with CLI argument overrides. + /// + /// Configuration sources (in order of precedence, highest first): + /// 1. CLI arguments + /// 2. Environment variables + /// 3. Custom config file (if --config specified) + /// 4. ./makima-daemon.toml (current directory) + /// 5. ~/.config/makima-daemon/config.toml + /// 6. /etc/makima-daemon/config.toml (Linux only) + /// 7. Default values + pub fn load_with_cli(cli: &super::cli::daemon::DaemonArgs) -> Result<Self, config::ConfigError> { + Self::load_with_daemon_args(cli) + } + + /// Load configuration from various sources with daemon CLI overrides. + pub fn load_with_daemon_args(args: &super::cli::daemon::DaemonArgs) -> Result<Self, config::ConfigError> { + // Load base config (with optional custom config file) + let mut config = Self::load_from_path(args.config.as_deref())?; + + // Apply CLI overrides (highest priority) + if let Some(ref repos_dir) = args.repos_dir { + config.worktree.repos_dir = repos_dir.clone(); + } + if let Some(ref worktrees_dir) = args.worktrees_dir { + config.worktree.base_dir = worktrees_dir.clone(); + } + if let Some(ref server_url) = args.server_url { + config.server.url = server_url.clone(); + } + if let Some(ref api_key) = args.api_key { + config.server.api_key = api_key.clone(); + } + if let Some(max_tasks) = args.max_tasks { + config.process.max_concurrent_tasks = max_tasks; + } + // Log level is always set (has default) + config.logging.level = args.log_level.clone(); + + // Validate required fields after all sources are merged + config.validate()?; + + Ok(config) + } + + /// Create a minimal config for testing. + #[cfg(test)] + pub fn test_config() -> Self { + Self { + server: ServerConfig { + url: "ws://localhost:8080".to_string(), + api_key: "test-key".to_string(), + heartbeat_interval_secs: 30, + reconnect_interval_secs: 5, + max_reconnect_attempts: 0, + }, + worktree: WorktreeConfig { + base_dir: PathBuf::from("/tmp/makima-daemon-test/worktrees"), + repos_dir: PathBuf::from("/tmp/makima-daemon-test/repos"), + branch_prefix: "makima/task-".to_string(), + cleanup_on_start: true, + default_target_repo: None, + }, + process: ProcessConfig { + claude_command: "claude".to_string(), + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + max_concurrent_tasks: 2, + default_timeout_secs: 0, + env_vars: HashMap::new(), + }, + local_db: LocalDbConfig { + path: PathBuf::from("/tmp/makima-daemon-test/state.db"), + }, + logging: LoggingConfig::default(), + repos: ReposConfig::default(), + } + } +} + +/// Helper module for dirs crate (minimal subset). +mod dirs { + use std::path::PathBuf; + + pub fn home_dir() -> Option<PathBuf> { + std::env::var("HOME").ok().map(PathBuf::from) + } + + pub fn config_dir() -> Option<PathBuf> { + #[cfg(target_os = "macos")] + { + std::env::var("HOME") + .ok() + .map(|h| PathBuf::from(h).join("Library").join("Application Support")) + } + #[cfg(target_os = "linux")] + { + std::env::var("XDG_CONFIG_HOME") + .ok() + .map(PathBuf::from) + .or_else(|| std::env::var("HOME").ok().map(|h| PathBuf::from(h).join(".config"))) + } + #[cfg(target_os = "windows")] + { + std::env::var("APPDATA").ok().map(PathBuf::from) + } + #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))] + { + None + } + } +} diff --git a/makima/src/daemon/db/local.rs b/makima/src/daemon/db/local.rs new file mode 100644 index 0000000..f3ed45a --- /dev/null +++ b/makima/src/daemon/db/local.rs @@ -0,0 +1,391 @@ +//! Local SQLite database for crash recovery and state persistence. + +use std::path::Path; + +use chrono::{DateTime, Utc}; +use rusqlite::{params, Connection, Result as SqliteResult}; +use uuid::Uuid; + +use crate::daemon::task::TaskState; + +/// Local task record for persistence. +#[derive(Debug, Clone)] +pub struct LocalTask { + pub id: Uuid, + pub server_task_id: Uuid, + pub state: TaskState, + pub container_id: Option<String>, + pub overlay_path: Option<String>, + pub repo_url: Option<String>, + pub base_branch: Option<String>, + pub plan: String, + pub created_at: DateTime<Utc>, + pub started_at: Option<DateTime<Utc>>, + pub completed_at: Option<DateTime<Utc>>, + pub error_message: Option<String>, +} + +/// Buffered output for reliable delivery. +#[derive(Debug, Clone)] +pub struct BufferedOutput { + pub id: i64, + pub task_id: Uuid, + pub output: String, + pub is_partial: bool, + pub timestamp: DateTime<Utc>, +} + +/// Local database for daemon state persistence. +pub struct LocalDb { + conn: Connection, +} + +impl LocalDb { + /// Open or create the local database. + pub fn open(path: &Path) -> SqliteResult<Self> { + // Create parent directory if needed + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).ok(); + } + + let conn = Connection::open(path)?; + + // Initialize schema + conn.execute_batch(Self::schema())?; + + Ok(Self { conn }) + } + + /// Open an in-memory database (for testing). + #[cfg(test)] + pub fn open_memory() -> SqliteResult<Self> { + let conn = Connection::open_in_memory()?; + conn.execute_batch(Self::schema())?; + Ok(Self { conn }) + } + + /// Database schema. + fn schema() -> &'static str { + r#" + -- Local task state for crash recovery + CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, + server_task_id TEXT NOT NULL, + state TEXT NOT NULL, + container_id TEXT, + overlay_path TEXT, + repo_url TEXT, + base_branch TEXT, + plan TEXT NOT NULL, + created_at TEXT NOT NULL, + started_at TEXT, + completed_at TEXT, + error_message TEXT + ); + + -- Buffered output for reliable delivery + CREATE TABLE IF NOT EXISTS output_buffer ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL, + output TEXT NOT NULL, + is_partial INTEGER NOT NULL, + timestamp TEXT NOT NULL, + sent INTEGER NOT NULL DEFAULT 0 + ); + + -- Daemon state key-value store + CREATE TABLE IF NOT EXISTS daemon_state ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + -- Indexes + CREATE INDEX IF NOT EXISTS idx_tasks_state ON tasks(state); + CREATE INDEX IF NOT EXISTS idx_output_buffer_sent ON output_buffer(sent, id); + CREATE INDEX IF NOT EXISTS idx_output_buffer_task ON output_buffer(task_id); + "# + } + + /// Save a task. + pub fn save_task(&self, task: &LocalTask) -> SqliteResult<()> { + self.conn.execute( + r#" + INSERT OR REPLACE INTO tasks + (id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12) + "#, + params![ + task.id.to_string(), + task.server_task_id.to_string(), + task.state.as_str(), + task.container_id, + task.overlay_path, + task.repo_url, + task.base_branch, + task.plan, + task.created_at.to_rfc3339(), + task.started_at.map(|t| t.to_rfc3339()), + task.completed_at.map(|t| t.to_rfc3339()), + task.error_message, + ], + )?; + Ok(()) + } + + /// Get a task by ID. + pub fn get_task(&self, id: Uuid) -> SqliteResult<Option<LocalTask>> { + let mut stmt = self.conn.prepare( + "SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message FROM tasks WHERE id = ?1", + )?; + + let mut rows = stmt.query(params![id.to_string()])?; + + if let Some(row) = rows.next()? { + Ok(Some(Self::task_from_row(row)?)) + } else { + Ok(None) + } + } + + /// Get all running/active tasks (for recovery). + pub fn get_active_tasks(&self) -> SqliteResult<Vec<LocalTask>> { + let mut stmt = self.conn.prepare( + r#" + SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message + FROM tasks + WHERE state IN ('initializing', 'starting', 'running', 'paused', 'blocked') + "#, + )?; + + let rows = stmt.query_map([], |row| Self::task_from_row(row))?; + + rows.collect() + } + + /// Delete a task. + pub fn delete_task(&self, id: Uuid) -> SqliteResult<()> { + self.conn.execute( + "DELETE FROM tasks WHERE id = ?1", + params![id.to_string()], + )?; + Ok(()) + } + + /// Update task state. + pub fn update_task_state(&self, id: Uuid, state: TaskState) -> SqliteResult<()> { + self.conn.execute( + "UPDATE tasks SET state = ?2 WHERE id = ?1", + params![id.to_string(), state.as_str()], + )?; + Ok(()) + } + + /// Buffer output for reliable delivery. + pub fn buffer_output(&self, task_id: Uuid, output: &str, is_partial: bool) -> SqliteResult<i64> { + self.conn.execute( + r#" + INSERT INTO output_buffer (task_id, output, is_partial, timestamp, sent) + VALUES (?1, ?2, ?3, datetime('now'), 0) + "#, + params![task_id.to_string(), output, is_partial as i32], + )?; + Ok(self.conn.last_insert_rowid()) + } + + /// Get unsent outputs. + pub fn get_unsent_outputs(&self, limit: i64) -> SqliteResult<Vec<BufferedOutput>> { + let mut stmt = self.conn.prepare( + r#" + SELECT id, task_id, output, is_partial, timestamp + FROM output_buffer + WHERE sent = 0 + ORDER BY id + LIMIT ?1 + "#, + )?; + + let rows = stmt.query_map(params![limit], |row| { + let id: i64 = row.get(0)?; + let task_id_str: String = row.get(1)?; + let task_id = Uuid::parse_str(&task_id_str).unwrap_or_default(); + let output: String = row.get(2)?; + let is_partial: i32 = row.get(3)?; + let timestamp_str: String = row.get(4)?; + let timestamp = DateTime::parse_from_rfc3339(×tamp_str) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(|_| Utc::now()); + + Ok(BufferedOutput { + id, + task_id, + output, + is_partial: is_partial != 0, + timestamp, + }) + })?; + + rows.collect() + } + + /// Mark outputs as sent. + pub fn mark_outputs_sent(&self, ids: &[i64]) -> SqliteResult<()> { + if ids.is_empty() { + return Ok(()); + } + + let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect(); + let sql = format!( + "UPDATE output_buffer SET sent = 1 WHERE id IN ({})", + placeholders.join(",") + ); + + let params: Vec<rusqlite::types::Value> = ids + .iter() + .map(|id| rusqlite::types::Value::Integer(*id)) + .collect(); + + self.conn.execute(&sql, rusqlite::params_from_iter(params))?; + Ok(()) + } + + /// Clean up old sent outputs. + pub fn cleanup_sent_outputs(&self, older_than_hours: i64) -> SqliteResult<usize> { + let result = self.conn.execute( + r#" + DELETE FROM output_buffer + WHERE sent = 1 AND timestamp < datetime('now', ?1 || ' hours') + "#, + params![format!("-{}", older_than_hours)], + )?; + Ok(result) + } + + /// Get daemon state value. + pub fn get_state(&self, key: &str) -> SqliteResult<Option<String>> { + let mut stmt = self.conn.prepare( + "SELECT value FROM daemon_state WHERE key = ?1", + )?; + + let mut rows = stmt.query(params![key])?; + + if let Some(row) = rows.next()? { + let value: String = row.get(0)?; + Ok(Some(value)) + } else { + Ok(None) + } + } + + /// Set daemon state value. + pub fn set_state(&self, key: &str, value: &str) -> SqliteResult<()> { + self.conn.execute( + r#" + INSERT OR REPLACE INTO daemon_state (key, value, updated_at) + VALUES (?1, ?2, datetime('now')) + "#, + params![key, value], + )?; + Ok(()) + } + + /// Parse a task from a database row. + fn task_from_row(row: &rusqlite::Row) -> SqliteResult<LocalTask> { + let id_str: String = row.get(0)?; + let server_task_id_str: String = row.get(1)?; + let state_str: String = row.get(2)?; + let container_id: Option<String> = row.get(3)?; + let overlay_path: Option<String> = row.get(4)?; + let repo_url: Option<String> = row.get(5)?; + let base_branch: Option<String> = row.get(6)?; + let plan: String = row.get(7)?; + let created_at_str: String = row.get(8)?; + let started_at_str: Option<String> = row.get(9)?; + let completed_at_str: Option<String> = row.get(10)?; + let error_message: Option<String> = row.get(11)?; + + let id = Uuid::parse_str(&id_str).unwrap_or_default(); + let server_task_id = Uuid::parse_str(&server_task_id_str).unwrap_or_default(); + let state = TaskState::from_str(&state_str).unwrap_or_default(); + let created_at = DateTime::parse_from_rfc3339(&created_at_str) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(|_| Utc::now()); + let started_at = started_at_str + .and_then(|s| DateTime::parse_from_rfc3339(&s).ok()) + .map(|dt| dt.with_timezone(&Utc)); + let completed_at = completed_at_str + .and_then(|s| DateTime::parse_from_rfc3339(&s).ok()) + .map(|dt| dt.with_timezone(&Utc)); + + Ok(LocalTask { + id, + server_task_id, + state, + container_id, + overlay_path, + repo_url, + base_branch, + plan, + created_at, + started_at, + completed_at, + error_message, + }) + } +} + +#[cfg(test)] +mod tests { + use crate::daemon::*; + + #[test] + fn test_open_memory() { + let db = LocalDb::open_memory().unwrap(); + assert!(db.get_active_tasks().unwrap().is_empty()); + } + + #[test] + fn test_save_and_get_task() { + let db = LocalDb::open_memory().unwrap(); + + let task = LocalTask { + id: Uuid::new_v4(), + server_task_id: Uuid::new_v4(), + state: TaskState::Running, + container_id: Some("abc123".to_string()), + overlay_path: Some("/tmp/overlay".to_string()), + repo_url: Some("https://github.com/test/repo".to_string()), + base_branch: Some("main".to_string()), + plan: "Build the feature".to_string(), + created_at: Utc::now(), + started_at: Some(Utc::now()), + completed_at: None, + error_message: None, + }; + + db.save_task(&task).unwrap(); + + let loaded = db.get_task(task.id).unwrap().unwrap(); + assert_eq!(loaded.id, task.id); + assert_eq!(loaded.state, TaskState::Running); + assert_eq!(loaded.plan, "Build the feature"); + } + + #[test] + fn test_output_buffer() { + let db = LocalDb::open_memory().unwrap(); + let task_id = Uuid::new_v4(); + + db.buffer_output(task_id, "line 1", false).unwrap(); + db.buffer_output(task_id, "line 2", false).unwrap(); + + let unsent = db.get_unsent_outputs(10).unwrap(); + assert_eq!(unsent.len(), 2); + + let ids: Vec<i64> = unsent.iter().map(|o| o.id).collect(); + db.mark_outputs_sent(&ids).unwrap(); + + let unsent = db.get_unsent_outputs(10).unwrap(); + assert!(unsent.is_empty()); + } +} diff --git a/makima/src/daemon/db/mod.rs b/makima/src/daemon/db/mod.rs new file mode 100644 index 0000000..2c6e0f3 --- /dev/null +++ b/makima/src/daemon/db/mod.rs @@ -0,0 +1,5 @@ +//! Local database for daemon state persistence. + +pub mod local; + +pub use local::{BufferedOutput, LocalDb, LocalTask}; diff --git a/makima/src/daemon/error.rs b/makima/src/daemon/error.rs new file mode 100644 index 0000000..b993169 --- /dev/null +++ b/makima/src/daemon/error.rs @@ -0,0 +1,75 @@ +//! Error types for the makima daemon. + +use thiserror::Error; +use uuid::Uuid; + +/// Top-level daemon error type. +#[derive(Error, Debug)] +pub enum DaemonError { + #[error("WebSocket error: {0}")] + WebSocket(#[from] tokio_tungstenite::tungstenite::Error), + + #[error("Worktree error: {0}")] + Worktree(#[from] crate::daemon::worktree::WorktreeError), + + #[error("Process error: {0}")] + Process(#[from] crate::daemon::process::ClaudeProcessError), + + #[error("Task error: {0}")] + Task(#[from] TaskError), + + #[error("Configuration error: {0}")] + Config(#[from] config::ConfigError), + + #[error("Database error: {0}")] + Database(#[from] rusqlite::Error), + + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), + + #[error("Authentication failed: {0}")] + AuthFailed(String), + + #[error("Connection lost")] + ConnectionLost, + + #[error("Server error: {code} - {message}")] + ServerError { code: String, message: String }, +} + +/// Task management errors. +#[derive(Error, Debug)] +pub enum TaskError { + #[error("Task not found: {0}")] + NotFound(Uuid), + + #[error("Invalid state transition from {from} to {to}")] + InvalidStateTransition { from: String, to: String }, + + #[error("Concurrency limit reached")] + ConcurrencyLimit, + + #[error("Task already exists: {0}")] + AlreadyExists(Uuid), + + #[error("Task not running: {0}")] + NotRunning(Uuid), + + #[error("Failed to send message to task: {0}")] + MessageFailed(String), + + #[error("Task setup failed: {0}")] + SetupFailed(String), + + #[error("Task execution failed: {0}")] + ExecutionFailed(String), +} + +/// Result type alias for daemon operations. +pub type Result<T> = std::result::Result<T, DaemonError>; + +/// Result type alias for task operations. +pub type TaskResult<T> = std::result::Result<T, TaskError>; diff --git a/makima/src/daemon/mod.rs b/makima/src/daemon/mod.rs new file mode 100644 index 0000000..d7ec3f0 --- /dev/null +++ b/makima/src/daemon/mod.rs @@ -0,0 +1,22 @@ +//! Makima CLI - Unified CLI for server, daemon, and task management. +//! +//! This crate provides: +//! - `makima server` - Run the makima server +//! - `makima daemon` - Run the daemon (connect to server, manage tasks) +//! - `makima supervisor` - Contract orchestration commands +//! - `makima contract` - Task-contract interaction commands + +pub mod api; +pub mod cli; +pub mod config; +pub mod db; +pub mod error; +pub mod process; +pub mod task; +pub mod temp; +pub mod worktree; +pub mod ws; + +pub use cli::{Cli, Commands, ContractCommand, SupervisorCommand}; +pub use config::DaemonConfig; +pub use error::{DaemonError, Result}; diff --git a/makima/src/daemon/process/claude.rs b/makima/src/daemon/process/claude.rs new file mode 100644 index 0000000..93b097c --- /dev/null +++ b/makima/src/daemon/process/claude.rs @@ -0,0 +1,509 @@ +//! Claude Code process management. + +use std::collections::HashMap; +use std::path::Path; +use std::process::Stdio; +use std::sync::Arc; + +use futures::Stream; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{Child, ChildStdin, Command}; +use tokio::sync::{mpsc, Mutex}; + +use super::claude_protocol::ClaudeInputMessage; + +/// Errors that can occur during Claude process management. +#[derive(Debug, thiserror::Error)] +pub enum ClaudeProcessError { + #[error("Failed to spawn Claude process: {0}")] + SpawnFailed(#[from] std::io::Error), + + #[error("Claude command not found: {0}")] + CommandNotFound(String), + + #[error("Process already exited")] + AlreadyExited, + + #[error("Failed to read output: {0}")] + OutputRead(String), +} + +/// A line of output from Claude Code. +#[derive(Debug, Clone)] +pub struct OutputLine { + /// The raw content of the line. + pub content: String, + /// Whether this is from stdout (true) or stderr (false). + pub is_stdout: bool, + /// Parsed JSON type if available (e.g., "system", "assistant", "result"). + pub json_type: Option<String>, +} + +impl OutputLine { + /// Create a new stdout output line. + pub fn stdout(content: String) -> Self { + let json_type = extract_json_type(&content); + Self { + content, + is_stdout: true, + json_type, + } + } + + /// Create a new stderr output line. + pub fn stderr(content: String) -> Self { + Self { + content, + is_stdout: false, + json_type: None, + } + } +} + +/// Extract the "type" field from a JSON line if present. +fn extract_json_type(line: &str) -> Option<String> { + // Quick check for JSON + if !line.starts_with('{') { + return None; + } + + // Try to parse and extract type + if let Ok(json) = serde_json::from_str::<serde_json::Value>(line) { + json.get("type") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + } else { + None + } +} + +/// Handle to a running Claude Code process. +pub struct ClaudeProcess { + /// The child process. + child: Child, + /// Receiver for output lines. + output_rx: mpsc::Receiver<OutputLine>, + /// Stdin handle for sending input to the process (thread-safe). + stdin: Arc<Mutex<Option<ChildStdin>>>, +} + +impl ClaudeProcess { + /// Wait for the process to exit and return the exit code. + pub async fn wait(&mut self) -> Result<i64, ClaudeProcessError> { + let status = self.child.wait().await?; + Ok(status.code().unwrap_or(-1) as i64) + } + + /// Check if the process has exited. + pub fn try_wait(&mut self) -> Result<Option<i64>, ClaudeProcessError> { + match self.child.try_wait()? { + Some(status) => Ok(Some(status.code().unwrap_or(-1) as i64)), + None => Ok(None), + } + } + + /// Kill the process. + pub async fn kill(&mut self) -> Result<(), ClaudeProcessError> { + self.child.kill().await?; + Ok(()) + } + + /// Get the next output line, if available. + pub async fn next_output(&mut self) -> Option<OutputLine> { + self.output_rx.recv().await + } + + /// Send a raw message to the process via stdin. + /// + /// This can be used to provide input when Claude Code is waiting for user input. + pub async fn send_input(&self, message: &str) -> Result<(), ClaudeProcessError> { + let mut stdin_guard = self.stdin.lock().await; + if let Some(ref mut stdin) = *stdin_guard { + stdin + .write_all(message.as_bytes()) + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?; + stdin + .write_all(b"\n") + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write newline: {}", e)))?; + stdin + .flush() + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?; + Ok(()) + } else { + Err(ClaudeProcessError::OutputRead("Stdin not available".to_string())) + } + } + + /// Send a user message to the process via stdin using JSON protocol. + /// + /// This is the preferred method when using `--input-format=stream-json`. + /// The message is serialized as JSON and sent as a single line. + pub async fn send_user_message(&self, content: &str) -> Result<(), ClaudeProcessError> { + let message = ClaudeInputMessage::user(content); + let json_line = message.to_json_line().map_err(|e| { + ClaudeProcessError::OutputRead(format!("Failed to serialize message: {}", e)) + })?; + + tracing::debug!(content_len = content.len(), "Sending user message to Claude process"); + + let mut stdin_guard = self.stdin.lock().await; + if let Some(ref mut stdin) = *stdin_guard { + stdin + .write_all(json_line.as_bytes()) + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to write to stdin: {}", e)))?; + stdin + .flush() + .await + .map_err(|e| ClaudeProcessError::OutputRead(format!("Failed to flush stdin: {}", e)))?; + Ok(()) + } else { + Err(ClaudeProcessError::OutputRead("Stdin not available".to_string())) + } + } + + /// Get a clone of the stdin handle for external use. + pub fn stdin_handle(&self) -> Arc<Mutex<Option<ChildStdin>>> { + Arc::clone(&self.stdin) + } + + /// Close stdin, signaling EOF to the process. + pub async fn close_stdin(&self) -> Result<(), ClaudeProcessError> { + let mut stdin_guard = self.stdin.lock().await; + if let Some(mut stdin) = stdin_guard.take() { + let _ = stdin.shutdown().await; + } + Ok(()) + } + + /// Convert to a stream of output lines. + pub fn into_stream(self) -> impl Stream<Item = OutputLine> { + futures::stream::unfold(self.output_rx, |mut rx| async move { + rx.recv().await.map(|line| (line, rx)) + }) + } +} + +/// Manages Claude Code process spawning. +pub struct ProcessManager { + /// Path to the claude command. + claude_command: String, + /// Additional arguments to pass to Claude Code (after defaults). + claude_args: Vec<String>, + /// Arguments to pass before defaults. + claude_pre_args: Vec<String>, + /// Whether to enable Claude's permission system (skip --dangerously-skip-permissions). + enable_permissions: bool, + /// Whether to disable verbose output. + disable_verbose: bool, + /// Default environment variables to pass. + default_env: HashMap<String, String>, +} + +impl Default for ProcessManager { + fn default() -> Self { + Self::new() + } +} + +impl ProcessManager { + /// Create a new ProcessManager with default settings. + pub fn new() -> Self { + Self { + claude_command: "claude".to_string(), + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + default_env: HashMap::new(), + } + } + + /// Create a ProcessManager with a custom claude command path. + pub fn with_command(command: String) -> Self { + Self { + claude_command: command, + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + default_env: HashMap::new(), + } + } + + /// Set additional arguments to pass after default arguments. + pub fn with_args(mut self, args: Vec<String>) -> Self { + self.claude_args = args; + self + } + + /// Set arguments to pass before default arguments. + pub fn with_pre_args(mut self, args: Vec<String>) -> Self { + self.claude_pre_args = args; + self + } + + /// Enable Claude's permission system (don't pass --dangerously-skip-permissions). + pub fn with_permissions_enabled(mut self, enabled: bool) -> Self { + self.enable_permissions = enabled; + self + } + + /// Disable verbose output. + pub fn with_verbose_disabled(mut self, disabled: bool) -> Self { + self.disable_verbose = disabled; + self + } + + /// Add default environment variables. + pub fn with_env(mut self, env: HashMap<String, String>) -> Self { + self.default_env = env; + self + } + + /// Get the claude command path. + pub fn claude_command(&self) -> &str { + &self.claude_command + } + + /// Spawn a Claude Code process to execute a plan. + /// + /// The process runs in the specified working directory with stream-json output format. + /// If `system_prompt` is provided, it will be passed via --system-prompt flag. + pub async fn spawn( + &self, + working_dir: &Path, + plan: &str, + extra_env: Option<HashMap<String, String>>, + ) -> Result<ClaudeProcess, ClaudeProcessError> { + self.spawn_with_system_prompt(working_dir, plan, extra_env, None).await + } + + /// Spawn a Claude Code process with an optional system prompt. + /// + /// The process runs in the specified working directory with stream-json output format. + /// If `system_prompt` is provided, it will be passed via --system-prompt flag as + /// behavioral constraints that Claude will treat as system-level instructions. + pub async fn spawn_with_system_prompt( + &self, + working_dir: &Path, + plan: &str, + extra_env: Option<HashMap<String, String>>, + system_prompt: Option<&str>, + ) -> Result<ClaudeProcess, ClaudeProcessError> { + tracing::info!( + working_dir = %working_dir.display(), + plan_len = plan.len(), + plan_preview = %if plan.len() > 200 { &plan[..200] } else { plan }, + has_system_prompt = system_prompt.is_some(), + "Spawning Claude Code process" + ); + + // Verify working directory exists + if !working_dir.exists() { + tracing::error!(working_dir = %working_dir.display(), "Working directory does not exist!"); + return Err(ClaudeProcessError::SpawnFailed(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("Working directory does not exist: {}", working_dir.display()), + ))); + } + + // Build environment + let mut env = self.default_env.clone(); + if let Some(extra) = extra_env { + env.extend(extra); + } + + // Build arguments list + let mut args = Vec::new(); + + // Pre-args (before defaults) + args.extend(self.claude_pre_args.clone()); + + // Required arguments for stream-json protocol + args.push("--output-format=stream-json".to_string()); + args.push("--input-format=stream-json".to_string()); + + // Optional default arguments + if !self.disable_verbose { + args.push("--verbose".to_string()); + } + if !self.enable_permissions { + args.push("--dangerously-skip-permissions".to_string()); + } + + // System prompt - passed via --system-prompt flag for system-level constraints + if let Some(prompt) = system_prompt { + args.push("--system-prompt".to_string()); + args.push(prompt.to_string()); + } + + // Additional user-configured arguments + args.extend(self.claude_args.clone()); + + tracing::debug!(args = ?args, "Claude command arguments"); + + // Spawn the process + let mut child = Command::new(&self.claude_command) + .args(&args) + .current_dir(working_dir) + .envs(env) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ClaudeProcessError::CommandNotFound(self.claude_command.clone()) + } else { + ClaudeProcessError::SpawnFailed(e) + } + })?; + + // Create output channel + let (tx, rx) = mpsc::channel(1000); + + // Take stdout, stderr, and stdin + // With --input-format=stream-json, we keep stdin open for sending messages + let stdin = child.stdin.take(); + let stdin = Arc::new(Mutex::new(stdin)); + + let stdout = child.stdout.take().expect("stdout should be piped"); + let stderr = child.stderr.take().expect("stderr should be piped"); + + // Spawn task to read stdout + let tx_stdout = tx.clone(); + tokio::spawn(async move { + use tokio::io::AsyncReadExt; + let mut reader = BufReader::new(stdout); + let mut buffer = vec![0u8; 4096]; + let mut line_buffer = String::new(); + + loop { + // Try to read with a timeout to detect if we're stuck + match tokio::time::timeout( + tokio::time::Duration::from_secs(5), + reader.read(&mut buffer) + ).await { + Ok(Ok(0)) => { + // EOF + tracing::debug!("Claude stdout EOF"); + // Send any remaining content + if !line_buffer.is_empty() { + let _ = tx_stdout.send(OutputLine::stdout(line_buffer)).await; + } + break; + } + Ok(Ok(n)) => { + let chunk = String::from_utf8_lossy(&buffer[..n]); + tracing::debug!(bytes = n, chunk_preview = %if chunk.len() > 100 { &chunk[..100] } else { &chunk }, "Got stdout chunk from Claude"); + + // Accumulate into line buffer and emit complete lines + line_buffer.push_str(&chunk); + while let Some(newline_pos) = line_buffer.find('\n') { + let line = line_buffer[..newline_pos].to_string(); + line_buffer = line_buffer[newline_pos + 1..].to_string(); + if tx_stdout.send(OutputLine::stdout(line)).await.is_err() { + return; + } + } + } + Ok(Err(e)) => { + tracing::error!(error = %e, "Error reading Claude stdout"); + break; + } + Err(_) => { + // Timeout - no data for 5 seconds + tracing::warn!("No stdout data from Claude for 5 seconds"); + } + } + } + tracing::debug!("Claude stdout reader task ended"); + }); + + // Spawn task to read stderr + let tx_stderr = tx; + tokio::spawn(async move { + let reader = BufReader::new(stderr); + let mut lines = reader.lines(); + while let Ok(Some(line)) = lines.next_line().await { + tracing::debug!(line = %line, "Claude stderr"); + if tx_stderr.send(OutputLine::stderr(line)).await.is_err() { + break; + } + } + tracing::debug!("Claude stderr reader task ended"); + }); + + tracing::info!("Claude Code process spawned successfully"); + + let process = ClaudeProcess { + child, + output_rx: rx, + stdin, + }; + + // Send the initial plan as the first user message + tracing::info!(plan_len = plan.len(), "Sending initial plan to Claude via stdin"); + process.send_user_message(plan).await?; + + Ok(process) + } + + /// Check if the claude command is available. + pub async fn check_claude_available(&self) -> Result<String, ClaudeProcessError> { + let output = Command::new(&self.claude_command) + .arg("--version") + .output() + .await + .map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + ClaudeProcessError::CommandNotFound(self.claude_command.clone()) + } else { + ClaudeProcessError::SpawnFailed(e) + } + })?; + + if output.status.success() { + Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) + } else { + Err(ClaudeProcessError::CommandNotFound( + self.claude_command.clone(), + )) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extract_json_type() { + assert_eq!( + extract_json_type(r#"{"type":"system","subtype":"init"}"#), + Some("system".to_string()) + ); + assert_eq!( + extract_json_type(r#"{"type":"assistant","message":{}}"#), + Some("assistant".to_string()) + ); + assert_eq!(extract_json_type("not json"), None); + assert_eq!(extract_json_type(r#"{"no_type": true}"#), None); + } + + #[test] + fn test_output_line_creation() { + let line = OutputLine::stdout(r#"{"type":"result"}"#.to_string()); + assert!(line.is_stdout); + assert_eq!(line.json_type, Some("result".to_string())); + + let line = OutputLine::stderr("error message".to_string()); + assert!(!line.is_stdout); + assert_eq!(line.json_type, None); + } +} diff --git a/makima/src/daemon/process/claude_protocol.rs b/makima/src/daemon/process/claude_protocol.rs new file mode 100644 index 0000000..96e5377 --- /dev/null +++ b/makima/src/daemon/process/claude_protocol.rs @@ -0,0 +1,59 @@ +//! Claude Code JSON protocol types for stdin communication. +//! +//! When using `--input-format=stream-json`, Claude Code expects +//! newline-delimited JSON messages on stdin. + +use serde::Serialize; + +/// Message sent to Claude Code via stdin. +/// +/// Format based on Claude Code's stream-json input protocol. +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ClaudeInputMessage { + /// A user message to send to Claude. + User { message: UserMessage }, +} + +/// The inner user message structure. +#[derive(Debug, Clone, Serialize)] +pub struct UserMessage { + /// Always "user" for user messages. + pub role: String, + /// The message content. + pub content: String, +} + +impl ClaudeInputMessage { + /// Create a new user message. + pub fn user(content: impl Into<String>) -> Self { + Self::User { + message: UserMessage { + role: "user".to_string(), + content: content.into(), + }, + } + } + + /// Serialize to a JSON string with trailing newline (NDJSON format). + pub fn to_json_line(&self) -> Result<String, serde_json::Error> { + let mut json = serde_json::to_string(self)?; + json.push('\n'); + Ok(json) + } +} + +#[cfg(test)] +mod tests { + use crate::daemon::*; + + #[test] + fn test_user_message_serialization() { + let msg = ClaudeInputMessage::user("Hello, Claude!"); + let json = msg.to_json_line().unwrap(); + + // Should produce: {"type":"user","message":{"role":"user","content":"Hello, Claude!"}}\n + assert!(json.starts_with(r#"{"type":"user","message":{"role":"user","content":"Hello, Claude!"}}"#)); + assert!(json.ends_with('\n')); + } +} diff --git a/makima/src/daemon/process/mod.rs b/makima/src/daemon/process/mod.rs new file mode 100644 index 0000000..814a3c5 --- /dev/null +++ b/makima/src/daemon/process/mod.rs @@ -0,0 +1,10 @@ +//! Process management for Claude Code subprocess execution. +//! +//! Spawns and manages Claude Code processes in worktree directories, +//! streaming JSON output back to the daemon. + +mod claude; +mod claude_protocol; + +pub use claude::{ClaudeProcess, ClaudeProcessError, OutputLine, ProcessManager}; +pub use claude_protocol::ClaudeInputMessage; diff --git a/makima/src/daemon/task/manager.rs b/makima/src/daemon/task/manager.rs new file mode 100644 index 0000000..8269083 --- /dev/null +++ b/makima/src/daemon/task/manager.rs @@ -0,0 +1,3215 @@ +//! Task lifecycle manager using git worktrees and Claude Code subprocesses. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +use rand::Rng; +use tokio::io::AsyncWriteExt; +use tokio::sync::{mpsc, RwLock, Semaphore}; +use uuid::Uuid; + +use std::collections::HashSet; + +use super::state::TaskState; +use crate::daemon::error::{DaemonError, TaskError, TaskResult}; +use crate::daemon::process::{ClaudeInputMessage, ProcessManager}; +use crate::daemon::temp::TempManager; +use crate::daemon::worktree::{is_new_repo_request, ConflictResolution, WorktreeInfo, WorktreeManager}; +use crate::daemon::ws::{BranchInfo, DaemonCommand, DaemonMessage}; + +/// Generate a secure random API key for orchestrator tool access. +fn generate_tool_key() -> String { + let mut rng = rand::thread_rng(); + let bytes: [u8; 32] = rng.r#gen(); + hex::encode(bytes) +} + +/// Check if output contains an OAuth authentication error. +fn is_oauth_auth_error(output: &str) -> bool { + // Match various authentication error patterns from Claude Code + if output.contains("Please run /login") { + return true; + } + if output.contains("Invalid API key") { + return true; + } + if output.contains("authentication_error") + && (output.contains("OAuth token has expired") + || output.contains("Please obtain a new token")) + { + return true; + } + false +} + +/// Extract OAuth URL from text (looks for claude.ai OAuth URLs). +fn extract_url(text: &str) -> Option<String> { + // Look for claude.ai OAuth URLs - try multiple patterns + let patterns = [ + "https://claude.ai/oauth", + "https://console.anthropic.com/oauth", + ]; + + for pattern in patterns { + if let Some(start) = text.find(pattern) { + let remaining = &text[start..]; + // Find the end of the URL - stop at: + // - Whitespace, common URL terminators, escape sequences + let end = remaining + .find(|c: char| { + c.is_whitespace() || c == '"' || c == '\'' || c == '>' || c == ')' || c == ']' || c == '\x07' || c == '\x1b' + }) + .unwrap_or(remaining.len()); + + let url = &remaining[..end]; + + // Also check if there's another https:// within the match (hyperlink duplication) + // Skip first 20 chars to avoid matching the start + let url = if url.len() > 30 { + if let Some(second_https) = url[20..].find("https://") { + &url[..second_https + 20] // Keep only first URL + } else { + url + } + } else { + url + }; + + if url.len() > 20 { + return Some(url.to_string()); + } + } + } + None +} + +/// Global storage for pending OAuth flow (only one can be active at a time per daemon) +static PENDING_AUTH_FLOW: std::sync::OnceLock<std::sync::Mutex<Option<std::sync::mpsc::Sender<String>>>> = std::sync::OnceLock::new(); + +fn get_auth_flow_storage() -> &'static std::sync::Mutex<Option<std::sync::mpsc::Sender<String>>> { + PENDING_AUTH_FLOW.get_or_init(|| std::sync::Mutex::new(None)) +} + +/// Send an auth code to the pending OAuth flow. +pub fn send_auth_code(code: &str) -> bool { + let storage = get_auth_flow_storage(); + if let Ok(mut guard) = storage.lock() { + if let Some(sender) = guard.take() { + if sender.send(code.to_string()).is_ok() { + tracing::info!("Auth code sent to setup-token process"); + return true; + } + } + } + tracing::warn!("No pending auth flow to send code to"); + false +} + +/// Spawn `claude setup-token` to initiate OAuth flow and capture the login URL. +/// This spawns the process in a PTY (required by Ink) and reads output until we find a URL. +/// The process continues running in the background waiting for auth completion. +async fn get_oauth_login_url(claude_command: &str) -> Option<String> { + use portable_pty::{native_pty_system, CommandBuilder, PtySize}; + use std::io::{Read, Write}; + + tracing::info!("Spawning claude setup-token in PTY to get OAuth login URL"); + + // Create a PTY - Ink requires a real terminal + let pty_system = native_pty_system(); + let pair = match pty_system.openpty(PtySize { + rows: 24, + cols: 200, // Wide enough to avoid line wrapping for long URLs/codes + pixel_width: 0, + pixel_height: 0, + }) { + Ok(pair) => pair, + Err(e) => { + tracing::error!(error = %e, "Failed to open PTY"); + return None; + } + }; + + // Build the command + let mut cmd = CommandBuilder::new(claude_command); + cmd.arg("setup-token"); + // Set environment variables to prevent browser from opening and disable fancy output + // Use "false" so the browser command fails, forcing setup-token to show URL and wait for manual input + cmd.env("BROWSER", "false"); + cmd.env("TERM", "dumb"); // Disable hyperlinks and fancy terminal features + cmd.env("NO_COLOR", "1"); // Disable colors + + // Spawn the process in the PTY + let mut child = match pair.slave.spawn_command(cmd) { + Ok(child) => child, + Err(e) => { + tracing::error!(error = %e, "Failed to spawn claude setup-token in PTY"); + return None; + } + }; + + // Get the reader and writer from the master side + let mut reader = match pair.master.try_clone_reader() { + Ok(reader) => reader, + Err(e) => { + tracing::error!(error = %e, "Failed to clone PTY reader"); + return None; + } + }; + + let mut writer = match pair.master.take_writer() { + Ok(writer) => writer, + Err(e) => { + tracing::error!(error = %e, "Failed to take PTY writer"); + return None; + } + }; + + // Create channels for communication + let (code_tx, code_rx) = std::sync::mpsc::channel::<String>(); + let (url_tx, url_rx) = std::sync::mpsc::channel::<String>(); + + // Store the code sender globally so it can be used when AUTH_CODE message arrives + { + let storage = get_auth_flow_storage(); + if let Ok(mut guard) = storage.lock() { + *guard = Some(code_tx); + } + } + + // Spawn reader thread - reads PTY output and sends URL when found + let reader_handle = std::thread::spawn(move || { + let mut buffer = [0u8; 4096]; + let mut accumulated = String::new(); + let mut url_sent = false; + let mut read_count = 0; + + tracing::info!("setup-token reader thread started"); + + loop { + match reader.read(&mut buffer) { + Ok(0) => { + tracing::info!("setup-token PTY EOF reached after {} reads", read_count); + break; + } + Ok(n) => { + read_count += 1; + let chunk = String::from_utf8_lossy(&buffer[..n]); + accumulated.push_str(&chunk); + + // Process complete lines + while let Some(newline_pos) = accumulated.find('\n') { + let line = accumulated[..newline_pos].to_string(); + accumulated = accumulated[newline_pos + 1..].to_string(); + + let clean_line = strip_ansi_codes(&line); + if !clean_line.trim().is_empty() { + tracing::info!(line = %clean_line, "setup-token output"); + } + + // Look for OAuth URL if not found yet + if !url_sent { + if let Some(url) = extract_url(&line) { + tracing::info!(url = %url, "Found OAuth login URL"); + let _ = url_tx.send(url); + url_sent = true; + } + } + + // Check for success/failure messages + if clean_line.contains("successfully") || clean_line.contains("authenticated") || clean_line.contains("Success") { + tracing::info!("Authentication appears successful!"); + } + if clean_line.contains("error") || clean_line.contains("failed") || clean_line.contains("invalid") { + tracing::warn!(line = %clean_line, "setup-token may have encountered an error"); + } + } + } + Err(e) => { + tracing::warn!(error = %e, "PTY read error after {} reads", read_count); + break; + } + } + } + tracing::info!("setup-token reader thread ended"); + }); + + // Spawn writer thread - waits for auth code and writes it to PTY + std::thread::spawn(move || { + tracing::info!("setup-token writer thread started, waiting for auth code (10 min timeout)"); + + // Wait for auth code from frontend (with long timeout - user needs time to authenticate) + match code_rx.recv_timeout(std::time::Duration::from_secs(600)) { + Ok(code) => { + tracing::info!(code_len = code.len(), "Received auth code from frontend, writing to PTY"); + // Write code followed by carriage return (Enter key in raw terminal mode) + let code_with_enter = format!("{}\r", code); + if let Err(e) = writer.write_all(code_with_enter.as_bytes()) { + tracing::error!(error = %e, "Failed to write auth code to PTY"); + } else if let Err(e) = writer.flush() { + tracing::error!(error = %e, "Failed to flush PTY writer"); + } else { + tracing::info!("Auth code written to setup-token PTY successfully"); + // Give Ink a moment to process, then send another Enter in case first was buffered + std::thread::sleep(std::time::Duration::from_millis(100)); + let _ = writer.write_all(b"\r"); + let _ = writer.flush(); + tracing::info!("Sent additional Enter keypress"); + } + } + Err(e) => { + tracing::info!(error = %e, "Auth code receive ended (timeout or channel closed)"); + } + } + + // Wait for reader thread to finish + tracing::debug!("Waiting for reader thread to finish..."); + let _ = reader_handle.join(); + + // Wait for child to fully exit + tracing::debug!("Waiting for setup-token child process to exit..."); + match child.wait() { + Ok(status) => { + tracing::info!(exit_status = ?status, "setup-token process exited"); + } + Err(e) => { + tracing::error!(error = %e, "Failed to wait for setup-token process"); + } + } + }); + + // Wait for URL with timeout + match url_rx.recv_timeout(std::time::Duration::from_secs(30)) { + Ok(url) => Some(url), + Err(e) => { + tracing::error!(error = %e, "Timed out waiting for OAuth login URL"); + None + } + } +} + +/// Strip ANSI escape codes from a string for cleaner logging. +fn strip_ansi_codes(s: &str) -> String { + let mut result = String::with_capacity(s.len()); + let mut chars = s.chars().peekable(); + + while let Some(c) = chars.next() { + if c == '\x1b' { + // Check what type of escape sequence + match chars.peek() { + Some(&'[') => { + // CSI sequence: ESC [ ... letter + chars.next(); // consume '[' + while let Some(&next) = chars.peek() { + chars.next(); + if next.is_ascii_alphabetic() { + break; + } + } + } + Some(&']') => { + // OSC sequence: ESC ] ... ST (where ST is BEL or ESC \) + chars.next(); // consume ']' + while let Some(&next) = chars.peek() { + if next == '\x07' { + chars.next(); // consume BEL (string terminator) + break; + } + if next == '\x1b' { + chars.next(); // consume ESC + if chars.peek() == Some(&'\\') { + chars.next(); // consume \ (string terminator) + } + break; + } + chars.next(); + } + } + _ => { + // Unknown escape, skip just the ESC + } + } + } else if !c.is_control() || c == '\n' { + result.push(c); + } + } + + result +} + +/// System prompt for regular (non-orchestrator) subtasks. +/// This ensures subtasks work only within their isolated worktree directory. +const SUBTASK_SYSTEM_PROMPT: &str = r#"You are working in an isolated worktree directory that contains a snapshot of the codebase. + +## IMPORTANT: Directory Restrictions + +**You MUST only work within the current working directory (your worktree).** + +- DO NOT use `cd` to navigate to directories outside your worktree +- DO NOT use absolute paths that point outside your worktree (e.g., don't write to ~/some/path, /tmp, or the original repository) +- DO NOT modify files in parent directories or sibling directories +- All your file operations should be relative to the current directory + +Your working directory is your sandboxed workspace. When you complete your task, your changes will be reviewed and integrated by the orchestrator. + +**Why?** Your worktree is isolated so that: +1. Your changes don't affect other running tasks +2. Changes can be reviewed before integration +3. Multiple tasks can work on the codebase in parallel without conflicts + +--- + +"#; + +/// The orchestrator system prompt that tells Claude how to use the helper script. +const ORCHESTRATOR_SYSTEM_PROMPT: &str = r#"You are an orchestrator task. Your job is to coordinate subtasks and integrate their work, NOT to write code directly. + +## FIRST STEP + +Start by checking if you have existing subtasks: + +```bash +# List all subtasks to see what work needs to be done +./.makima/orchestrate.sh list +``` + +If subtasks exist, start them. If you need additional subtasks or no subtasks exist yet, you can create them. + +--- + +## Creating Subtasks + +You can create new subtasks to break down work: + +```bash +# Create a new subtask with a name and plan +./.makima/orchestrate.sh create "Subtask Name" "Detailed plan for what the subtask should do..." + +# The command returns the new subtask ID - use it to start the subtask +./.makima/orchestrate.sh start <new_subtask_id> +``` + +Create subtasks when you need to: +- Break down complex work into smaller pieces +- Run multiple tasks in parallel on different parts of the codebase +- Delegate specific implementation work + +## Task Continuation (Sequential Dependencies) + +When subtasks need to build on each other's work (e.g., Task B depends on Task A's changes), use `--continue-from`: + +```bash +# Create Task B that continues from Task A's worktree +./.makima/orchestrate.sh create "Task B" "Build on Task A's work..." --continue-from <task_a_id> +``` + +This copies all files from Task A's worktree into Task B's worktree, so Task B starts with Task A's changes. + +**When to use continuation:** +- Sequential work: Task B needs Task A's output files +- Staged implementation: Building features incrementally +- Fix-and-extend: One task fixes issues, another adds features on top + +**When NOT to use continuation:** +- Parallel tasks working on different files +- Independent subtasks that can be merged separately + +**Important for merging:** When tasks continue from each other, only merge the FINAL task in the chain. Earlier tasks' changes are already included in later tasks' worktrees. + +## Sharing Files with Subtasks + +Use `--files` to copy specific files from your orchestrator worktree to subtasks. This is useful for sharing plans, configs, or data files: + +```bash +# Create subtask with specific files copied from orchestrator +./.makima/orchestrate.sh create "Implement Feature" "Follow the plan in PLAN.md" --files "PLAN.md" + +# Copy multiple files (comma-separated) +./.makima/orchestrate.sh create "API Work" "Use the spec..." --files "PLAN.md,api-spec.yaml,types.ts" + +# Combine with --continue-from to share files AND continue from another task +./.makima/orchestrate.sh create "Step 2" "Continue..." --continue-from <task_a_id> --files "requirements.md" +``` + +**Use cases for --files:** +- Share a PLAN.md with detailed implementation steps +- Distribute configuration or spec files +- Pass generated data or intermediate results + +## How Subtasks Work + +Each subtask runs in its own **worktree** - a separate directory with a copy of the codebase. When subtasks complete: +- Their work remains in the worktree files (NOT committed to git) +- **Subtasks do NOT auto-merge** - YOU must integrate their work into your worktree +- You can view and copy files from subtask worktrees using their paths +- The worktree path is returned when you get subtask status + +**IMPORTANT:** Subtasks never create PRs or merge to the target repository. Only the orchestrator (you) can trigger completion actions like PR creation or merging after integrating all subtask work. + +## Subtask Commands +```bash +# List all subtasks and their current status +./.makima/orchestrate.sh list + +# Create a new subtask (returns the subtask ID) +./.makima/orchestrate.sh create "Name" "Plan/description" + +# Create a subtask that continues from another task's worktree +./.makima/orchestrate.sh create "Name" "Plan" --continue-from <other_task_id> + +# Create a subtask with specific files copied from orchestrator worktree +./.makima/orchestrate.sh create "Name" "Plan" --files "file1.md,file2.yaml" + +# Start a specific subtask (it will run in its own Claude instance) +./.makima/orchestrate.sh start <subtask_id> + +# Stop a running subtask +./.makima/orchestrate.sh stop <subtask_id> + +# Get detailed status of a subtask (includes worktree_path when available) +./.makima/orchestrate.sh status <subtask_id> + +# Get the output/logs of a subtask +./.makima/orchestrate.sh output <subtask_id> + +# Get the worktree path for a subtask +./.makima/orchestrate.sh worktree <subtask_id> +``` + +## Integrating Subtask Work + +When subtasks complete, their changes exist as files in their worktree directories: +- Files are NOT committed to git branches +- You must copy/integrate files from subtask worktrees into your worktree +- Use standard file operations (cp, cat, etc.) to review and integrate changes + +### Handling Continuation Chains + +**CRITICAL:** When subtasks use `--continue-from`, they form a chain where each task includes all changes from previous tasks. You must ONLY integrate the FINAL task in each chain. + +Example chain: Task A → Task B (continues from A) → Task C (continues from B) +- Task C's worktree contains ALL changes from A, B, and C +- You should ONLY integrate Task C's worktree +- DO NOT integrate Task A or Task B separately (their changes are already in C) + +**How to track continuation chains:** +1. When you create tasks with `--continue-from`, note which task continues from which +2. Build a mental model: Independent tasks (no continuation) + Continuation chains +3. For each chain, only integrate the LAST task in the chain + +**Example with mixed independent and chained tasks:** +``` +Independent tasks (integrate all): +- Task X: API endpoints +- Task Y: Database models + +Continuation chain (integrate ONLY the last one): +- Task A: Core feature → Task B: Tests (continues from A) → Task C: Docs (continues from B) + Only integrate Task C! +``` + +### Integration Examples + +For independent subtasks (no continuation): +```bash +# Get the worktree path for a completed subtask +SUBTASK_PATH=$(./.makima/orchestrate.sh worktree <subtask_id>) + +# View what files were changed +ls -la "$SUBTASK_PATH" +diff -r . "$SUBTASK_PATH" --exclude=.git --exclude=.makima + +# Copy specific files from subtask +cp "$SUBTASK_PATH/src/new_file.rs" ./src/ +cp "$SUBTASK_PATH/src/modified_file.rs" ./src/ + +# Or use diff/patch for more control +diff -u ./src/file.rs "$SUBTASK_PATH/src/file.rs" > changes.patch +patch -p0 < changes.patch +``` + +For continuation chains (only integrate the final task): +```bash +# If you have: Task A → Task B → Task C (each continues from previous) +# ONLY get and integrate Task C's worktree - it has everything! + +FINAL_TASK_PATH=$(./.makima/orchestrate.sh worktree <task_c_id>) + +# Copy all changes from the final task +rsync -av --exclude='.git' --exclude='.makima' "$FINAL_TASK_PATH/" ./ +``` + +## Completion +```bash +# Mark yourself as complete after integrating all subtask work +./.makima/orchestrate.sh done "Summary of what was accomplished" +``` + +## Workflow +1. **List existing subtasks**: Run `list` to see current subtasks +2. **Create subtasks if needed**: Use `create` to add new subtasks for the work + - For independent parallel work: create without `--continue-from` + - For sequential dependencies: use `--continue-from <previous_task_id>` + - Track which tasks continue from which (continuation chains) +3. **Start subtasks**: Run `start` for each subtask +4. **Monitor progress**: Check status and output as subtasks run +5. **Integrate work**: When subtasks complete: + - For independent tasks: integrate each one's worktree + - For continuation chains: ONLY integrate the FINAL task (it has all changes) + - Get worktree path with `worktree <subtask_id>` + - Copy or merge files into your worktree +6. **Complete**: Call `done` once all work is integrated + +## Important Notes +- Subtask files are in worktrees, NOT committed git branches +- **Subtasks do NOT auto-merge or create PRs** - you must integrate their work +- You can read files from subtask worktrees using their paths +- Use standard file tools (cp, diff, cat, rsync) to integrate changes +- You should NOT edit files directly - that's what subtasks are for +- DO NOT DO THE SUBTASKS' WORK! Your only job is to coordinate, not implement. +- When you call `done`, YOUR worktree may be used for the final PR/merge +"#; + + +/// System prompt for supervisor tasks (contract orchestrators). +/// Supervisors monitor all tasks in a contract, create new tasks, and drive the contract to completion. +const SUPERVISOR_SYSTEM_PROMPT: &str = r#"You are the SUPERVISOR for this contract. Your ONLY job is to coordinate work by spawning tasks, waiting for them to complete, and managing git operations. + +## CRITICAL RULES - READ CAREFULLY + +1. **NEVER write code or edit files yourself** - you are a coordinator ONLY +2. **NEVER make commits yourself** - tasks do their own commits +3. **ALWAYS spawn tasks** for ANY work that involves: + - Writing or editing code + - Creating or modifying files + - Making implementation changes + - Any actual development work +4. **ALWAYS wait for tasks to complete** - you MUST use `wait` after spawning +5. **Your role is ONLY to**: + - Analyze the contract goal and break it into tasks + - Spawn tasks AND wait for them to complete + - Review completed task results + - Merge completed work using `merge` + - Create PRs when ready using `pr` + +## REQUIRED WORKFLOW - Follow This Pattern + +For EVERY task you spawn, you MUST: +1. Spawn the task with `spawn` +2. IMMEDIATELY call `wait` to block until completion +3. Check the result and handle success/failure +4. Merge if successful + +```bash +# CORRECT PATTERN - spawn then wait +RESULT=$(makima supervisor spawn "Task Name" "Detailed plan...") +TASK_ID=$(echo "$RESULT" | jq -r '.taskId') +echo "Spawned task: $TASK_ID" + +# MUST wait for the task - DO NOT skip this step! +makima supervisor wait "$TASK_ID" + +# Check result, view diff, merge if successful +makima supervisor diff "$TASK_ID" +makima supervisor merge "$TASK_ID" +``` + +## Example - Full Workflow + +Goal: "Add user authentication" + +```bash +# Step 1: Create a makima branch for this work (use makima/{name} convention) +makima supervisor branch "makima/user-authentication" + +# Step 2: Spawn tasks, wait for each, and merge to the branch + +# Task 1: Research (spawn and wait) +RESULT=$(makima supervisor spawn "Research auth patterns" "Explore the codebase for existing authentication. Document findings.") +TASK_ID=$(echo "$RESULT" | jq -r '.taskId') +makima supervisor wait "$TASK_ID" +# Review findings before continuing + +# Task 2: Login endpoint (spawn and wait) +RESULT=$(makima supervisor spawn "Implement login" "Create POST /api/login endpoint...") +TASK_ID=$(echo "$RESULT" | jq -r '.taskId') +makima supervisor wait "$TASK_ID" +makima supervisor diff "$TASK_ID" +makima supervisor merge "$TASK_ID" --to "makima/user-authentication" + +# Task 3: Logout endpoint (spawn and wait) +RESULT=$(makima supervisor spawn "Implement logout" "Create POST /api/logout endpoint...") +TASK_ID=$(echo "$RESULT" | jq -r '.taskId') +makima supervisor wait "$TASK_ID" +makima supervisor merge "$TASK_ID" --to "makima/user-authentication" + +# Step 3: All tasks complete - create PR from makima branch +makima supervisor pr "makima/user-authentication" --title "Add user authentication" --base main +``` + +## Available Tools (via makima supervisor) + +### Task Management +```bash +# List all tasks in this contract +makima supervisor tasks + +# Spawn a new task (returns JSON with taskId) +makima supervisor spawn "Task Name" "Detailed plan..." + +# IMPORTANT: Wait for task to complete (blocks until done/failed) +makima supervisor wait <task_id> [timeout_seconds] + +# Read a file from any task's worktree +makima supervisor read-file <task_id> <file_path> + +# Get the full task tree structure +makima supervisor tree +``` + +### Git Operations +```bash +# Create a new branch +makima supervisor branch <branch_name> [--from <task_id|sha>] + +# Merge a task's changes to a branch +makima supervisor merge <task_id> [--to <branch>] [--squash] + +# Create a pull request +makima supervisor pr <task_id> --title "Title" [--body "Body"] [--base main] + +# View a task's diff +makima supervisor diff <task_id> + +# Create a git checkpoint +makima supervisor checkpoint "Checkpoint message" + +# List checkpoints for a task +makima supervisor checkpoints [task_id] +``` + +### Contract +```bash +# Get contract status +makima supervisor status +``` + +## Key Points + +1. **Create a makima branch first** - use `branch "makima/{name}"` for the contract's work +2. **spawn returns immediately** - the task runs in the background +3. **wait blocks until complete** - you MUST call this to know when a task finishes +4. **Never fire-and-forget** - always wait for each task before moving on +5. **Merge to your makima branch** - use `merge <task_id> --to "makima/{name}"` to collect completed work +6. **Create PR when done** - use `pr "makima/{name}" --title "..." --base main` + +## Standard Workflow + +1. `branch "makima/{name}"` - Create branch (e.g., "makima/add-auth") +2. For each piece of work: + - `spawn` - Create task + - `wait` - Block until complete + - `merge --to "makima/{name}"` - Merge to branch +3. `pr "makima/{name}" --title "..." --base main` - Create PR + +## Important Reminders + +- **ONLY YOU can spawn tasks** - regular tasks cannot create children +- **NEVER implement anything yourself** - always spawn tasks +- **ALWAYS create a makima branch** - use `makima/{name}` naming convention +- Tasks run independently - you just coordinate +- You will be resumed if interrupted - your conversation is preserved +- Create checkpoints before major transitions + +--- + +"#; + +/// System prompt for tasks that are part of a contract. +/// This tells the task about contract.sh and how to use it to interact with the contract. +const CONTRACT_INTEGRATION_PROMPT: &str = r##" +## Contract Integration + +This task is part of a contract. You have access to contract tools via the `makima contract` CLI. + +### Contract Commands + +```bash +# Get contract context (name, phase, goals) +makima contract status + +# Get phase checklist and deliverables +makima contract checklist + +# List contract files +makima contract files + +# Read a specific file content +makima contract file <file_id> + +# Report progress to the contract +makima contract report "Completed X, working on Y..." + +# Create a new contract file (content via stdin) +echo "# New Documentation" | makima contract create-file "New Document" + +# Update an existing contract file (content via stdin) +cat updated_content.md | makima contract update-file <file_id> + +# Get suggested next action when done +makima contract suggest-action + +# Report completion with metrics +makima contract completion-action --files "file1.rs,file2.rs" --code +``` + +### What You Should Do + +**Before starting:** +1. Run `makima contract status` to understand the contract context +2. Run `makima contract checklist` to see phase deliverables +3. Run `makima contract files` to see existing documentation + +**While working:** +- Report significant progress with `makima contract report "..."` + +**When completing:** +1. If your work should be documented, create or update contract files +2. Run `makima contract completion-action` to see recommended next steps +3. Consider what contract files or phases might need updating + +**Important:** Your work should contribute to the contract's goals. Check the contract status to understand what's expected. + +--- + +"##; + +/// Tracks merge state for an orchestrator task. +#[derive(Default)] +struct MergeTracker { + /// Subtask branches that have been successfully merged. + merged_subtasks: HashSet<Uuid>, + /// Subtask branches that were explicitly skipped (with reason). + skipped_subtasks: HashMap<Uuid, String>, +} + +/// Managed task information. +#[derive(Clone)] +pub struct ManagedTask { + /// Task ID. + pub id: Uuid, + /// Human-readable task name. + pub task_name: String, + /// Current state. + pub state: TaskState, + /// Worktree info if created. + pub worktree: Option<WorktreeInfo>, + /// Task plan. + pub plan: String, + /// Repository URL or path. + pub repo_source: Option<String>, + /// Base branch. + pub base_branch: Option<String>, + /// Target branch to merge into. + pub target_branch: Option<String>, + /// Parent task ID if this is a subtask. + pub parent_task_id: Option<Uuid>, + /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask). + pub depth: i32, + /// Whether this task runs as an orchestrator (coordinates subtasks). + pub is_orchestrator: bool, + /// Whether this task is a supervisor (long-running contract orchestrator). + pub is_supervisor: bool, + /// Path to target repository for completion actions. + pub target_repo_path: Option<String>, + /// Completion action: "none", "branch", "merge", "pr". + pub completion_action: Option<String>, + /// Task ID to continue from (copy worktree from this task). + pub continue_from_task_id: Option<Uuid>, + /// Files to copy from parent task's worktree. + pub copy_files: Option<Vec<String>>, + /// Contract ID if this task is associated with a contract. + pub contract_id: Option<Uuid>, + /// Time task was created. + pub created_at: Instant, + /// Time task started running. + pub started_at: Option<Instant>, + /// Time task completed. + pub completed_at: Option<Instant>, + /// Error message if failed. + pub error: Option<String>, +} + +/// Configuration for task execution. +#[derive(Clone)] +pub struct TaskConfig { + /// Maximum concurrent tasks. + pub max_concurrent_tasks: u32, + /// Base directory for worktrees. + pub worktree_base_dir: PathBuf, + /// Environment variables to pass to Claude. + pub env_vars: HashMap<String, String>, + /// Claude command path. + pub claude_command: String, + /// Additional arguments to pass to Claude Code. + pub claude_args: Vec<String>, + /// Arguments to pass before defaults. + pub claude_pre_args: Vec<String>, + /// Enable Claude's permission system. + pub enable_permissions: bool, + /// Disable verbose output. + pub disable_verbose: bool, +} + +impl Default for TaskConfig { + fn default() -> Self { + Self { + max_concurrent_tasks: 4, + worktree_base_dir: WorktreeManager::default_base_dir(), + env_vars: HashMap::new(), + claude_command: "claude".to_string(), + claude_args: Vec::new(), + claude_pre_args: Vec::new(), + enable_permissions: false, + disable_verbose: false, + } + } +} + +/// Task manager for handling task lifecycle. +pub struct TaskManager { + /// Worktree manager. + worktree_manager: Arc<WorktreeManager>, + /// Process manager. + process_manager: Arc<ProcessManager>, + /// Temp directory manager. + temp_manager: Arc<TempManager>, + /// Task configuration. + #[allow(dead_code)] + config: TaskConfig, + /// Active tasks. + tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>, + /// Channel to send messages to server. + ws_tx: mpsc::Sender<DaemonMessage>, + /// Semaphore for limiting concurrent tasks. + semaphore: Arc<Semaphore>, + /// Channels for sending input to running tasks. + /// Each sender allows sending messages to the stdin of a running Claude process. + task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>, + /// Tracks merge state per orchestrator task (for completion gate). + merge_trackers: Arc<RwLock<HashMap<Uuid, MergeTracker>>>, +} + +impl TaskManager { + /// Create a new task manager. + pub fn new(config: TaskConfig, ws_tx: mpsc::Sender<DaemonMessage>) -> Self { + let max_concurrent = config.max_concurrent_tasks as usize; + let worktree_manager = Arc::new(WorktreeManager::new(config.worktree_base_dir.clone())); + let process_manager = Arc::new( + ProcessManager::with_command(config.claude_command.clone()) + .with_args(config.claude_args.clone()) + .with_pre_args(config.claude_pre_args.clone()) + .with_permissions_enabled(config.enable_permissions) + .with_verbose_disabled(config.disable_verbose) + .with_env(config.env_vars.clone()), + ); + let temp_manager = Arc::new(TempManager::new()); + + Self { + worktree_manager, + process_manager, + temp_manager, + config, + tasks: Arc::new(RwLock::new(HashMap::new())), + ws_tx, + semaphore: Arc::new(Semaphore::new(max_concurrent)), + task_inputs: Arc::new(RwLock::new(HashMap::new())), + merge_trackers: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Handle a command from the server. + pub async fn handle_command(&self, command: DaemonCommand) -> Result<(), DaemonError> { + tracing::info!("Received command from server: {:?}", command); + + match command { + DaemonCommand::SpawnTask { + task_id, + task_name, + plan, + repo_url, + base_branch, + target_branch, + parent_task_id, + depth, + is_orchestrator, + target_repo_path, + completion_action, + continue_from_task_id, + copy_files, + contract_id, + is_supervisor, + } => { + tracing::info!( + task_id = %task_id, + task_name = %task_name, + repo_url = ?repo_url, + base_branch = ?base_branch, + target_branch = ?target_branch, + parent_task_id = ?parent_task_id, + depth = depth, + is_orchestrator = is_orchestrator, + is_supervisor = is_supervisor, + target_repo_path = ?target_repo_path, + completion_action = ?completion_action, + continue_from_task_id = ?continue_from_task_id, + copy_files = ?copy_files, + contract_id = ?contract_id, + plan_len = plan.len(), + "Spawning new task" + ); + self.spawn_task( + task_id, task_name, plan, repo_url, base_branch, target_branch, + parent_task_id, depth, is_orchestrator, is_supervisor, + target_repo_path, completion_action, continue_from_task_id, + copy_files, contract_id + ).await?; + } + DaemonCommand::PauseTask { task_id } => { + tracing::info!(task_id = %task_id, "Pause not supported for subprocess tasks"); + // Subprocesses don't support pause, just log and ignore + } + DaemonCommand::ResumeTask { task_id } => { + tracing::info!(task_id = %task_id, "Resume not supported for subprocess tasks"); + // Subprocesses don't support resume, just log and ignore + } + DaemonCommand::InterruptTask { task_id, graceful: _ } => { + tracing::info!(task_id = %task_id, "Interrupting task"); + self.interrupt_task(task_id).await?; + } + DaemonCommand::SendMessage { task_id, message } => { + // Check if this is an auth code message + if message.starts_with("AUTH_CODE:") { + let code = message.strip_prefix("AUTH_CODE:").unwrap_or("").trim(); + tracing::info!(task_id = %task_id, "Received auth code from frontend"); + if send_auth_code(code) { + tracing::info!(task_id = %task_id, "Auth code forwarded to setup-token"); + } else { + tracing::warn!(task_id = %task_id, "No pending auth flow to receive code"); + } + } else { + // Regular message - send to task's stdin + tracing::info!(task_id = %task_id, message_len = message.len(), "Sending message to task"); + // Send message to the task's stdin via the input channel + let inputs = self.task_inputs.read().await; + if let Some(sender) = inputs.get(&task_id) { + if let Err(e) = sender.send(message).await { + tracing::warn!(task_id = %task_id, error = %e, "Failed to send message to task input channel"); + } else { + tracing::info!(task_id = %task_id, "Message sent to task successfully"); + } + } else { + drop(inputs); // Release read lock before checking if we need to respawn + + // Check if this is a supervisor that needs to be respawned + let task_info = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).cloned() + }; + + if let Some(task) = task_info { + if task.is_supervisor { + tracing::info!( + task_id = %task_id, + "Supervisor has no active Claude process, respawning with message" + ); + + // Respawn the supervisor with the new message as the plan + // Claude Code will use --continue to maintain conversation history + let inner = self.clone_inner(); + let task_name = task.task_name.clone(); + let repo_source = task.repo_source.clone(); + let base_branch = task.base_branch.clone(); + let target_branch = task.target_branch.clone(); + let target_repo_path = task.target_repo_path.clone(); + let completion_action = task.completion_action.clone(); + let contract_id = task.contract_id; + + // Spawn in background to not block the command handler + tokio::spawn(async move { + if let Err(e) = inner.run_task( + task_id, + task_name, + message, // Use the message as the new prompt + repo_source, + base_branch, + target_branch, + false, // is_orchestrator + true, // is_supervisor + target_repo_path, + completion_action, + None, // continue_from_task_id + None, // copy_files + contract_id, + ).await { + tracing::error!( + task_id = %task_id, + error = %e, + "Failed to respawn supervisor" + ); + } + }); + } else { + tracing::warn!(task_id = %task_id, "No input channel for task (task may not be running)"); + } + } else { + tracing::warn!(task_id = %task_id, "Task not found"); + } + } + } + } + DaemonCommand::InjectSiblingContext { task_id, .. } => { + tracing::debug!(task_id = %task_id, "Sibling context injection not supported for subprocess tasks"); + } + DaemonCommand::Authenticated { daemon_id } => { + tracing::debug!(daemon_id = %daemon_id, "Authenticated command (handled by WS client)"); + } + DaemonCommand::Error { code, message } => { + tracing::warn!(code = %code, message = %message, "Error command from server"); + } + + // ========================================================================= + // Merge Commands + // ========================================================================= + + DaemonCommand::ListBranches { task_id } => { + tracing::info!(task_id = %task_id, "Listing task branches"); + self.handle_list_branches(task_id).await?; + } + DaemonCommand::MergeStart { task_id, source_branch } => { + tracing::info!(task_id = %task_id, source_branch = %source_branch, "Starting merge"); + self.handle_merge_start(task_id, source_branch).await?; + } + DaemonCommand::MergeStatus { task_id } => { + tracing::info!(task_id = %task_id, "Getting merge status"); + self.handle_merge_status(task_id).await?; + } + DaemonCommand::MergeResolve { task_id, file, strategy } => { + tracing::info!(task_id = %task_id, file = %file, strategy = %strategy, "Resolving conflict"); + self.handle_merge_resolve(task_id, file, strategy).await?; + } + DaemonCommand::MergeCommit { task_id, message } => { + tracing::info!(task_id = %task_id, "Committing merge"); + self.handle_merge_commit(task_id, message).await?; + } + DaemonCommand::MergeAbort { task_id } => { + tracing::info!(task_id = %task_id, "Aborting merge"); + self.handle_merge_abort(task_id).await?; + } + DaemonCommand::MergeSkip { task_id, subtask_id, reason } => { + tracing::info!(task_id = %task_id, subtask_id = %subtask_id, reason = %reason, "Skipping subtask merge"); + self.handle_merge_skip(task_id, subtask_id, reason).await?; + } + DaemonCommand::CheckMergeComplete { task_id } => { + tracing::info!(task_id = %task_id, "Checking merge completion"); + self.handle_check_merge_complete(task_id).await?; + } + + // ========================================================================= + // Completion Action Commands + // ========================================================================= + + DaemonCommand::RetryCompletionAction { + task_id, + task_name, + action, + target_repo_path, + target_branch, + } => { + tracing::info!( + task_id = %task_id, + task_name = %task_name, + action = %action, + target_repo_path = %target_repo_path, + target_branch = ?target_branch, + "Retrying completion action" + ); + self.handle_retry_completion_action(task_id, task_name, action, target_repo_path, target_branch).await?; + } + + DaemonCommand::CloneWorktree { task_id, target_dir } => { + tracing::info!( + task_id = %task_id, + target_dir = %target_dir, + "Cloning worktree to target directory" + ); + self.handle_clone_worktree(task_id, target_dir).await?; + } + + DaemonCommand::CheckTargetExists { task_id, target_dir } => { + tracing::debug!( + task_id = %task_id, + target_dir = %target_dir, + "Checking if target directory exists" + ); + self.handle_check_target_exists(task_id, target_dir).await?; + } + + // ========================================================================= + // Contract File Commands + // ========================================================================= + + DaemonCommand::ReadRepoFile { + request_id, + contract_id, + file_path, + repo_path, + } => { + tracing::info!( + request_id = %request_id, + contract_id = %contract_id, + file_path = %file_path, + repo_path = %repo_path, + "Reading file from repository" + ); + self.handle_read_repo_file(request_id, file_path, repo_path).await?; + } + DaemonCommand::CreateBranch { + task_id, + branch_name, + from_ref, + } => { + tracing::info!( + task_id = %task_id, + branch_name = %branch_name, + from_ref = ?from_ref, + "Creating branch" + ); + self.handle_create_branch(task_id, branch_name, from_ref).await?; + } + DaemonCommand::MergeTaskToTarget { + task_id, + target_branch, + squash, + } => { + tracing::info!( + task_id = %task_id, + target_branch = ?target_branch, + squash = squash, + "Merging task to target branch" + ); + self.handle_merge_task_to_target(task_id, target_branch, squash).await?; + } + DaemonCommand::CreatePR { + task_id, + title, + body, + base_branch, + } => { + tracing::info!( + task_id = %task_id, + title = %title, + base_branch = %base_branch, + "Creating pull request" + ); + self.handle_create_pr(task_id, title, body, base_branch).await?; + } + DaemonCommand::GetTaskDiff { + task_id, + } => { + tracing::info!(task_id = %task_id, "Getting task diff"); + self.handle_get_task_diff(task_id).await?; + } + } + Ok(()) + } + + /// Spawn a new task. + #[allow(clippy::too_many_arguments)] + pub async fn spawn_task( + &self, + task_id: Uuid, + task_name: String, + plan: String, + repo_url: Option<String>, + base_branch: Option<String>, + target_branch: Option<String>, + parent_task_id: Option<Uuid>, + depth: i32, + is_orchestrator: bool, + is_supervisor: bool, + target_repo_path: Option<String>, + completion_action: Option<String>, + continue_from_task_id: Option<Uuid>, + copy_files: Option<Vec<String>>, + contract_id: Option<Uuid>, + ) -> TaskResult<()> { + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, depth = depth, "=== SPAWN_TASK START ==="); + + // Check if task already exists - allow re-spawning if in terminal state + { + let mut tasks = self.tasks.write().await; + if let Some(existing) = tasks.get(&task_id) { + if existing.state.is_terminal() { + // Task exists but is in terminal state (completed, failed, interrupted) + // Remove it so we can re-spawn + tracing::info!(task_id = %task_id, old_state = ?existing.state, "Removing terminated task to allow re-spawn"); + tasks.remove(&task_id); + } else { + // Task is still active, reject + tracing::warn!(task_id = %task_id, state = ?existing.state, "Task already exists and is active, rejecting spawn"); + return Err(TaskError::AlreadyExists(task_id)); + } + } + } + + // Acquire semaphore permit + tracing::info!(task_id = %task_id, "Acquiring concurrency permit..."); + let permit = self + .semaphore + .clone() + .try_acquire_owned() + .map_err(|_| { + tracing::warn!(task_id = %task_id, "Concurrency limit reached, cannot spawn task"); + TaskError::ConcurrencyLimit + })?; + tracing::info!(task_id = %task_id, "Concurrency permit acquired"); + + // Create task entry + tracing::info!(task_id = %task_id, "Creating task entry in state: Initializing"); + let task = ManagedTask { + id: task_id, + task_name: task_name.clone(), + state: TaskState::Initializing, + worktree: None, + plan: plan.clone(), + repo_source: repo_url.clone(), + base_branch: base_branch.clone(), + target_branch: target_branch.clone(), + parent_task_id, + depth, + is_orchestrator, + is_supervisor, + target_repo_path: target_repo_path.clone(), + completion_action: completion_action.clone(), + continue_from_task_id, + copy_files: copy_files.clone(), + contract_id, + created_at: Instant::now(), + started_at: None, + completed_at: None, + error: None, + }; + + self.tasks.write().await.insert(task_id, task); + tracing::info!(task_id = %task_id, "Task entry created and stored"); + + // Notify server of status change + tracing::info!(task_id = %task_id, "Notifying server: pending -> initializing"); + self.send_status_change(task_id, "pending", "initializing").await; + + // Spawn task in background + tracing::info!(task_id = %task_id, "Spawning background task runner"); + let inner = self.clone_inner(); + tokio::spawn(async move { + let _permit = permit; // Hold permit until done + tracing::info!(task_id = %task_id, "Background task runner started"); + + if let Err(e) = inner.run_task( + task_id, task_name, plan, repo_url, base_branch, target_branch, + is_orchestrator, is_supervisor, target_repo_path, completion_action, + continue_from_task_id, copy_files, contract_id + ).await { + tracing::error!(task_id = %task_id, error = %e, "Task execution failed"); + inner.mark_failed(task_id, &e.to_string()).await; + } + tracing::info!(task_id = %task_id, "Background task runner completed"); + }); + + tracing::info!(task_id = %task_id, "=== SPAWN_TASK END (task running in background) ==="); + Ok(()) + } + + /// Clone inner state for spawned tasks. + fn clone_inner(&self) -> TaskManagerInner { + TaskManagerInner { + worktree_manager: self.worktree_manager.clone(), + process_manager: self.process_manager.clone(), + temp_manager: self.temp_manager.clone(), + tasks: self.tasks.clone(), + ws_tx: self.ws_tx.clone(), + task_inputs: self.task_inputs.clone(), + } + } + + /// Interrupt a task. + pub async fn interrupt_task(&self, task_id: Uuid) -> TaskResult<()> { + let mut tasks = self.tasks.write().await; + let task = tasks.get_mut(&task_id).ok_or(TaskError::NotFound(task_id))?; + + if task.state.is_terminal() { + return Ok(()); // Already done + } + + let old_state = task.state; + task.state = TaskState::Interrupted; + task.completed_at = Some(Instant::now()); + + // Notify server + drop(tasks); + self.send_status_change(task_id, old_state.as_str(), "interrupted").await; + + // Note: The process will be killed when the ClaudeProcess is dropped + // Worktrees are kept until explicitly deleted + + Ok(()) + } + + /// Get list of active task IDs. + pub async fn active_task_ids(&self) -> Vec<Uuid> { + self.tasks + .read() + .await + .iter() + .filter(|(_, t)| t.state.is_active()) + .map(|(id, _)| *id) + .collect() + } + + /// Get task state. + pub async fn get_task_state(&self, task_id: Uuid) -> Option<TaskState> { + self.tasks.read().await.get(&task_id).map(|t| t.state) + } + + /// Send status change notification to server. + async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { + let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); + let _ = self.ws_tx.send(msg).await; + } + + // ========================================================================= + // Merge Handler Methods + // ========================================================================= + + /// Get worktree path for a task, or return error if not found. + /// First checks in-memory tasks, then scans the worktrees directory. + async fn get_task_worktree_path(&self, task_id: Uuid) -> Result<std::path::PathBuf, DaemonError> { + // First try to get from in-memory tasks + { + let tasks = self.tasks.read().await; + if let Some(task) = tasks.get(&task_id) { + if let Some(ref worktree) = task.worktree { + return Ok(worktree.path.clone()); + } + } + } + + // Task not in memory - scan worktrees directory for matching task ID + let short_id = &task_id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + // Verify it's a valid git directory + if path.join(".git").exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %path.display(), + "Found worktree by scanning directory" + ); + return Ok(path); + } + } + } + } + + Err(DaemonError::Task(TaskError::SetupFailed( + format!("No worktree found for task {}. The worktree may have been cleaned up.", task_id) + ))) + } + + /// Handle ListBranches command. + async fn handle_list_branches(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.list_task_branches(&worktree_path).await { + Ok(branches) => { + let branch_infos: Vec<BranchInfo> = branches + .into_iter() + .map(|b| BranchInfo { + name: b.name, + task_id: b.task_id, + is_merged: b.is_merged, + last_commit: b.last_commit, + last_commit_message: b.last_commit_message, + }) + .collect(); + + let msg = DaemonMessage::BranchList { + task_id, + branches: branch_infos, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to list branches"); + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeStart command. + async fn handle_merge_start(&self, task_id: Uuid, source_branch: String) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.merge_branch(&worktree_path, &source_branch).await { + Ok(None) => { + // Merge succeeded without conflicts + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: "Merge completed without conflicts".to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Ok(Some(conflicts)) => { + // Merge has conflicts + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: format!("Merge has {} conflicts", conflicts.len()), + commit_sha: None, + conflicts: Some(conflicts), + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeStatus command. + async fn handle_merge_status(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.get_merge_state(&worktree_path).await { + Ok(state) => { + let msg = DaemonMessage::MergeStatusResponse { + task_id, + in_progress: state.in_progress, + source_branch: if state.in_progress { Some(state.source_branch) } else { None }, + conflicted_files: state.conflicted_files, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to get merge status"); + let msg = DaemonMessage::MergeStatusResponse { + task_id, + in_progress: false, + source_branch: None, + conflicted_files: vec![], + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeResolve command. + async fn handle_merge_resolve(&self, task_id: Uuid, file: String, strategy: String) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + let resolution = match strategy.to_lowercase().as_str() { + "ours" => ConflictResolution::Ours, + "theirs" => ConflictResolution::Theirs, + _ => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: format!("Invalid strategy '{}', must be 'ours' or 'theirs'", strategy), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + match self.worktree_manager.resolve_conflict(&worktree_path, &file, resolution).await { + Ok(()) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: format!("Resolved conflict in {}", file), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeCommit command. + async fn handle_merge_commit(&self, task_id: Uuid, message: String) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.commit_merge(&worktree_path, &message).await { + Ok(commit_sha) => { + // Track this merge as completed (extract subtask ID from branch if possible) + // For now, we'll track it when MergeSkip is called or based on branch names + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: "Merge committed successfully".to_string(), + commit_sha: Some(commit_sha), + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeAbort command. + async fn handle_merge_abort(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + match self.worktree_manager.abort_merge(&worktree_path).await { + Ok(()) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: "Merge aborted".to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let msg = DaemonMessage::MergeResult { + task_id, + success: false, + message: e.to_string(), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + } + } + Ok(()) + } + + /// Handle MergeSkip command. + async fn handle_merge_skip(&self, task_id: Uuid, subtask_id: Uuid, reason: String) -> Result<(), DaemonError> { + // Record that this subtask was skipped + { + let mut trackers = self.merge_trackers.write().await; + let tracker = trackers.entry(task_id).or_insert_with(MergeTracker::default); + tracker.skipped_subtasks.insert(subtask_id, reason.clone()); + } + + let msg = DaemonMessage::MergeResult { + task_id, + success: true, + message: format!("Subtask {} skipped: {}", subtask_id, reason), + commit_sha: None, + conflicts: None, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CheckMergeComplete command. + async fn handle_check_merge_complete(&self, task_id: Uuid) -> Result<(), DaemonError> { + let worktree_path = self.get_task_worktree_path(task_id).await?; + + // Get all task branches + let branches = match self.worktree_manager.list_task_branches(&worktree_path).await { + Ok(b) => b, + Err(e) => { + let msg = DaemonMessage::MergeCompleteCheck { + task_id, + can_complete: false, + unmerged_branches: vec![format!("Error listing branches: {}", e)], + merged_count: 0, + skipped_count: 0, + }; + let _ = self.ws_tx.send(msg).await; + return Ok(()); + } + }; + + // Get tracker state + let trackers = self.merge_trackers.read().await; + let empty_merged: HashSet<Uuid> = HashSet::new(); + let empty_skipped: HashMap<Uuid, String> = HashMap::new(); + let tracker = trackers.get(&task_id); + let merged_set = tracker.map(|t| &t.merged_subtasks).unwrap_or(&empty_merged); + let skipped_set = tracker.map(|t| &t.skipped_subtasks).unwrap_or(&empty_skipped); + + let mut merged_count = 0u32; + let mut skipped_count = 0u32; + let mut unmerged_branches = Vec::new(); + + for branch in &branches { + if branch.is_merged { + merged_count += 1; + } else if let Some(subtask_id) = branch.task_id { + if merged_set.contains(&subtask_id) { + merged_count += 1; + } else if skipped_set.contains_key(&subtask_id) { + skipped_count += 1; + } else { + unmerged_branches.push(branch.name.clone()); + } + } else { + // Branch without task ID - check if it's merged + unmerged_branches.push(branch.name.clone()); + } + } + + let can_complete = unmerged_branches.is_empty(); + + let msg = DaemonMessage::MergeCompleteCheck { + task_id, + can_complete, + unmerged_branches, + merged_count, + skipped_count, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Mark a subtask as merged in the tracker. + #[allow(dead_code)] + pub async fn mark_subtask_merged(&self, orchestrator_task_id: Uuid, subtask_id: Uuid) { + let mut trackers = self.merge_trackers.write().await; + let tracker = trackers.entry(orchestrator_task_id).or_insert_with(MergeTracker::default); + tracker.merged_subtasks.insert(subtask_id); + } + + // ========================================================================= + // Completion Action Handler Methods + // ========================================================================= + + /// Handle RetryCompletionAction command. + async fn handle_retry_completion_action( + &self, + task_id: Uuid, + task_name: String, + action: String, + target_repo_path: String, + target_branch: Option<String>, + ) -> Result<(), DaemonError> { + // Get the task's worktree path + let worktree_path = self.get_task_worktree_path(task_id).await?; + + // Execute the completion action + let inner = self.clone_inner(); + let result = inner.execute_completion_action( + task_id, + &task_name, + &worktree_path, + &action, + Some(target_repo_path.as_str()), + target_branch.as_deref(), + ).await; + + // Send result back to server + let msg = match result { + Ok(pr_url) => DaemonMessage::CompletionActionResult { + task_id, + success: true, + message: match action.as_str() { + "branch" => format!("Branch pushed to {}", target_repo_path), + "merge" => format!("Merged into {}", target_branch.as_deref().unwrap_or("main")), + "pr" => format!("Pull request created"), + _ => format!("Completion action '{}' executed", action), + }, + pr_url, + }, + Err(e) => DaemonMessage::CompletionActionResult { + task_id, + success: false, + message: e, + pr_url: None, + }, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CloneWorktree command. + async fn handle_clone_worktree( + &self, + task_id: Uuid, + target_dir: String, + ) -> Result<(), DaemonError> { + // Get the task's worktree path + let worktree_path = self.get_task_worktree_path(task_id).await?; + + // Expand tilde in target path + let target_path = crate::daemon::worktree::expand_tilde(&target_dir); + + // Clone the worktree to target directory + let result = self.worktree_manager.clone_worktree_to_directory( + &worktree_path, + &target_path, + ).await; + + // Send result back to server + let msg = match result { + Ok(message) => DaemonMessage::CloneWorktreeResult { + task_id, + success: true, + message, + target_dir: Some(target_path.to_string_lossy().to_string()), + }, + Err(e) => DaemonMessage::CloneWorktreeResult { + task_id, + success: false, + message: e.to_string(), + target_dir: None, + }, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CheckTargetExists command. + async fn handle_check_target_exists( + &self, + task_id: Uuid, + target_dir: String, + ) -> Result<(), DaemonError> { + // Expand tilde in target path + let target_path = crate::daemon::worktree::expand_tilde(&target_dir); + + // Check if target exists + let exists = self.worktree_manager.target_directory_exists(&target_path).await; + + // Send result back to server + let msg = DaemonMessage::CheckTargetExistsResult { + task_id, + exists, + target_dir: target_path.to_string_lossy().to_string(), + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle ReadRepoFile command. + /// + /// Reads a file from a repository on the daemon's filesystem and sends + /// the content back to the server for syncing contract files. + async fn handle_read_repo_file( + &self, + request_id: Uuid, + file_path: String, + repo_path: String, + ) -> Result<(), DaemonError> { + // Expand tilde in repo path + let repo_path_expanded = crate::daemon::worktree::expand_tilde(&repo_path); + + // Construct full file path + let full_path = repo_path_expanded.join(&file_path); + + // Try to read the file + let (content, success, error) = match tokio::fs::read_to_string(&full_path).await { + Ok(content) => (Some(content), true, None), + Err(e) => { + tracing::warn!( + request_id = %request_id, + file_path = %file_path, + repo_path = %repo_path, + full_path = %full_path.display(), + error = %e, + "Failed to read repo file" + ); + (None, false, Some(e.to_string())) + } + }; + + // Send result back to server + let msg = DaemonMessage::RepoFileContent { + request_id, + file_path, + content, + success, + error, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CreateBranch command - create a new branch in a task's worktree. + async fn handle_create_branch( + &self, + task_id: Uuid, + branch_name: String, + from_ref: Option<String>, + ) -> Result<(), DaemonError> { + // Get task's worktree path + let worktree_path = { + let tasks = self.tasks.read().await; + tasks.get(&task_id) + .and_then(|t| t.worktree.as_ref()) + .map(|w| w.path.clone()) + }; + + let (success, message) = if let Some(path) = worktree_path { + // Build git checkout command + let mut cmd = tokio::process::Command::new("git"); + cmd.current_dir(&path); + cmd.arg("checkout").arg("-b").arg(&branch_name); + + if let Some(ref from) = from_ref { + cmd.arg(from); + } + + match cmd.output().await { + Ok(output) => { + if output.status.success() { + (true, format!("Branch '{}' created successfully", branch_name)) + } else { + let stderr = String::from_utf8_lossy(&output.stderr); + (false, format!("Failed to create branch: {}", stderr)) + } + } + Err(e) => (false, format!("Failed to execute git: {}", e)), + } + } else { + (false, format!("Task {} not found or has no worktree", task_id)) + }; + + let msg = DaemonMessage::BranchCreated { + task_id, + success, + branch_name, + message, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle MergeTaskToTarget command - merge a task's changes to a target branch. + async fn handle_merge_task_to_target( + &self, + task_id: Uuid, + target_branch: Option<String>, + squash: bool, + ) -> Result<(), DaemonError> { + // Get task info + let task_info = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).map(|t| ( + t.worktree.as_ref().map(|w| w.path.clone()), + t.base_branch.clone(), + )) + }; + + let (success, message, commit_sha, conflicts) = match task_info { + Some((Some(worktree_path), base)) => { + let target = target_branch.unwrap_or_else(|| base.unwrap_or_else(|| "main".to_string())); + + // First, stage and commit any uncommitted changes + let add_result = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["add", "-A"]) + .output() + .await; + + if let Err(e) = add_result { + (false, format!("Failed to stage changes: {}", e), None, None) + } else { + // Commit if there are staged changes + let commit_result = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["commit", "-m", "Task completion checkpoint", "--allow-empty"]) + .output() + .await; + + if let Err(e) = commit_result { + tracing::warn!(task_id = %task_id, error = %e, "Commit failed (may be empty)"); + } + + // Get current branch name + let branch_output = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .output() + .await; + + let source_branch = branch_output + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()) + .unwrap_or_else(|_| "unknown".to_string()); + + // Checkout target branch + let checkout = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["checkout", &target]) + .output() + .await; + + match checkout { + Ok(output) if output.status.success() => { + // Merge the source branch + let mut merge_cmd = tokio::process::Command::new("git"); + merge_cmd.current_dir(&worktree_path); + merge_cmd.arg("merge"); + if squash { + merge_cmd.arg("--squash"); + } + merge_cmd.arg(&source_branch); + merge_cmd.arg("-m").arg(format!("Merge task {} into {}", task_id, target)); + + match merge_cmd.output().await { + Ok(output) if output.status.success() => { + // Get the commit SHA + let sha_output = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["rev-parse", "HEAD"]) + .output() + .await; + + let sha = sha_output + .ok() + .map(|o| String::from_utf8_lossy(&o.stdout).trim().to_string()); + + if squash { + // For squash merge, we need to commit + let _ = tokio::process::Command::new("git") + .current_dir(&worktree_path) + .args(["commit", "-m", &format!("Squashed merge of task {}", task_id)]) + .output() + .await; + } + + (true, format!("Merged {} into {}", source_branch, target), sha, None) + } + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + // Check for merge conflicts + if stderr.contains("CONFLICT") { + let conflict_files = stderr + .lines() + .filter(|l| l.contains("CONFLICT")) + .map(|l| l.to_string()) + .collect::<Vec<_>>(); + (false, "Merge conflicts detected".to_string(), None, Some(conflict_files)) + } else { + (false, format!("Merge failed: {}", stderr), None, None) + } + } + Err(e) => (false, format!("Failed to merge: {}", e), None, None), + } + } + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + (false, format!("Failed to checkout target branch: {}", stderr), None, None) + } + Err(e) => (false, format!("Failed to checkout: {}", e), None, None), + } + } + } + Some((None, _)) => (false, format!("Task {} has no worktree", task_id), None, None), + None => (false, format!("Task {} not found", task_id), None, None), + }; + + let msg = DaemonMessage::MergeToTargetResult { + task_id, + success, + message, + commit_sha, + conflicts, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle CreatePR command - create a pull request for a task's changes. + async fn handle_create_pr( + &self, + task_id: Uuid, + title: String, + body: Option<String>, + base_branch: String, + ) -> Result<(), DaemonError> { + // Get task's worktree path + let worktree_path = { + let tasks = self.tasks.read().await; + tasks.get(&task_id) + .and_then(|t| t.worktree.as_ref()) + .map(|w| w.path.clone()) + }; + + let (success, message, pr_url, pr_number) = if let Some(path) = worktree_path { + // Push the current branch first + let push_result = tokio::process::Command::new("git") + .current_dir(&path) + .args(["push", "-u", "origin", "HEAD"]) + .output() + .await; + + if let Err(e) = push_result { + (false, format!("Failed to push branch: {}", e), None, None) + } else { + // Create PR using gh CLI + let mut pr_cmd = tokio::process::Command::new("gh"); + pr_cmd.current_dir(&path); + pr_cmd.args(["pr", "create", "--title", &title, "--base", &base_branch]); + + if let Some(ref body_text) = body { + pr_cmd.args(["--body", body_text]); + } else { + pr_cmd.args(["--body", ""]); + } + + match pr_cmd.output().await { + Ok(output) if output.status.success() => { + let stdout = String::from_utf8_lossy(&output.stdout); + // gh pr create outputs the PR URL + let url = stdout.lines().last().map(|s| s.trim().to_string()); + // Extract PR number from URL + let number = url.as_ref().and_then(|u| { + u.split('/').last().and_then(|n| n.parse::<i32>().ok()) + }); + (true, "Pull request created".to_string(), url, number) + } + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + (false, format!("Failed to create PR: {}", stderr), None, None) + } + Err(e) => (false, format!("Failed to run gh: {}", e), None, None), + } + } + } else { + (false, format!("Task {} not found or has no worktree", task_id), None, None) + }; + + let msg = DaemonMessage::PRCreated { + task_id, + success, + message, + pr_url, + pr_number, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } + + /// Handle GetTaskDiff command - get the diff for a task's changes. + async fn handle_get_task_diff( + &self, + task_id: Uuid, + ) -> Result<(), DaemonError> { + // Get task's worktree path + let worktree_path = { + let tasks = self.tasks.read().await; + tasks.get(&task_id) + .and_then(|t| t.worktree.as_ref()) + .map(|w| w.path.clone()) + }; + + let (success, diff, error) = if let Some(path) = worktree_path { + // Get diff of all changes (staged and unstaged) + let diff_result = tokio::process::Command::new("git") + .current_dir(&path) + .args(["diff", "HEAD"]) + .output() + .await; + + match diff_result { + Ok(output) if output.status.success() => { + let diff_text = String::from_utf8_lossy(&output.stdout).to_string(); + if diff_text.is_empty() { + // No uncommitted changes, show diff from base + let base_diff = tokio::process::Command::new("git") + .current_dir(&path) + .args(["log", "-p", "--reverse", "HEAD~10..HEAD", "--"]) + .output() + .await; + + match base_diff { + Ok(o) => (true, Some(String::from_utf8_lossy(&o.stdout).to_string()), None), + Err(e) => (false, None, Some(format!("Failed to get diff: {}", e))), + } + } else { + (true, Some(diff_text), None) + } + } + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + (false, None, Some(format!("Git diff failed: {}", stderr))) + } + Err(e) => (false, None, Some(format!("Failed to run git: {}", e))), + } + } else { + (false, None, Some(format!("Task {} not found or has no worktree", task_id))) + }; + + let msg = DaemonMessage::TaskDiff { + task_id, + success, + diff, + error, + }; + let _ = self.ws_tx.send(msg).await; + Ok(()) + } +} + +/// Inner state for spawned tasks (cloneable). +struct TaskManagerInner { + worktree_manager: Arc<WorktreeManager>, + process_manager: Arc<ProcessManager>, + temp_manager: Arc<TempManager>, + tasks: Arc<RwLock<HashMap<Uuid, ManagedTask>>>, + ws_tx: mpsc::Sender<DaemonMessage>, + task_inputs: Arc<RwLock<HashMap<Uuid, mpsc::Sender<String>>>>, +} + +impl TaskManagerInner { + /// Run a task to completion. + #[allow(clippy::too_many_arguments)] + async fn run_task( + &self, + task_id: Uuid, + task_name: String, + plan: String, + repo_source: Option<String>, + base_branch: Option<String>, + target_branch: Option<String>, + is_orchestrator: bool, + is_supervisor: bool, + target_repo_path: Option<String>, + completion_action: Option<String>, + continue_from_task_id: Option<Uuid>, + copy_files: Option<Vec<String>>, + contract_id: Option<Uuid>, + ) -> Result<(), DaemonError> { + tracing::info!(task_id = %task_id, is_orchestrator = is_orchestrator, is_supervisor = is_supervisor, "=== RUN_TASK START ==="); + + // Determine working directory + let working_dir = if let Some(ref source) = repo_source { + if is_new_repo_request(source) { + // Explicit new repo request: new:// or new://project-name + tracing::info!( + task_id = %task_id, + source = %source, + "Creating new git repository" + ); + + let msg = DaemonMessage::task_output( + task_id, + format!("Initializing new git repository...\n"), + false, + ); + let _ = self.ws_tx.send(msg).await; + + let worktree_info = self.worktree_manager + .init_new_repo(task_id, source) + .await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; + + tracing::info!( + task_id = %task_id, + path = %worktree_info.path.display(), + "New repository created" + ); + + // Store worktree info + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(worktree_info.clone()); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Repository ready at {}\n", worktree_info.path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + worktree_info.path + } else { + // Send progress message + let msg = DaemonMessage::task_output( + task_id, + format!("Setting up worktree from {}...\n", source), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Ensure source repo exists (clone if URL, verify if path) + let source_repo = self.worktree_manager.ensure_repo(source).await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))?; + + // Detect or use provided base branch + let branch = if let Some(ref b) = base_branch { + b.clone() + } else { + self.worktree_manager.detect_default_branch(&source_repo).await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? + }; + + tracing::info!( + task_id = %task_id, + source = %source, + branch = %branch, + continue_from_task_id = ?continue_from_task_id, + "Setting up worktree" + ); + + // Create worktree - either from scratch or copying from another task + let task_name = format!("task-{}", &task_id.to_string()[..8]); + let worktree_info = if let Some(from_task_id) = continue_from_task_id { + // Find the source task's worktree path + let source_worktree = self.find_worktree_for_task(from_task_id).await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed( + format!("Cannot continue from task {}: {}", from_task_id, e) + )))?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Continuing from task {} worktree...\n", &from_task_id.to_string()[..8]), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Create worktree by copying from source task + self.worktree_manager + .create_worktree_from_task(&source_worktree, task_id, &task_name) + .await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? + } else { + // Create fresh worktree from repo + self.worktree_manager + .create_worktree(&source_repo, task_id, &task_name, &branch) + .await + .map_err(|e| DaemonError::Task(TaskError::SetupFailed(e.to_string())))? + }; + + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_info.path.display(), + branch = %worktree_info.branch, + continued_from = ?continue_from_task_id, + "Worktree created" + ); + + // Store worktree info + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.worktree = Some(worktree_info.clone()); + } + } + + let msg = DaemonMessage::task_output( + task_id, + format!("Worktree ready at {}\n", worktree_info.path.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + worktree_info.path + } + } else { + // No repo specified - use managed temp directory in ~/.makima/temp/ + tracing::info!(task_id = %task_id, "Creating managed temp directory (no repo)"); + + let msg = DaemonMessage::task_output( + task_id, + "Creating temporary working directory...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + let temp_dir = self.temp_manager.create_task_dir(task_id).await?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Working directory ready at {}\n", temp_dir.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + temp_dir + }; + + // Copy files from parent task's worktree if specified + if let Some(ref files) = copy_files { + if !files.is_empty() { + // Get the parent task ID to find its worktree + let parent_task_id = { + let tasks = self.tasks.read().await; + tasks.get(&task_id).and_then(|t| t.parent_task_id) + }; + + if let Some(parent_id) = parent_task_id { + match self.find_worktree_for_task(parent_id).await { + Ok(parent_worktree) => { + let msg = DaemonMessage::task_output( + task_id, + format!("Copying {} files from orchestrator...\n", files.len()), + false, + ); + let _ = self.ws_tx.send(msg).await; + + for file_path in files { + let source = parent_worktree.join(file_path); + let dest = working_dir.join(file_path); + + // Create parent directories if needed + if let Some(parent) = dest.parent() { + if let Err(e) = tokio::fs::create_dir_all(parent).await { + tracing::warn!( + task_id = %task_id, + file = %file_path, + error = %e, + "Failed to create parent directory for file" + ); + continue; + } + } + + // Copy the file + match tokio::fs::copy(&source, &dest).await { + Ok(_) => { + tracing::info!( + task_id = %task_id, + source = %source.display(), + dest = %dest.display(), + "Copied file from orchestrator" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + source = %source.display(), + dest = %dest.display(), + error = %e, + "Failed to copy file from orchestrator" + ); + // Notify but don't fail - the file might be optional + let msg = DaemonMessage::task_output( + task_id, + format!("Warning: Could not copy {}: {}\n", file_path, e), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + } + } + + let msg = DaemonMessage::task_output( + task_id, + "Files copied from orchestrator.\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + parent_id = %parent_id, + error = %e, + "Could not find parent task worktree for file copying" + ); + } + } + } else { + tracing::warn!( + task_id = %task_id, + "copy_files specified but no parent_task_id" + ); + } + } + } + + // Update state to Starting + tracing::info!(task_id = %task_id, "Updating state: Initializing -> Starting"); + self.update_state(task_id, TaskState::Starting).await; + self.send_status_change(task_id, "initializing", "starting").await; + + // Check Claude is available + match self.process_manager.check_claude_available().await { + Ok(version) => { + tracing::info!(task_id = %task_id, version = %version, "Claude Code available"); + let msg = DaemonMessage::task_output( + task_id, + format!("Claude Code {} ready\n", version), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + Err(e) => { + let err_msg = format!("Claude Code not available: {}", e); + tracing::error!(task_id = %task_id, error = %err_msg); + return Err(DaemonError::Task(TaskError::SetupFailed(err_msg))); + } + } + + // Set up supervisor, orchestrator, or subtask mode + let (extra_env, full_plan, system_prompt) = if is_supervisor { + // Supervisor mode: long-running contract orchestrator + tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up supervisor mode"); + + let msg = DaemonMessage::task_output( + task_id, + "Setting up supervisor environment...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Generate tool key for API access + let tool_key = generate_tool_key(); + tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for supervisor"); + + // Register tool key with server + let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); + if self.ws_tx.send(register_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to register tool key"); + } else { + tracing::info!(task_id = %task_id, "Tool key registration message sent to server"); + } + + // Set up environment variables for makima CLI + let mut env = HashMap::new(); + // TODO: Make API URL configurable + env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string()); + env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone()); + env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); + // Supervisor needs contract ID for its tools + if let Some(cid) = contract_id { + env.insert("MAKIMA_CONTRACT_ID".to_string(), cid.to_string()); + } + + tracing::info!( + task_id = %task_id, + api_url = "http://localhost:8080", + tool_key_preview = &tool_key[..8.min(tool_key.len())], + "Set supervisor environment variables" + ); + + // For supervisor, pass instructions as SYSTEM PROMPT (not user message) + // This ensures Claude treats them as behavioral constraints + let supervisor_user_plan = format!( + "Contract goal:\n{}", + plan + ); + + let msg = DaemonMessage::task_output( + task_id, + "Supervisor environment ready (makima CLI available)\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Return system prompt separately - it will be passed via --system-prompt flag + (Some(env), supervisor_user_plan, Some(SUPERVISOR_SYSTEM_PROMPT.to_string())) + } else if is_orchestrator { + tracing::info!(task_id = %task_id, working_dir = %working_dir.display(), "Setting up orchestrator mode"); + + let msg = DaemonMessage::task_output( + task_id, + "Setting up orchestrator environment...\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Generate tool key for API access + let tool_key = generate_tool_key(); + tracing::info!(task_id = %task_id, tool_key_len = tool_key.len(), "Generated tool key for orchestrator"); + + // Register tool key with server + let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); + if self.ws_tx.send(register_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to register tool key"); + } else { + tracing::info!(task_id = %task_id, "Tool key registration message sent to server"); + } + + // Set up environment variables for makima CLI + let mut env = HashMap::new(); + // TODO: Make API URL configurable + env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string()); + env.insert("MAKIMA_API_KEY".to_string(), tool_key.clone()); + env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); + + tracing::info!( + task_id = %task_id, + api_url = "http://localhost:8080", + tool_key_preview = &tool_key[..8.min(tool_key.len())], + "Set orchestrator environment variables" + ); + + // For orchestrator, pass instructions as SYSTEM PROMPT + let orchestrator_user_plan = format!( + "Your task:\n{}", + plan + ); + + let msg = DaemonMessage::task_output( + task_id, + "Orchestrator environment ready (makima CLI available)\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + (Some(env), orchestrator_user_plan, Some(ORCHESTRATOR_SYSTEM_PROMPT.to_string())) + } else { + tracing::info!(task_id = %task_id, "Running as regular subtask (not orchestrator)"); + // For subtasks, pass worktree isolation instructions as system prompt + let subtask_user_plan = format!( + "Your task:\n{}", + plan + ); + (None, subtask_user_plan, Some(SUBTASK_SYSTEM_PROMPT.to_string())) + }; + + // Add contract environment if task has contract_id (skip for supervisors - they already have it) + let (extra_env, full_plan, system_prompt) = if let Some(cid) = contract_id { + if is_supervisor { + // Supervisors already have contract ID and API access set up + tracing::info!(task_id = %task_id, contract_id = %cid, "Supervisor already has contract integration"); + (extra_env, full_plan, system_prompt) + } else { + tracing::info!(task_id = %task_id, contract_id = %cid, "Setting up contract integration"); + + // Set up environment variables for makima CLI + let mut env = extra_env.unwrap_or_default(); + env.insert("MAKIMA_CONTRACT_ID".to_string(), cid.to_string()); + + // If not already an orchestrator, we need API access for makima CLI + if !is_orchestrator { + // Generate tool key for API access + let tool_key = generate_tool_key(); + tracing::info!(task_id = %task_id, "Generated tool key for contract access"); + + // Register tool key with server + let register_msg = DaemonMessage::register_tool_key(task_id, tool_key.clone()); + if self.ws_tx.send(register_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to register contract tool key"); + } + + env.insert("MAKIMA_API_URL".to_string(), "http://localhost:8080".to_string()); + env.insert("MAKIMA_API_KEY".to_string(), tool_key); + env.insert("MAKIMA_TASK_ID".to_string(), task_id.to_string()); + } + + let msg = DaemonMessage::task_output( + task_id, + "Contract integration ready (makima CLI available)\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + + // Prepend contract integration prompt to the plan so the task knows to use makima CLI + let contract_plan = format!( + "{}{}", + CONTRACT_INTEGRATION_PROMPT, + full_plan + ); + + (Some(env), contract_plan, system_prompt) + } + } else { + (extra_env, full_plan, system_prompt) + }; + + // Spawn Claude process + let plan_bytes = full_plan.len(); + let plan_chars = full_plan.chars().count(); + // Rough token estimate: ~4 chars per token for English + let estimated_tokens = plan_chars / 4; + + tracing::info!( + task_id = %task_id, + working_dir = %working_dir.display(), + is_orchestrator = is_orchestrator, + plan_bytes = plan_bytes, + plan_chars = plan_chars, + estimated_tokens = estimated_tokens, + "Spawning Claude process" + ); + + // Warn if plan is very large (Claude's context is typically 100k-200k tokens) + if estimated_tokens > 50_000 { + tracing::warn!(task_id = %task_id, estimated_tokens = estimated_tokens, "Plan is very large - may hit context limits!"); + let msg = DaemonMessage::task_output( + task_id, + format!("Warning: Plan is very large (~{} tokens). This may cause issues.\n", estimated_tokens), + false, + ); + let _ = self.ws_tx.send(msg).await; + } + + let msg = DaemonMessage::task_output( + task_id, + if is_orchestrator { + format!("Starting Claude Code (orchestrator mode, ~{} tokens)...\n", estimated_tokens) + } else { + format!("Starting Claude Code (~{} tokens)...\n", estimated_tokens) + }, + false, + ); + let _ = self.ws_tx.send(msg).await; + + tracing::debug!(task_id = %task_id, has_system_prompt = system_prompt.is_some(), "Calling process_manager.spawn()..."); + let mut process = self.process_manager + .spawn_with_system_prompt(&working_dir, &full_plan, extra_env, system_prompt.as_deref()) + .await + .map_err(|e| { + tracing::error!(task_id = %task_id, error = %e, "Failed to spawn Claude process"); + DaemonError::Task(TaskError::SetupFailed(e.to_string())) + })?; + tracing::info!(task_id = %task_id, "Claude process spawned successfully"); + + // Set up input channel for this task so we can send messages to its stdin + tracing::debug!(task_id = %task_id, "Setting up input channel..."); + let (input_tx, mut input_rx) = mpsc::channel::<String>(100); + tracing::debug!(task_id = %task_id, "Acquiring task_inputs write lock..."); + self.task_inputs.write().await.insert(task_id, input_tx); + tracing::debug!(task_id = %task_id, "Input channel registered"); + + // Get stdin handle for input forwarding and completion signaling + let stdin_handle = process.stdin_handle(); + let stdin_handle_for_completion = stdin_handle.clone(); + + tracing::info!(task_id = %task_id, "Setting up stdin forwarder for task input (JSON protocol)"); + tokio::spawn(async move { + tracing::info!(task_id = %task_id, "Stdin forwarder task started, waiting for messages..."); + while let Some(msg) = input_rx.recv().await { + tracing::info!(task_id = %task_id, msg_len = msg.len(), msg_preview = %if msg.len() > 50 { &msg[..50] } else { &msg }, "Received message from input channel"); + + // Format as JSON user message for stream-json input protocol + let json_msg = ClaudeInputMessage::user(&msg); + let json_line = match json_msg.to_json_line() { + Ok(line) => line, + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to serialize input message"); + continue; + } + }; + + tracing::debug!(task_id = %task_id, json_line = %json_line.trim(), "Formatted JSON line for stdin"); + + let mut stdin_guard = stdin_handle.lock().await; + if let Some(ref mut stdin) = *stdin_guard { + tracing::debug!(task_id = %task_id, "Acquired stdin lock, writing..."); + if stdin.write_all(json_line.as_bytes()).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to write to stdin, breaking"); + break; + } + if stdin.flush().await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to flush stdin, breaking"); + break; + } + tracing::info!(task_id = %task_id, json_len = json_line.len(), "Successfully wrote user message to Claude stdin"); + } else { + tracing::warn!(task_id = %task_id, "Stdin is None (already closed), cannot send message"); + break; + } + } + tracing::info!(task_id = %task_id, "Stdin forwarder task ended (channel closed or stdin unavailable)"); + }); + + // Update state to Running + { + tracing::debug!(task_id = %task_id, "Acquiring tasks write lock for Running state update"); + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Running; + task.started_at = Some(Instant::now()); + } + tracing::debug!(task_id = %task_id, "Released tasks write lock"); + } + tracing::info!(task_id = %task_id, "Updating state: Starting -> Running"); + self.send_status_change(task_id, "starting", "running").await; + tracing::debug!(task_id = %task_id, "Sent status change notification"); + + // Stream output with startup timeout check + tracing::info!(task_id = %task_id, "Starting output stream - waiting for Claude output..."); + tracing::debug!(task_id = %task_id, "Output will be forwarded via WebSocket to server"); + let ws_tx = self.ws_tx.clone(); + + // For auth error detection + let claude_command = self.process_manager.claude_command().to_string(); + let daemon_hostname = hostname::get().ok().and_then(|h| h.into_string().ok()); + let mut auth_error_handled = false; + + let mut output_count = 0u64; + let mut output_bytes = 0usize; + let startup_timeout = tokio::time::Duration::from_secs(30); + let mut startup_check = tokio::time::interval(tokio::time::Duration::from_secs(5)); + startup_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let startup_deadline = tokio::time::Instant::now() + startup_timeout; + + loop { + tokio::select! { + maybe_line = process.next_output() => { + match maybe_line { + Some(line) => { + output_count += 1; + output_bytes += line.content.len(); + + if output_count == 1 { + tracing::info!(task_id = %task_id, "Received first output line from Claude"); + } + if output_count % 100 == 0 { + tracing::debug!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output progress"); + } + + // Log output details for debugging + tracing::trace!( + task_id = %task_id, + line_num = output_count, + content_len = line.content.len(), + is_stdout = line.is_stdout, + json_type = ?line.json_type, + "Forwarding output to WebSocket" + ); + + // Check if this is a "result" message indicating task completion + // With --input-format=stream-json, Claude waits for more input after completion + // We close stdin to signal EOF and let the process exit + if line.json_type.as_deref() == Some("result") { + tracing::info!(task_id = %task_id, "Received result message, closing stdin to signal completion"); + let mut stdin_guard = stdin_handle_for_completion.lock().await; + if let Some(mut stdin) = stdin_guard.take() { + let _ = stdin.shutdown().await; + } + } + + // Check for OAuth auth error before sending output + let content_for_auth_check = line.content.clone(); + + let msg = DaemonMessage::task_output(task_id, line.content, false); + if ws_tx.send(msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to send output, channel closed"); + break; + } + + // Detect OAuth token expiration and trigger remote login flow + if !auth_error_handled && is_oauth_auth_error(&content_for_auth_check) { + auth_error_handled = true; + tracing::warn!(task_id = %task_id, "OAuth authentication error detected, initiating remote login flow"); + + // Spawn claude setup-token to get login URL + if let Some(login_url) = get_oauth_login_url(&claude_command).await { + tracing::info!(task_id = %task_id, login_url = %login_url, "Got OAuth login URL"); + let auth_msg = DaemonMessage::AuthenticationRequired { + task_id: Some(task_id), + login_url, + hostname: daemon_hostname.clone(), + }; + if ws_tx.send(auth_msg).await.is_err() { + tracing::warn!(task_id = %task_id, "Failed to send auth required message"); + } + } else { + tracing::error!(task_id = %task_id, "Failed to get OAuth login URL from setup-token"); + let fallback_msg = DaemonMessage::task_output( + task_id, + format!("Authentication required on daemon{}. Please run 'claude /login' on the daemon machine.\n", + daemon_hostname.as_ref().map(|h| format!(" ({})", h)).unwrap_or_default()), + false, + ); + let _ = ws_tx.send(fallback_msg).await; + } + } + } + None => { + tracing::info!(task_id = %task_id, output_count = output_count, output_bytes = output_bytes, "Output stream ended"); + break; + } + } + } + _ = startup_check.tick(), if output_count == 0 => { + // Check if process is still alive + match process.try_wait() { + Ok(Some(exit_code)) => { + tracing::error!(task_id = %task_id, exit_code = exit_code, "Claude process exited before producing output!"); + let msg = DaemonMessage::task_output( + task_id, + format!("Error: Claude process exited unexpectedly with code {}\n", exit_code), + false, + ); + let _ = ws_tx.send(msg).await; + break; + } + Ok(None) => { + // Still running but no output + if tokio::time::Instant::now() > startup_deadline { + tracing::warn!(task_id = %task_id, "Claude process not producing output after 30s - may be stuck"); + let msg = DaemonMessage::task_output( + task_id, + "Warning: Claude Code is taking longer than expected to start. It may be waiting for authentication or network access.\n".to_string(), + false, + ); + let _ = ws_tx.send(msg).await; + } else { + tracing::debug!(task_id = %task_id, "Claude process still running, waiting for output..."); + } + } + Err(e) => { + tracing::error!(task_id = %task_id, error = %e, "Failed to check Claude process status"); + } + } + } + } + } + + // Wait for process to exit + let exit_code = process.wait().await.unwrap_or(-1); + + // Clean up input channel for this task + self.task_inputs.write().await.remove(&task_id); + tracing::debug!(task_id = %task_id, "Removed task input channel"); + + // Update state based on exit code + let success = exit_code == 0; + let new_state = if success { + TaskState::Completed + } else { + TaskState::Failed + }; + + tracing::info!( + task_id = %task_id, + exit_code = exit_code, + success = success, + new_state = ?new_state, + "Claude process exited, updating task state" + ); + + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = new_state; + task.completed_at = Some(Instant::now()); + if !success { + task.error = Some(format!("Process exited with code {}", exit_code)); + } + } + } + + // Execute completion action if task succeeded + let completion_result = if success { + if let Some(ref action) = completion_action { + if action != "none" { + self.execute_completion_action( + task_id, + &task_name, + &working_dir, + action, + target_repo_path.as_deref(), + target_branch.as_deref(), + ).await + } else { + Ok(None) + } + } else { + Ok(None) + } + } else { + Ok(None) + }; + + // Log completion action result + match &completion_result { + Ok(Some(pr_url)) => { + tracing::info!(task_id = %task_id, pr_url = %pr_url, "Completion action created PR"); + } + Ok(None) => {} + Err(e) => { + tracing::warn!(task_id = %task_id, error = %e, "Completion action failed (task still marked as done)"); + } + } + + // Notify server - but NOT for supervisors which should never complete + if is_supervisor { + tracing::info!( + task_id = %task_id, + exit_code = exit_code, + "Supervisor Claude process exited - NOT marking as complete" + ); + // Update local state to reflect it's paused/waiting for input + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Running; // Keep it as running, not completed + task.completed_at = None; + } + } + // Send a status message to let the frontend know supervisor is ready for more input + let msg = DaemonMessage::task_output( + task_id, + "\n[Supervisor ready for next instruction]\n".to_string(), + false, + ); + let _ = self.ws_tx.send(msg).await; + } else { + let error = if success { + None + } else { + Some(format!("Exit code: {}", exit_code)) + }; + tracing::info!(task_id = %task_id, success = success, "Notifying server of task completion"); + let msg = DaemonMessage::task_complete(task_id, success, error); + let _ = self.ws_tx.send(msg).await; + } + + // Note: Worktrees are kept until explicitly deleted (per user preference) + // This allows inspection, PR creation, etc. + + tracing::info!(task_id = %task_id, "=== RUN_TASK END ==="); + Ok(()) + } + + /// Execute the completion action for a task. + async fn execute_completion_action( + &self, + task_id: Uuid, + task_name: &str, + worktree_path: &std::path::Path, + action: &str, + target_repo_path: Option<&str>, + target_branch: Option<&str>, + ) -> Result<Option<String>, String> { + let target_repo = match target_repo_path { + Some(path) => crate::daemon::worktree::expand_tilde(path), + None => { + tracing::warn!(task_id = %task_id, "No target_repo_path configured, skipping completion action"); + return Ok(None); + } + }; + + if !target_repo.exists() { + return Err(format!("Target repo not found: {} (expanded from {:?})", target_repo.display(), target_repo_path)); + } + + // Get the branch name: makima/{task-name-with-dashes}-{short-id} + let branch_name = format!( + "makima/{}-{}", + crate::daemon::worktree::sanitize_name(task_name), + crate::daemon::worktree::short_uuid(task_id) + ); + + // Determine target branch - use provided value or detect default branch of target repo + let target_branch = match target_branch { + Some(branch) => branch.to_string(), + None => { + // Detect default branch (main, master, develop, etc.) + self.worktree_manager + .detect_default_branch(&target_repo) + .await + .unwrap_or_else(|_| "master".to_string()) + } + }; + + let msg = DaemonMessage::task_output( + task_id, + format!("Executing completion action: {}...\n", action), + false, + ); + let _ = self.ws_tx.send(msg).await; + + match action { + "branch" => { + // Just push the branch to target repo + self.worktree_manager + .push_to_target_repo(worktree_path, &target_repo, &branch_name, task_name) + .await + .map_err(|e| e.to_string())?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Branch '{}' pushed to {}\n", branch_name, target_repo.display()), + false, + ); + let _ = self.ws_tx.send(msg).await; + Ok(None) + } + "merge" => { + // Push and merge into target branch + let commit_sha = self.worktree_manager + .merge_to_target(worktree_path, &target_repo, &branch_name, &target_branch, task_name) + .await + .map_err(|e| e.to_string())?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Branch merged into {} (commit: {})\n", target_branch, commit_sha), + false, + ); + let _ = self.ws_tx.send(msg).await; + Ok(None) + } + "pr" => { + // Push and create PR + let title = task_name.to_string(); + let body = format!( + "Automated PR from makima task.\n\nTask ID: `{}`", + task_id + ); + let pr_url = self.worktree_manager + .create_pull_request( + worktree_path, + &target_repo, + &branch_name, + &target_branch, + &title, + &body, + ) + .await + .map_err(|e| e.to_string())?; + + let msg = DaemonMessage::task_output( + task_id, + format!("Pull request created: {}\n", pr_url), + false, + ); + let _ = self.ws_tx.send(msg).await; + Ok(Some(pr_url)) + } + _ => { + tracing::warn!(task_id = %task_id, action = %action, "Unknown completion action"); + Ok(None) + } + } + } + + /// Find worktree path for a task ID. + /// First checks in-memory tasks, then scans the worktrees directory. + async fn find_worktree_for_task(&self, task_id: Uuid) -> Result<PathBuf, String> { + // First try to get from in-memory tasks + { + let tasks = self.tasks.read().await; + if let Some(task) = tasks.get(&task_id) { + if let Some(ref worktree) = task.worktree { + return Ok(worktree.path.clone()); + } + } + } + + // Task not in memory - scan worktrees directory for matching task ID + let short_id = &task_id.to_string()[..8]; + let worktrees_dir = self.worktree_manager.base_dir(); + + if let Ok(mut entries) = tokio::fs::read_dir(worktrees_dir).await { + while let Ok(Some(entry)) = entries.next_entry().await { + let name = entry.file_name(); + let name_str = name.to_string_lossy(); + if name_str.starts_with(short_id) { + let path = entry.path(); + // Verify it's a valid git directory + if path.join(".git").exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %path.display(), + "Found worktree by scanning directory" + ); + return Ok(path); + } + } + } + } + + Err(format!( + "No worktree found for task {}. The worktree may have been cleaned up.", + task_id + )) + } + + async fn update_state(&self, task_id: Uuid, state: TaskState) { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = state; + } + } + + async fn send_status_change(&self, task_id: Uuid, old_status: &str, new_status: &str) { + let msg = DaemonMessage::task_status_change(task_id, old_status, new_status); + let _ = self.ws_tx.send(msg).await; + } + + /// Mark task as failed. + async fn mark_failed(&self, task_id: Uuid, error: &str) { + { + let mut tasks = self.tasks.write().await; + if let Some(task) = tasks.get_mut(&task_id) { + task.state = TaskState::Failed; + task.error = Some(error.to_string()); + task.completed_at = Some(Instant::now()); + } + } + + // Notify server + let msg = DaemonMessage::task_complete(task_id, false, Some(error.to_string())); + let _ = self.ws_tx.send(msg).await; + } +} + +impl Clone for TaskManagerInner { + fn clone(&self) -> Self { + Self { + worktree_manager: self.worktree_manager.clone(), + process_manager: self.process_manager.clone(), + temp_manager: self.temp_manager.clone(), + tasks: self.tasks.clone(), + ws_tx: self.ws_tx.clone(), + task_inputs: self.task_inputs.clone(), + } + } +} diff --git a/makima/src/daemon/task/mod.rs b/makima/src/daemon/task/mod.rs new file mode 100644 index 0000000..29c261e --- /dev/null +++ b/makima/src/daemon/task/mod.rs @@ -0,0 +1,7 @@ +//! Task management and execution. + +pub mod manager; +pub mod state; + +pub use manager::{ManagedTask, TaskConfig, TaskManager}; +pub use state::TaskState; diff --git a/makima/src/daemon/task/state.rs b/makima/src/daemon/task/state.rs new file mode 100644 index 0000000..ca5fc01 --- /dev/null +++ b/makima/src/daemon/task/state.rs @@ -0,0 +1,161 @@ +//! Task state machine. + +use std::fmt; + +/// Task execution state. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum TaskState { + /// Task received, preparing overlay. + Initializing, + /// Overlay ready, starting container. + Starting, + /// Container running. + Running, + /// Container paused. + Paused, + /// Waiting for sibling or resource. + Blocked, + /// Task completed successfully. + Completed, + /// Task failed with error. + Failed, + /// Task interrupted by user. + Interrupted, +} + +impl TaskState { + /// Check if a state transition is valid. + pub fn can_transition_to(&self, target: TaskState) -> bool { + use TaskState::*; + + matches!( + (self, target), + // From Initializing + (Initializing, Starting) + | (Initializing, Failed) + | (Initializing, Interrupted) + // From Starting + | (Starting, Running) + | (Starting, Failed) + | (Starting, Interrupted) + // From Running + | (Running, Paused) + | (Running, Blocked) + | (Running, Completed) + | (Running, Failed) + | (Running, Interrupted) + // From Paused + | (Paused, Running) + | (Paused, Interrupted) + | (Paused, Failed) + // From Blocked + | (Blocked, Running) + | (Blocked, Failed) + | (Blocked, Interrupted) + ) + } + + /// Check if this state is terminal (no more transitions possible). + pub fn is_terminal(&self) -> bool { + matches!( + self, + TaskState::Completed | TaskState::Failed | TaskState::Interrupted + ) + } + + /// Check if the task is currently active (running or paused). + pub fn is_active(&self) -> bool { + matches!( + self, + TaskState::Initializing + | TaskState::Starting + | TaskState::Running + | TaskState::Paused + | TaskState::Blocked + ) + } + + /// Check if the task is running. + pub fn is_running(&self) -> bool { + matches!(self, TaskState::Running) + } + + /// Convert to string for protocol messages. + pub fn as_str(&self) -> &'static str { + match self { + TaskState::Initializing => "initializing", + TaskState::Starting => "starting", + TaskState::Running => "running", + TaskState::Paused => "paused", + TaskState::Blocked => "blocked", + TaskState::Completed => "done", + TaskState::Failed => "failed", + TaskState::Interrupted => "interrupted", + } + } + + /// Parse from string. + pub fn from_str(s: &str) -> Option<Self> { + match s.to_lowercase().as_str() { + "initializing" => Some(TaskState::Initializing), + "starting" => Some(TaskState::Starting), + "running" => Some(TaskState::Running), + "paused" => Some(TaskState::Paused), + "blocked" => Some(TaskState::Blocked), + "done" | "completed" => Some(TaskState::Completed), + "failed" => Some(TaskState::Failed), + "interrupted" => Some(TaskState::Interrupted), + _ => None, + } + } +} + +impl fmt::Display for TaskState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +impl Default for TaskState { + fn default() -> Self { + TaskState::Initializing + } +} + +#[cfg(test)] +mod tests { + use crate::daemon::*; + + #[test] + fn test_valid_transitions() { + use TaskState::*; + + // Valid transitions + assert!(Initializing.can_transition_to(Starting)); + assert!(Starting.can_transition_to(Running)); + assert!(Running.can_transition_to(Completed)); + assert!(Running.can_transition_to(Paused)); + assert!(Paused.can_transition_to(Running)); + + // Invalid transitions + assert!(!Completed.can_transition_to(Running)); + assert!(!Failed.can_transition_to(Running)); + assert!(!Running.can_transition_to(Initializing)); + } + + #[test] + fn test_terminal_states() { + assert!(TaskState::Completed.is_terminal()); + assert!(TaskState::Failed.is_terminal()); + assert!(TaskState::Interrupted.is_terminal()); + assert!(!TaskState::Running.is_terminal()); + assert!(!TaskState::Paused.is_terminal()); + } + + #[test] + fn test_parse() { + assert_eq!(TaskState::from_str("running"), Some(TaskState::Running)); + assert_eq!(TaskState::from_str("done"), Some(TaskState::Completed)); + assert_eq!(TaskState::from_str("invalid"), None); + } +} diff --git a/makima/src/daemon/temp.rs b/makima/src/daemon/temp.rs new file mode 100644 index 0000000..42d4a28 --- /dev/null +++ b/makima/src/daemon/temp.rs @@ -0,0 +1,224 @@ +//! Managed temporary directory for tasks without repositories. +//! +//! Tasks that don't have a repository URL and aren't subtasks (which inherit +//! from parent) use a managed temp directory in ~/.makima/temp/. The directory +//! is automatically cleaned up when it exceeds a size limit. + +use std::path::PathBuf; + +use tokio::fs; +use uuid::Uuid; + +/// Maximum size of the temp directory before cleanup (5GB). +const MAX_TEMP_SIZE_BYTES: u64 = 5 * 1024 * 1024 * 1024; + +/// Manages temporary directories for tasks without repositories. +pub struct TempManager { + /// Base directory for temp task directories (~/.makima/temp/). + temp_dir: PathBuf, +} + +impl TempManager { + /// Create a new TempManager. + pub fn new() -> Self { + let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from(".")); + Self { + temp_dir: home.join(".makima").join("temp"), + } + } + + /// Create a new TempManager with a custom base directory. + #[allow(dead_code)] + pub fn with_base_dir(base_dir: PathBuf) -> Self { + Self { temp_dir: base_dir } + } + + /// Get the base temp directory path. + pub fn temp_dir(&self) -> &PathBuf { + &self.temp_dir + } + + /// Create a temp directory for a task. + /// + /// This creates a directory at ~/.makima/temp/task-{id}/ and triggers + /// cleanup if the total size exceeds the limit. + pub async fn create_task_dir(&self, task_id: Uuid) -> Result<PathBuf, std::io::Error> { + // Ensure base directory exists + fs::create_dir_all(&self.temp_dir).await?; + + // Check size and cleanup if needed + if let Err(e) = self.cleanup_if_needed().await { + tracing::warn!("Temp directory cleanup failed: {}", e); + // Continue anyway, cleanup is best-effort + } + + // Create task-specific directory + let task_dir = self.temp_dir.join(format!("task-{}", task_id)); + fs::create_dir_all(&task_dir).await?; + + tracing::info!( + task_id = %task_id, + path = %task_dir.display(), + "Created temp directory for task" + ); + + Ok(task_dir) + } + + /// Calculate total size of temp directory recursively. + async fn get_total_size(&self) -> Result<u64, std::io::Error> { + if !self.temp_dir.exists() { + return Ok(0); + } + + let mut total = 0u64; + let mut stack = vec![self.temp_dir.clone()]; + + while let Some(dir) = stack.pop() { + let mut entries = match fs::read_dir(&dir).await { + Ok(e) => e, + Err(_) => continue, + }; + + while let Ok(Some(entry)) = entries.next_entry().await { + let metadata = match entry.metadata().await { + Ok(m) => m, + Err(_) => continue, + }; + + if metadata.is_dir() { + stack.push(entry.path()); + } else { + total += metadata.len(); + } + } + } + + Ok(total) + } + + /// Remove oldest directories if total size exceeds limit. + async fn cleanup_if_needed(&self) -> Result<(), std::io::Error> { + let size = self.get_total_size().await?; + if size <= MAX_TEMP_SIZE_BYTES { + return Ok(()); + } + + tracing::info!( + current_size_mb = size / 1024 / 1024, + limit_mb = MAX_TEMP_SIZE_BYTES / 1024 / 1024, + "Temp directory exceeds size limit, starting cleanup" + ); + + // Get all task dirs with modification times + let mut dirs: Vec<(PathBuf, std::time::SystemTime, u64)> = vec![]; + let mut entries = fs::read_dir(&self.temp_dir).await?; + + while let Ok(Some(entry)) = entries.next_entry().await { + let path = entry.path(); + if !path.is_dir() { + continue; + } + + let metadata = match entry.metadata().await { + Ok(m) => m, + Err(_) => continue, + }; + + let modified = metadata.modified().unwrap_or(std::time::UNIX_EPOCH); + let dir_size = self.get_dir_size(&path).await.unwrap_or(0); + dirs.push((path, modified, dir_size)); + } + + // Sort by oldest first + dirs.sort_by(|a, b| a.1.cmp(&b.1)); + + // Remove oldest until under limit + let mut current_size = size; + for (path, _, dir_size) in dirs { + if current_size <= MAX_TEMP_SIZE_BYTES { + break; + } + + tracing::info!( + path = %path.display(), + size_mb = dir_size / 1024 / 1024, + "Removing old temp directory" + ); + + if let Err(e) = fs::remove_dir_all(&path).await { + tracing::warn!(path = %path.display(), error = %e, "Failed to remove temp directory"); + continue; + } + + current_size = current_size.saturating_sub(dir_size); + } + + tracing::info!( + new_size_mb = current_size / 1024 / 1024, + "Temp directory cleanup complete" + ); + + Ok(()) + } + + /// Calculate size of a directory recursively. + async fn get_dir_size(&self, path: &PathBuf) -> Result<u64, std::io::Error> { + let mut total = 0u64; + let mut stack = vec![path.clone()]; + + while let Some(dir) = stack.pop() { + let mut entries = match fs::read_dir(&dir).await { + Ok(e) => e, + Err(_) => continue, + }; + + while let Ok(Some(entry)) = entries.next_entry().await { + let metadata = match entry.metadata().await { + Ok(m) => m, + Err(_) => continue, + }; + + if metadata.is_dir() { + stack.push(entry.path()); + } else { + total += metadata.len(); + } + } + } + + Ok(total) + } + + /// Remove a specific task's temp directory. + #[allow(dead_code)] + pub async fn remove_task_dir(&self, task_id: Uuid) -> Result<(), std::io::Error> { + let task_dir = self.temp_dir.join(format!("task-{}", task_id)); + if task_dir.exists() { + fs::remove_dir_all(&task_dir).await?; + tracing::info!( + task_id = %task_id, + path = %task_dir.display(), + "Removed temp directory for task" + ); + } + Ok(()) + } +} + +impl Default for TempManager { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use crate::daemon::*; + + #[test] + fn test_temp_manager_default_dir() { + let manager = TempManager::new(); + assert!(manager.temp_dir().ends_with(".makima/temp")); + } +} diff --git a/makima/src/daemon/worktree/manager.rs b/makima/src/daemon/worktree/manager.rs new file mode 100644 index 0000000..9af5dcb --- /dev/null +++ b/makima/src/daemon/worktree/manager.rs @@ -0,0 +1,1623 @@ +//! Worktree manager implementation. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::LazyLock; + +use tokio::process::Command; +use tokio::sync::Mutex; +use uuid::Uuid; + +/// Errors that can occur during worktree operations. +#[derive(Debug, thiserror::Error)] +pub enum WorktreeError { + #[error("Git command failed: {0}")] + GitCommand(String), + + #[error("Repository not found: {0}")] + RepoNotFound(String), + + #[error("Failed to create directory: {0}")] + CreateDir(#[from] std::io::Error), + + #[error("Invalid repository path: {0}")] + InvalidPath(String), + + #[error("Worktree already exists: {0}")] + AlreadyExists(String), + + #[error("Clone failed: {0}")] + CloneFailed(String), + + #[error("Merge in progress")] + MergeInProgress, + + #[error("No merge in progress")] + NoMergeInProgress, + + #[error("Merge has conflicts: {0}")] + MergeConflicts(String), +} + +/// Strategy for resolving a merge conflict. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConflictResolution { + /// Use our version (the branch being merged into). + Ours, + /// Use their version (the branch being merged). + Theirs, +} + +/// State of an in-progress merge. +#[derive(Debug, Clone)] +pub struct MergeState { + /// The branch being merged. + pub source_branch: String, + /// Files with unresolved conflicts. + pub conflicted_files: Vec<String>, + /// Whether a merge is currently in progress. + pub in_progress: bool, +} + +/// Information about a task branch. +#[derive(Debug, Clone)] +pub struct TaskBranchInfo { + /// Full branch name. + pub name: String, + /// Task ID extracted from branch name (if parseable). + pub task_id: Option<Uuid>, + /// Whether this branch has been merged into the current branch. + pub is_merged: bool, + /// Short SHA of the last commit. + pub last_commit: String, + /// Subject line of the last commit. + pub last_commit_message: String, +} + +/// Information about a created worktree. +#[derive(Debug, Clone)] +pub struct WorktreeInfo { + /// Path to the worktree directory. + pub path: PathBuf, + /// Git branch name for this worktree. + pub branch: String, + /// Source repository path. + pub source_repo: PathBuf, +} + +/// Manages git worktrees for task isolation. +pub struct WorktreeManager { + /// Base directory for all worktrees (~/.makima/worktrees). + base_dir: PathBuf, + /// Base directory for cloned repos (~/.makima/repos). + repos_dir: PathBuf, + /// Branch prefix for task branches. + branch_prefix: String, +} + +/// Per-worktree locks to prevent concurrent creation issues. +static WORKTREE_LOCKS: LazyLock<Mutex<HashMap<String, std::sync::Arc<tokio::sync::Mutex<()>>>>> = + LazyLock::new(|| Mutex::new(HashMap::new())); + +impl WorktreeManager { + /// Create a new WorktreeManager with the given base directory. + pub fn new(base_dir: PathBuf) -> Self { + let repos_dir = base_dir.parent() + .map(|p| p.join("repos")) + .unwrap_or_else(|| base_dir.join("repos")); + + Self { + base_dir, + repos_dir, + branch_prefix: "makima/task-".to_string(), + } + } + + /// Get the default worktree base directory (~/.makima/worktrees). + pub fn default_base_dir() -> PathBuf { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".makima") + .join("worktrees") + } + + /// Get the base directory for worktrees. + pub fn base_dir(&self) -> &Path { + &self.base_dir + } + + /// Detect the default branch of a repository. + /// Tries to find HEAD's target, falling back to common branch names. + pub async fn detect_default_branch(&self, repo_path: &Path) -> Result<String, WorktreeError> { + // Try to get the branch that HEAD points to + let output = Command::new("git") + .args(["symbolic-ref", "refs/remotes/origin/HEAD", "--short"]) + .current_dir(repo_path) + .output() + .await?; + + if output.status.success() { + let branch = String::from_utf8_lossy(&output.stdout).trim().to_string(); + // Remove "origin/" prefix if present + let branch = branch.strip_prefix("origin/").unwrap_or(&branch).to_string(); + if !branch.is_empty() { + return Ok(branch); + } + } + + // Try common branch names + for branch in ["main", "master", "develop", "trunk"] { + let output = Command::new("git") + .args(["rev-parse", "--verify", &format!("refs/heads/{}", branch)]) + .current_dir(repo_path) + .output() + .await?; + + if output.status.success() { + return Ok(branch.to_string()); + } + } + + // Fall back to getting the current branch + let output = Command::new("git") + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .current_dir(repo_path) + .output() + .await?; + + if output.status.success() { + let branch = String::from_utf8_lossy(&output.stdout).trim().to_string(); + if !branch.is_empty() && branch != "HEAD" { + return Ok(branch); + } + } + + Err(WorktreeError::GitCommand( + "Could not detect default branch".to_string(), + )) + } + + /// Ensure the source repository exists locally and is up-to-date. + /// If repo_source is a URL, clone it. If it's a path, verify it exists. + /// For both cases, fetch latest changes from remote if available. + pub async fn ensure_repo(&self, repo_source: &str) -> Result<PathBuf, WorktreeError> { + // Check if it's a URL (simple heuristic) + if repo_source.starts_with("http://") + || repo_source.starts_with("https://") + || repo_source.starts_with("git@") + || repo_source.starts_with("ssh://") + { + self.clone_or_fetch_repo(repo_source).await + } else { + // Treat as local path - expand tilde if present + let path = expand_tilde(repo_source); + if !path.exists() { + return Err(WorktreeError::RepoNotFound(repo_source.to_string())); + } + // Verify it's a git repo + let git_dir = path.join(".git"); + if !git_dir.exists() { + return Err(WorktreeError::InvalidPath(format!( + "{} is not a git repository", + repo_source + ))); + } + + // Fetch latest changes from remote if configured + tracing::info!("Fetching latest changes for local repo: {}", repo_source); + let output = Command::new("git") + .args(["fetch", "--all", "--prune"]) + .current_dir(&path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + // Don't fail - repo might not have a remote configured + tracing::debug!("Git fetch for local repo (may not have remote): {}", stderr); + } else { + tracing::info!("Fetched latest changes for {}", repo_source); + } + + Ok(path) + } + } + + /// Clone a repository or fetch if already cloned. + async fn clone_or_fetch_repo(&self, url: &str) -> Result<PathBuf, WorktreeError> { + // Extract repo name from URL + let repo_name = extract_repo_name(url); + let repo_path = self.repos_dir.join(&repo_name); + + // Create repos directory if needed + tokio::fs::create_dir_all(&self.repos_dir).await?; + + if repo_path.exists() { + // Fetch latest changes + tracing::info!("Fetching updates for existing repo: {}", repo_name); + let output = Command::new("git") + .args(["fetch", "--all", "--prune"]) + .current_dir(&repo_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!("Git fetch warning: {}", stderr); + // Don't fail on fetch errors, repo might still be usable + } + } else { + // Clone the repository + tracing::info!("Cloning repository: {} -> {}", url, repo_path.display()); + let output = Command::new("git") + .args(["clone", "--bare", url]) + .arg(&repo_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::CloneFailed(stderr.to_string())); + } + } + + Ok(repo_path) + } + + /// Create a worktree for a task. + /// + /// This creates a unique directory with a git worktree checked out to a new branch. + pub async fn create_worktree( + &self, + source_repo: &Path, + task_id: Uuid, + task_name: &str, + base_branch: &str, + ) -> Result<WorktreeInfo, WorktreeError> { + // Generate unique directory name and branch + let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name)); + let worktree_path = self.base_dir.join(&dir_name); + // Branch name: makima/{task-name-with-dashes}-{short-id} + let branch_name = format!("{}{}-{}", self.branch_prefix, sanitize_name(task_name), short_uuid(task_id)); + + // Acquire lock for this worktree path + let lock = { + let mut locks = WORKTREE_LOCKS.lock().await; + locks + .entry(worktree_path.to_string_lossy().to_string()) + .or_insert_with(|| std::sync::Arc::new(tokio::sync::Mutex::new(()))) + .clone() + }; + let _guard = lock.lock().await; + + // Check if worktree already exists - reuse it if so + if worktree_path.exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Worktree already exists, reusing" + ); + + // Verify it's a valid git directory + let git_dir = worktree_path.join(".git"); + if git_dir.exists() { + // Get the current branch name + let output = Command::new("git") + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .current_dir(&worktree_path) + .output() + .await?; + + let current_branch = if output.status.success() { + String::from_utf8_lossy(&output.stdout).trim().to_string() + } else { + branch_name.clone() + }; + + return Ok(WorktreeInfo { + path: worktree_path, + branch: current_branch, + source_repo: source_repo.to_path_buf(), + }); + } else { + // Directory exists but isn't a git worktree - remove and recreate + tracing::warn!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Directory exists but is not a git worktree, removing" + ); + tokio::fs::remove_dir_all(&worktree_path).await?; + } + } + + // Create base directory + tokio::fs::create_dir_all(&self.base_dir).await?; + + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + branch = %branch_name, + base_branch = %base_branch, + "Creating worktree from local branch" + ); + + // Create the worktree with a new branch based on the local base_branch + let output = Command::new("git") + .args([ + "worktree", + "add", + "-b", + &branch_name, + ]) + .arg(&worktree_path) + .arg(base_branch) + .current_dir(source_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to create worktree: {}", + stderr + ))); + } + + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Worktree created successfully" + ); + + Ok(WorktreeInfo { + path: worktree_path, + branch: branch_name, + source_repo: source_repo.to_path_buf(), + }) + } + + /// Create a worktree for a task by copying from another task's worktree. + /// + /// This allows sequential subtasks where one continues from another's work, + /// including uncommitted changes. + pub async fn create_worktree_from_task( + &self, + source_worktree: &Path, + task_id: Uuid, + task_name: &str, + ) -> Result<WorktreeInfo, WorktreeError> { + // Verify source worktree exists + if !source_worktree.exists() { + return Err(WorktreeError::RepoNotFound(format!( + "Source worktree not found: {}", + source_worktree.display() + ))); + } + + // Get the source repo from the source worktree + let source_repo = self.get_worktree_source(source_worktree).await?; + + // Get the base branch from source worktree's current HEAD + let output = Command::new("git") + .args(["rev-parse", "HEAD"]) + .current_dir(source_worktree) + .output() + .await?; + + if !output.status.success() { + return Err(WorktreeError::GitCommand( + "Failed to get source worktree HEAD".to_string(), + )); + } + let source_commit = String::from_utf8_lossy(&output.stdout).trim().to_string(); + + // Generate unique directory name and branch for new worktree + let dir_name = format!("{}-{}", short_uuid(task_id), sanitize_name(task_name)); + let worktree_path = self.base_dir.join(&dir_name); + let branch_name = format!("{}{}", self.branch_prefix, task_id); + + // Acquire lock for this worktree path + let lock = { + let mut locks = WORKTREE_LOCKS.lock().await; + locks + .entry(worktree_path.to_string_lossy().to_string()) + .or_insert_with(|| std::sync::Arc::new(tokio::sync::Mutex::new(()))) + .clone() + }; + let _guard = lock.lock().await; + + // Remove existing worktree if present + if worktree_path.exists() { + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Removing existing worktree before creating from source" + ); + tokio::fs::remove_dir_all(&worktree_path).await?; + } + + // Create base directory + tokio::fs::create_dir_all(&self.base_dir).await?; + + tracing::info!( + task_id = %task_id, + source_worktree = %source_worktree.display(), + worktree_path = %worktree_path.display(), + branch = %branch_name, + source_commit = %source_commit, + "Creating worktree from source task" + ); + + // Create a new worktree based on the source commit + let output = Command::new("git") + .args([ + "worktree", + "add", + "-b", + &branch_name, + ]) + .arg(&worktree_path) + .arg(&source_commit) + .current_dir(&source_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to create worktree: {}", + stderr + ))); + } + + // Now copy uncommitted changes from source worktree + // Use rsync to copy all files except .git + let output = Command::new("rsync") + .args([ + "-a", + "--exclude", ".git", + "--exclude", ".makima", + &format!("{}/", source_worktree.display()), + &format!("{}/", worktree_path.display()), + ]) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!( + task_id = %task_id, + "rsync warning (continuing anyway): {}", + stderr + ); + } + + tracing::info!( + task_id = %task_id, + worktree_path = %worktree_path.display(), + "Worktree created from source task successfully" + ); + + Ok(WorktreeInfo { + path: worktree_path, + branch: branch_name, + source_repo: source_repo.to_path_buf(), + }) + } + + /// Remove a worktree and optionally its branch. + pub async fn remove_worktree( + &self, + worktree_path: &Path, + delete_branch: bool, + ) -> Result<(), WorktreeError> { + if !worktree_path.exists() { + return Ok(()); // Already gone + } + + // Get the branch name before removing + let branch_name = if delete_branch { + self.get_worktree_branch(worktree_path).await.ok() + } else { + None + }; + + // Find the source repo from worktree + let source_repo = self.get_worktree_source(worktree_path).await?; + + tracing::info!( + worktree_path = %worktree_path.display(), + delete_branch = delete_branch, + "Removing worktree" + ); + + // Remove the worktree + let output = Command::new("git") + .args(["worktree", "remove", "--force"]) + .arg(worktree_path) + .current_dir(&source_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + // Try force removal of directory if git worktree remove fails + if worktree_path.exists() { + tokio::fs::remove_dir_all(worktree_path).await?; + } + tracing::warn!("Git worktree remove warning: {}", stderr); + } + + // Prune worktree references + let _ = Command::new("git") + .args(["worktree", "prune"]) + .current_dir(&source_repo) + .output() + .await; + + // Delete the branch if requested + if let Some(branch) = branch_name { + let output = Command::new("git") + .args(["branch", "-D", &branch]) + .current_dir(&source_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!("Failed to delete branch {}: {}", branch, stderr); + } + } + + Ok(()) + } + + /// Get the branch name of a worktree. + async fn get_worktree_branch(&self, worktree_path: &Path) -> Result<String, WorktreeError> { + let output = Command::new("git") + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to get branch: {}", + stderr + ))); + } + + Ok(String::from_utf8_lossy(&output.stdout).trim().to_string()) + } + + /// Get the source repository path for a worktree. + async fn get_worktree_source(&self, worktree_path: &Path) -> Result<PathBuf, WorktreeError> { + // Read the .git file in the worktree which contains the path to the main repo + let git_file = worktree_path.join(".git"); + + if git_file.is_file() { + let content = tokio::fs::read_to_string(&git_file).await?; + // Format: "gitdir: /path/to/repo/.git/worktrees/name" + if let Some(gitdir) = content.strip_prefix("gitdir: ") { + let gitdir = gitdir.trim(); + // Navigate from worktrees/name back to the main repo + let path = PathBuf::from(gitdir); + if let Some(worktrees_dir) = path.parent() { + if let Some(git_dir) = worktrees_dir.parent() { + if let Some(repo_dir) = git_dir.parent() { + return Ok(repo_dir.to_path_buf()); + } + } + } + } + } + + // Fallback: try to find it in our repos directory + Err(WorktreeError::InvalidPath(format!( + "Could not determine source repo for worktree: {}", + worktree_path.display() + ))) + } + + /// List all worktrees in the base directory. + pub async fn list_worktrees(&self) -> Result<Vec<PathBuf>, WorktreeError> { + let mut worktrees = Vec::new(); + + if !self.base_dir.exists() { + return Ok(worktrees); + } + + let mut entries = tokio::fs::read_dir(&self.base_dir).await?; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if path.is_dir() && path.join(".git").exists() { + worktrees.push(path); + } + } + + Ok(worktrees) + } + + /// Initialize a new git repository for a task. + /// + /// This creates a fresh git repo (not a worktree) for tasks that don't need + /// an existing codebase. Use this when `repository_url` is `new://` or `new://project-name`. + pub async fn init_new_repo( + &self, + task_id: Uuid, + repo_source: &str, + ) -> Result<WorktreeInfo, WorktreeError> { + let project_name = extract_new_repo_name(repo_source); + let dir_name = match project_name { + Some(name) => format!("{}-{}", short_uuid(task_id), sanitize_name(name)), + None => format!("{}-new", short_uuid(task_id)), + }; + let repo_path = self.repos_dir.join(&dir_name); + + tracing::info!( + task_id = %task_id, + path = %repo_path.display(), + project_name = ?project_name, + "Initializing new git repository" + ); + + // Create directory + tokio::fs::create_dir_all(&repo_path).await?; + + // git init + let output = Command::new("git") + .args(["init"]) + .current_dir(&repo_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to init repository: {}", + stderr + ))); + } + + // Configure git user (needed for commits) + let _ = Command::new("git") + .args(["config", "user.email", "makima@localhost"]) + .current_dir(&repo_path) + .output() + .await; + let _ = Command::new("git") + .args(["config", "user.name", "Makima"]) + .current_dir(&repo_path) + .output() + .await; + + // Initial commit (required for worktrees to work later if needed) + let output = Command::new("git") + .args(["commit", "--allow-empty", "-m", "Initial commit"]) + .current_dir(&repo_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to create initial commit: {}", + stderr + ))); + } + + tracing::info!( + task_id = %task_id, + path = %repo_path.display(), + "New git repository initialized" + ); + + Ok(WorktreeInfo { + path: repo_path.clone(), + branch: "main".to_string(), + source_repo: repo_path, + }) + } + + // ========== Merge Operations ========== + + /// List all task branches in a repository. + /// + /// Returns branches matching the pattern `makima/task-*`. + pub async fn list_task_branches( + &self, + repo_path: &Path, + ) -> Result<Vec<TaskBranchInfo>, WorktreeError> { + // Get all branches matching our prefix + let output = Command::new("git") + .args([ + "branch", + "--list", + &format!("{}*", self.branch_prefix), + "--format=%(refname:short)|%(objectname:short)|%(subject)", + ]) + .current_dir(repo_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to list branches: {}", + stderr + ))); + } + + // Get list of merged branches + let merged_output = Command::new("git") + .args(["branch", "--merged", "HEAD", "--format=%(refname:short)"]) + .current_dir(repo_path) + .output() + .await?; + + let merged_branches: std::collections::HashSet<String> = if merged_output.status.success() { + String::from_utf8_lossy(&merged_output.stdout) + .lines() + .map(|s| s.trim().to_string()) + .collect() + } else { + std::collections::HashSet::new() + }; + + let stdout = String::from_utf8_lossy(&output.stdout); + let mut branches = Vec::new(); + + for line in stdout.lines() { + let parts: Vec<&str> = line.split('|').collect(); + if parts.len() >= 3 { + let name = parts[0].trim().to_string(); + let last_commit = parts[1].trim().to_string(); + let last_commit_message = parts[2].trim().to_string(); + + // Try to extract task ID from branch name + let task_id = name + .strip_prefix(&self.branch_prefix) + .and_then(|s| Uuid::parse_str(s).ok()); + + let is_merged = merged_branches.contains(&name); + + branches.push(TaskBranchInfo { + name, + task_id, + is_merged, + last_commit, + last_commit_message, + }); + } + } + + Ok(branches) + } + + /// Start a merge of a branch into the current worktree. + /// + /// Uses `--no-commit` to allow conflict resolution before committing. + /// Returns Ok(None) if merge succeeds without conflicts, or Ok(Some(files)) + /// with the list of conflicted files. + pub async fn merge_branch( + &self, + worktree_path: &Path, + source_branch: &str, + ) -> Result<Option<Vec<String>>, WorktreeError> { + // Check if there's already a merge in progress + if self.is_merge_in_progress(worktree_path).await? { + return Err(WorktreeError::MergeInProgress); + } + + tracing::info!( + worktree = %worktree_path.display(), + source_branch = %source_branch, + "Starting merge" + ); + + // Attempt the merge with --no-commit --no-ff + let output = Command::new("git") + .args(["merge", "--no-commit", "--no-ff", source_branch]) + .current_dir(worktree_path) + .output() + .await?; + + if output.status.success() { + tracing::info!("Merge completed without conflicts"); + return Ok(None); + } + + // Check if there are conflicts + let conflicts = self.get_conflicted_files(worktree_path).await?; + if !conflicts.is_empty() { + tracing::info!( + conflicts = ?conflicts, + "Merge has conflicts" + ); + return Ok(Some(conflicts)); + } + + // Other error + let stderr = String::from_utf8_lossy(&output.stderr); + Err(WorktreeError::GitCommand(format!( + "Merge failed: {}", + stderr + ))) + } + + /// Check if a merge is currently in progress. + pub async fn is_merge_in_progress(&self, worktree_path: &Path) -> Result<bool, WorktreeError> { + // Check for MERGE_HEAD file + let merge_head = worktree_path.join(".git").join("MERGE_HEAD"); + if merge_head.exists() { + return Ok(true); + } + + // Also check in .git file (for worktrees) + let git_file = worktree_path.join(".git"); + if git_file.is_file() { + if let Ok(content) = tokio::fs::read_to_string(&git_file).await { + if let Some(gitdir) = content.strip_prefix("gitdir: ") { + let gitdir = PathBuf::from(gitdir.trim()); + let merge_head = gitdir.join("MERGE_HEAD"); + if merge_head.exists() { + return Ok(true); + } + } + } + } + + Ok(false) + } + + /// Get the list of files with unresolved conflicts. + pub async fn get_conflicted_files( + &self, + worktree_path: &Path, + ) -> Result<Vec<String>, WorktreeError> { + let output = Command::new("git") + .args(["diff", "--name-only", "--diff-filter=U"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + // No conflicts or not in merge state + return Ok(Vec::new()); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + let files: Vec<String> = stdout + .lines() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + Ok(files) + } + + /// Get the current merge state. + pub async fn get_merge_state( + &self, + worktree_path: &Path, + ) -> Result<MergeState, WorktreeError> { + let in_progress = self.is_merge_in_progress(worktree_path).await?; + + if !in_progress { + return Ok(MergeState { + source_branch: String::new(), + conflicted_files: Vec::new(), + in_progress: false, + }); + } + + // Get the branch being merged from MERGE_HEAD + let source_branch = self.get_merge_source_branch(worktree_path).await?; + let conflicted_files = self.get_conflicted_files(worktree_path).await?; + + Ok(MergeState { + source_branch, + conflicted_files, + in_progress: true, + }) + } + + /// Get the branch name being merged (from MERGE_HEAD). + async fn get_merge_source_branch(&self, worktree_path: &Path) -> Result<String, WorktreeError> { + // Get MERGE_HEAD commit + let output = Command::new("git") + .args(["rev-parse", "MERGE_HEAD"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + return Ok("unknown".to_string()); + } + + let commit = String::from_utf8_lossy(&output.stdout).trim().to_string(); + + // Try to find branch name for this commit + let output = Command::new("git") + .args(["name-rev", "--name-only", &commit]) + .current_dir(worktree_path) + .output() + .await?; + + if output.status.success() { + let name = String::from_utf8_lossy(&output.stdout).trim().to_string(); + // Clean up the name (remove ~N suffixes, etc.) + let name = name.split('~').next().unwrap_or(&name); + let name = name.split('^').next().unwrap_or(name); + return Ok(name.to_string()); + } + + Ok(commit[..8.min(commit.len())].to_string()) + } + + /// Resolve a conflict in a specific file. + pub async fn resolve_conflict( + &self, + worktree_path: &Path, + file_path: &str, + resolution: ConflictResolution, + ) -> Result<(), WorktreeError> { + if !self.is_merge_in_progress(worktree_path).await? { + return Err(WorktreeError::NoMergeInProgress); + } + + let strategy = match resolution { + ConflictResolution::Ours => "--ours", + ConflictResolution::Theirs => "--theirs", + }; + + tracing::info!( + worktree = %worktree_path.display(), + file = %file_path, + strategy = %strategy, + "Resolving conflict" + ); + + // Checkout the chosen version + let output = Command::new("git") + .args(["checkout", strategy, "--", file_path]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to resolve conflict: {}", + stderr + ))); + } + + // Stage the resolved file + let output = Command::new("git") + .args(["add", file_path]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to stage resolved file: {}", + stderr + ))); + } + + Ok(()) + } + + /// Abort the current merge. + pub async fn abort_merge(&self, worktree_path: &Path) -> Result<(), WorktreeError> { + if !self.is_merge_in_progress(worktree_path).await? { + return Err(WorktreeError::NoMergeInProgress); + } + + tracing::info!( + worktree = %worktree_path.display(), + "Aborting merge" + ); + + let output = Command::new("git") + .args(["merge", "--abort"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to abort merge: {}", + stderr + ))); + } + + Ok(()) + } + + /// Commit the current merge. + pub async fn commit_merge( + &self, + worktree_path: &Path, + message: &str, + ) -> Result<String, WorktreeError> { + // Check for remaining conflicts + let conflicts = self.get_conflicted_files(worktree_path).await?; + if !conflicts.is_empty() { + return Err(WorktreeError::MergeConflicts(conflicts.join(", "))); + } + + tracing::info!( + worktree = %worktree_path.display(), + message = %message, + "Committing merge" + ); + + let output = Command::new("git") + .args(["commit", "-m", message]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to commit merge: {}", + stderr + ))); + } + + // Get the new commit SHA + let output = Command::new("git") + .args(["rev-parse", "HEAD"]) + .current_dir(worktree_path) + .output() + .await?; + + if output.status.success() { + let sha = String::from_utf8_lossy(&output.stdout).trim().to_string(); + return Ok(sha); + } + + Ok("unknown".to_string()) + } + + // ========== Completion Action Operations ========== + + /// Push task branch from worktree to an external target repository. + /// + /// This stages and commits any uncommitted changes, then pushes to the target repo. + pub async fn push_to_target_repo( + &self, + worktree_path: &Path, + target_repo: &Path, + branch_name: &str, + task_name: &str, + ) -> Result<(), WorktreeError> { + tracing::info!( + worktree = %worktree_path.display(), + target_repo = %target_repo.display(), + branch = %branch_name, + "Pushing branch to target repository" + ); + + // First, stage all changes (including new files) + let output = Command::new("git") + .args(["add", "-A"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to stage changes: {}", + stderr + ))); + } + + // Check if there are staged changes to commit + let output = Command::new("git") + .args(["diff", "--cached", "--quiet"]) + .current_dir(worktree_path) + .output() + .await?; + + // Exit code 1 means there are staged changes + if !output.status.success() { + tracing::info!("Committing staged changes before push"); + + let commit_message = format!("feat: {}", task_name); + let output = Command::new("git") + .args(["commit", "-m", &commit_message]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to commit changes: {}", + stderr + ))); + } + } + + // Ensure there are commits to push + let output = Command::new("git") + .args(["log", "--oneline", "-1"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + return Err(WorktreeError::GitCommand( + "No commits in worktree".to_string(), + )); + } + + // Add target repo as a remote in the worktree (if not already) + let remote_name = "target"; + let target_path_str = target_repo.to_string_lossy(); + + // Remove existing remote if any (ignore errors) + let _ = Command::new("git") + .args(["remote", "remove", remote_name]) + .current_dir(worktree_path) + .output() + .await; + + // Add the target as a remote + let output = Command::new("git") + .args(["remote", "add", remote_name, &target_path_str]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to add remote: {}", + stderr + ))); + } + + // Push the branch to the target + let output = Command::new("git") + .args(["push", "-u", remote_name, &format!("HEAD:{}", branch_name)]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to push to target: {}", + stderr + ))); + } + + tracing::info!( + branch = %branch_name, + target_repo = %target_repo.display(), + "Branch pushed successfully" + ); + + // Detach HEAD in the worktree to release the branch + // This allows the branch to be checked out in the target repo + let output = Command::new("git") + .args(["checkout", "--detach", "HEAD"]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + // Non-fatal: log but don't fail the push + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!( + "Failed to detach HEAD in worktree (branch may not be checkable in target): {}", + stderr + ); + } else { + tracing::info!("Detached HEAD in worktree to release branch"); + } + + Ok(()) + } + + /// Merge a branch into the target branch in the target repository. + /// + /// This pushes the branch first (if needed), then performs a merge in the target repo. + pub async fn merge_to_target( + &self, + worktree_path: &Path, + target_repo: &Path, + source_branch: &str, + target_branch: &str, + task_name: &str, + ) -> Result<String, WorktreeError> { + tracing::info!( + worktree = %worktree_path.display(), + target_repo = %target_repo.display(), + source_branch = %source_branch, + target_branch = %target_branch, + "Merging branch to target" + ); + + // First, push the branch to target repo + self.push_to_target_repo(worktree_path, target_repo, source_branch, task_name) + .await?; + + // In target repo, checkout the target branch + let output = Command::new("git") + .args(["checkout", target_branch]) + .current_dir(target_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to checkout target branch: {}", + stderr + ))); + } + + // Pull latest changes first + let _ = Command::new("git") + .args(["pull", "--ff-only"]) + .current_dir(target_repo) + .output() + .await; + + // Merge the source branch + let merge_message = format!("feat: {}", task_name); + let output = Command::new("git") + .args(["merge", "--no-ff", source_branch, "-m", &merge_message]) + .current_dir(target_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + + // Check if it's a conflict + let conflicts = self.get_conflicted_files(target_repo).await?; + if !conflicts.is_empty() { + // Abort the merge + let _ = Command::new("git") + .args(["merge", "--abort"]) + .current_dir(target_repo) + .output() + .await; + + return Err(WorktreeError::MergeConflicts(format!( + "Merge conflicts in: {}. Consider creating a PR instead.", + conflicts.join(", ") + ))); + } + + return Err(WorktreeError::GitCommand(format!( + "Failed to merge: {}", + stderr + ))); + } + + // Get the merge commit SHA + let output = Command::new("git") + .args(["rev-parse", "HEAD"]) + .current_dir(target_repo) + .output() + .await?; + + let commit_sha = if output.status.success() { + String::from_utf8_lossy(&output.stdout).trim().to_string() + } else { + "unknown".to_string() + }; + + tracing::info!( + commit_sha = %commit_sha, + "Merge completed successfully" + ); + + Ok(commit_sha) + } + + /// Create a GitHub pull request using the gh CLI. + /// + /// This pushes the branch first, then creates a PR. + pub async fn create_pull_request( + &self, + worktree_path: &Path, + target_repo: &Path, + source_branch: &str, + target_branch: &str, + title: &str, + body: &str, + ) -> Result<String, WorktreeError> { + tracing::info!( + worktree = %worktree_path.display(), + target_repo = %target_repo.display(), + source_branch = %source_branch, + target_branch = %target_branch, + title = %title, + "Creating pull request" + ); + + // First, push the branch to the target repo's remote + // For PRs, we need to push to origin (the GitHub remote) + + // Get the worktree's current branch + let output = Command::new("git") + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .current_dir(worktree_path) + .output() + .await?; + + let current_branch = if output.status.success() { + String::from_utf8_lossy(&output.stdout).trim().to_string() + } else { + source_branch.to_string() + }; + + // Push to the target repo's origin + // First, check if target_repo has an origin remote + let output = Command::new("git") + .args(["remote", "get-url", "origin"]) + .current_dir(target_repo) + .output() + .await?; + + if !output.status.success() { + return Err(WorktreeError::GitCommand( + "Target repository has no origin remote configured".to_string(), + )); + } + + let origin_url = String::from_utf8_lossy(&output.stdout).trim().to_string(); + + // Push the branch from worktree to the remote + // First add the remote to worktree + let _ = Command::new("git") + .args(["remote", "remove", "pr-origin"]) + .current_dir(worktree_path) + .output() + .await; + + let output = Command::new("git") + .args(["remote", "add", "pr-origin", &origin_url]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to add remote: {}", + stderr + ))); + } + + // Push to the remote + let output = Command::new("git") + .args(["push", "-u", "pr-origin", &format!("{}:{}", current_branch, source_branch)]) + .current_dir(worktree_path) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to push branch: {}", + stderr + ))); + } + + // Create PR using gh CLI in the target repo + let output = Command::new("gh") + .args([ + "pr", + "create", + "--title", title, + "--body", body, + "--head", source_branch, + "--base", target_branch, + ]) + .current_dir(target_repo) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::GitCommand(format!( + "Failed to create PR: {}", + stderr + ))); + } + + // The gh CLI outputs the PR URL + let pr_url = String::from_utf8_lossy(&output.stdout).trim().to_string(); + + tracing::info!( + pr_url = %pr_url, + "Pull request created successfully" + ); + + Ok(pr_url) + } + + /// Clone/copy the worktree contents to a target directory. + /// + /// This creates a new git repository at the target path with the same contents + /// as the worktree. Returns (success, message). + pub async fn clone_worktree_to_directory( + &self, + worktree_path: &Path, + target_dir: &Path, + ) -> Result<String, WorktreeError> { + tracing::info!( + worktree = %worktree_path.display(), + target = %target_dir.display(), + "Cloning worktree to target directory" + ); + + // Check if target directory already exists + if target_dir.exists() { + return Err(WorktreeError::AlreadyExists(format!( + "Target directory already exists: {}", + target_dir.display() + ))); + } + + // Get parent directory to ensure it exists + if let Some(parent) = target_dir.parent() { + if !parent.exists() { + tokio::fs::create_dir_all(parent).await?; + } + } + + // Use git clone --local to efficiently copy the repository + // This is more efficient than cp -r for git repos + let output = Command::new("git") + .args([ + "clone", + "--local", + "--no-hardlinks", + &worktree_path.to_string_lossy(), + &target_dir.to_string_lossy(), + ]) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(WorktreeError::CloneFailed(format!( + "Failed to clone worktree: {}", + stderr + ))); + } + + // Remove the 'origin' remote that points back to the worktree + let _ = Command::new("git") + .args(["remote", "remove", "origin"]) + .current_dir(target_dir) + .output() + .await; + + tracing::info!( + target = %target_dir.display(), + "Worktree cloned successfully" + ); + + Ok(format!("Cloned to {}", target_dir.display())) + } + + /// Check if a target directory exists. + pub async fn target_directory_exists(&self, target_dir: &Path) -> bool { + target_dir.exists() + } +} + +/// Check if repo_source is a "new repo" request. +/// +/// Accepts `new://` or `new://project-name` to create a fresh git repository. +pub fn is_new_repo_request(source: &str) -> bool { + source == "new" || source == "new://" || source.starts_with("new://") +} + +/// Extract optional project name from new:// URL. +fn extract_new_repo_name(source: &str) -> Option<&str> { + source.strip_prefix("new://").filter(|s| !s.is_empty()) +} + +/// Extract repository name from URL. +fn extract_repo_name(url: &str) -> String { + // Handle various URL formats: + // https://github.com/user/repo.git -> repo + // git@github.com:user/repo.git -> repo + // https://github.com/user/repo -> repo + + let url = url.trim_end_matches('/'); + let url = url.trim_end_matches(".git"); + + url.rsplit('/') + .next() + .or_else(|| url.rsplit(':').next()) + .unwrap_or("repo") + .to_string() +} + +/// Create a short UUID string for directory naming. +pub fn short_uuid(id: Uuid) -> String { + id.to_string()[..8].to_string() +} + +/// Expand tilde (~) in path to home directory. +pub fn expand_tilde(path: &str) -> PathBuf { + if let Some(rest) = path.strip_prefix("~/") { + if let Some(home) = dirs::home_dir() { + return home.join(rest); + } + } else if path == "~" { + if let Some(home) = dirs::home_dir() { + return home; + } + } + PathBuf::from(path) +} + +/// Sanitize a name for use in directory/branch names. +pub fn sanitize_name(name: &str) -> String { + name.chars() + .map(|c| { + if c.is_alphanumeric() || c == '-' || c == '_' { + c.to_ascii_lowercase() + } else { + '-' + } + }) + .collect::<String>() + .chars() + .take(50) // Limit length + .collect() +} + +#[cfg(test)] +mod tests { + use crate::daemon::*; + + #[test] + fn test_extract_repo_name() { + assert_eq!( + extract_repo_name("https://github.com/user/repo.git"), + "repo" + ); + assert_eq!( + extract_repo_name("https://github.com/user/repo"), + "repo" + ); + assert_eq!( + extract_repo_name("git@github.com:user/repo.git"), + "repo" + ); + } + + #[test] + fn test_sanitize_name() { + assert_eq!(sanitize_name("Hello World!"), "hello-world-"); + assert_eq!(sanitize_name("test_name-123"), "test_name-123"); + assert_eq!(sanitize_name("A".repeat(100).as_str()).len(), 50); + } + + #[test] + fn test_short_uuid() { + let id = Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap(); + assert_eq!(short_uuid(id), "550e8400"); + } +} diff --git a/makima/src/daemon/worktree/mod.rs b/makima/src/daemon/worktree/mod.rs new file mode 100644 index 0000000..eb9f031 --- /dev/null +++ b/makima/src/daemon/worktree/mod.rs @@ -0,0 +1,11 @@ +//! Git worktree management for task isolation. +//! +//! Each task gets a unique git worktree with its own branch, +//! providing isolation without the overhead of Docker containers. + +mod manager; + +pub use manager::{ + expand_tilde, is_new_repo_request, sanitize_name, short_uuid, ConflictResolution, MergeState, + TaskBranchInfo, WorktreeError, WorktreeInfo, WorktreeManager, +}; diff --git a/makima/src/daemon/ws/client.rs b/makima/src/daemon/ws/client.rs new file mode 100644 index 0000000..67594a2 --- /dev/null +++ b/makima/src/daemon/ws/client.rs @@ -0,0 +1,290 @@ +//! WebSocket client for connecting to the makima server. + +use std::sync::Arc; +use std::time::Duration; + +use backoff::backoff::Backoff; +use backoff::ExponentialBackoff; +use futures::{SinkExt, StreamExt}; +use tokio::sync::{mpsc, RwLock}; +use tokio_tungstenite::{connect_async, tungstenite::{client::IntoClientRequest, Message}}; +use uuid::Uuid; + +use super::protocol::{DaemonCommand, DaemonMessage}; +use crate::daemon::config::ServerConfig; +use crate::daemon::error::{DaemonError, Result}; + +/// WebSocket client state. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectionState { + /// Not connected to server. + Disconnected, + /// Currently connecting. + Connecting, + /// Connected and authenticated. + Connected, + /// Connection failed, will retry. + Reconnecting, + /// Permanently failed (e.g., auth failure). + Failed, +} + +/// WebSocket client for daemon-server communication. +pub struct WsClient { + config: ServerConfig, + machine_id: String, + hostname: String, + max_concurrent_tasks: i32, + state: Arc<RwLock<ConnectionState>>, + daemon_id: Arc<RwLock<Option<Uuid>>>, + /// Channel to receive messages to send to server. + outgoing_rx: mpsc::Receiver<DaemonMessage>, + /// Sender for outgoing messages (clone this to send messages). + outgoing_tx: mpsc::Sender<DaemonMessage>, + /// Channel to send received commands to the task manager. + incoming_tx: mpsc::Sender<DaemonCommand>, +} + +impl WsClient { + /// Create a new WebSocket client. + pub fn new( + config: ServerConfig, + machine_id: String, + hostname: String, + max_concurrent_tasks: i32, + incoming_tx: mpsc::Sender<DaemonCommand>, + ) -> Self { + let (outgoing_tx, outgoing_rx) = mpsc::channel(256); + + Self { + config, + machine_id, + hostname, + max_concurrent_tasks, + state: Arc::new(RwLock::new(ConnectionState::Disconnected)), + daemon_id: Arc::new(RwLock::new(None)), + outgoing_rx, + outgoing_tx, + incoming_tx, + } + } + + /// Get a sender for outgoing messages. + pub fn sender(&self) -> mpsc::Sender<DaemonMessage> { + self.outgoing_tx.clone() + } + + /// Get current connection state. + pub async fn state(&self) -> ConnectionState { + *self.state.read().await + } + + /// Get daemon ID if authenticated. + pub async fn daemon_id(&self) -> Option<Uuid> { + *self.daemon_id.read().await + } + + /// Run the WebSocket client with automatic reconnection. + pub async fn run(&mut self) -> Result<()> { + let mut backoff = ExponentialBackoff { + initial_interval: Duration::from_secs(self.config.reconnect_interval_secs), + max_interval: Duration::from_secs(60), + max_elapsed_time: if self.config.max_reconnect_attempts > 0 { + Some(Duration::from_secs( + self.config.reconnect_interval_secs * self.config.max_reconnect_attempts as u64 * 10, + )) + } else { + None // Infinite retries + }, + ..Default::default() + }; + + loop { + *self.state.write().await = ConnectionState::Connecting; + tracing::info!("Connecting to server: {}", self.config.url); + + match self.connect_and_run().await { + Ok(()) => { + // Clean shutdown + tracing::info!("WebSocket connection closed cleanly"); + break; + } + Err(DaemonError::AuthFailed(msg)) => { + tracing::error!("Authentication failed: {}", msg); + *self.state.write().await = ConnectionState::Failed; + return Err(DaemonError::AuthFailed(msg)); + } + Err(e) => { + tracing::warn!("Connection error: {}", e); + *self.state.write().await = ConnectionState::Reconnecting; + + if let Some(delay) = backoff.next_backoff() { + tracing::info!("Reconnecting in {:?}...", delay); + tokio::time::sleep(delay).await; + } else { + tracing::error!("Max reconnection attempts reached"); + *self.state.write().await = ConnectionState::Failed; + return Err(DaemonError::ConnectionLost); + } + } + } + } + + Ok(()) + } + + /// Connect to server and run the message loop. + async fn connect_and_run(&mut self) -> Result<()> { + // Build WebSocket URL + let ws_url = format!("{}/api/v1/mesh/daemons/connect", self.config.url); + tracing::debug!("Connecting to WebSocket: {}", ws_url); + + // Build request with API key header + let mut request = ws_url.into_client_request()?; + request.headers_mut().insert( + "x-makima-api-key", + self.config.api_key.parse().map_err(|_| { + DaemonError::AuthFailed("Invalid API key format".into()) + })?, + ); + + // Connect with API key in headers + let (ws_stream, _response) = connect_async(request).await?; + let (mut write, mut read) = ws_stream.split(); + + // Send daemon info after connection (server authenticated us via header) + let info_msg = DaemonMessage::authenticate( + &self.config.api_key, + &self.machine_id, + &self.hostname, + self.max_concurrent_tasks, + ); + let info_json = serde_json::to_string(&info_msg)?; + write.send(Message::Text(info_json)).await?; + + // Wait for authentication response + let auth_response = read + .next() + .await + .ok_or(DaemonError::ConnectionLost)??; + + let auth_text = match auth_response { + Message::Text(text) => text, + Message::Close(_) => return Err(DaemonError::ConnectionLost), + _ => return Err(DaemonError::AuthFailed("Unexpected response type".into())), + }; + + let command: DaemonCommand = serde_json::from_str(&auth_text)?; + match command { + DaemonCommand::Authenticated { daemon_id } => { + tracing::info!("Authenticated with daemon ID: {}", daemon_id); + *self.daemon_id.write().await = Some(daemon_id); + *self.state.write().await = ConnectionState::Connected; + + // Send daemon directories info to server + let working_directory = std::env::current_dir() + .map(|p| p.to_string_lossy().to_string()) + .unwrap_or_else(|_| ".".to_string()); + let home_directory = dirs::home_dir() + .map(|h| h.join(".makima").join("home")) + .unwrap_or_else(|| std::path::PathBuf::from("~/.makima/home")); + // Create home directory if it doesn't exist + if let Err(e) = std::fs::create_dir_all(&home_directory) { + tracing::warn!("Failed to create home directory {:?}: {}", home_directory, e); + } + let home_directory_str = home_directory.to_string_lossy().to_string(); + let worktrees_directory = dirs::home_dir() + .map(|h| h.join(".makima").join("worktrees").to_string_lossy().to_string()) + .unwrap_or_else(|| "~/.makima/worktrees".to_string()); + + let dirs_msg = DaemonMessage::DaemonDirectories { + working_directory, + home_directory: home_directory_str, + worktrees_directory, + }; + let dirs_json = serde_json::to_string(&dirs_msg)?; + write.send(Message::Text(dirs_json)).await?; + tracing::info!("Sent daemon directories info to server"); + } + DaemonCommand::Error { code, message } => { + return Err(DaemonError::AuthFailed(format!("{}: {}", code, message))); + } + _ => { + return Err(DaemonError::AuthFailed( + "Unexpected response to authentication".into(), + )); + } + } + + // Start main message loop + let heartbeat_interval = Duration::from_secs(self.config.heartbeat_interval_secs); + let mut heartbeat_timer = tokio::time::interval(heartbeat_interval); + + loop { + tokio::select! { + // Handle incoming server commands + msg = read.next() => { + match msg { + Some(Ok(Message::Text(text))) => { + tracing::info!("Received WebSocket message: {} bytes", text.len()); + match serde_json::from_str::<DaemonCommand>(&text) { + Ok(command) => { + tracing::info!("Parsed command: {:?}", command); + tracing::info!("Sending command to task manager channel..."); + if self.incoming_tx.send(command).await.is_err() { + tracing::warn!("Command receiver dropped, shutting down"); + break; + } + tracing::info!("Command sent to task manager successfully"); + } + Err(e) => { + tracing::warn!("Failed to parse server message: {}", e); + tracing::debug!("Raw message: {}", text); + } + } + } + Some(Ok(Message::Ping(data))) => { + write.send(Message::Pong(data)).await?; + } + Some(Ok(Message::Close(_))) | None => { + tracing::info!("Server closed connection"); + return Err(DaemonError::ConnectionLost); + } + Some(Err(e)) => { + tracing::warn!("WebSocket error: {}", e); + return Err(e.into()); + } + _ => {} + } + } + + // Handle outgoing messages + msg = self.outgoing_rx.recv() => { + match msg { + Some(message) => { + let json = serde_json::to_string(&message)?; + tracing::trace!("Sending message: {}", json); + write.send(Message::Text(json)).await?; + } + None => { + // Sender dropped, shutdown + tracing::info!("Outgoing channel closed, shutting down"); + break; + } + } + } + + // Send heartbeat + _ = heartbeat_timer.tick() => { + // Get active task IDs from task manager + // For now, send empty list - will be connected to task manager + let heartbeat = DaemonMessage::heartbeat(vec![]); + let json = serde_json::to_string(&heartbeat)?; + write.send(Message::Text(json)).await?; + } + } + } + + Ok(()) + } +} diff --git a/makima/src/daemon/ws/mod.rs b/makima/src/daemon/ws/mod.rs new file mode 100644 index 0000000..5a0e9d1 --- /dev/null +++ b/makima/src/daemon/ws/mod.rs @@ -0,0 +1,7 @@ +//! WebSocket client and protocol types for daemon-server communication. + +pub mod client; +pub mod protocol; + +pub use client::{ConnectionState, WsClient}; +pub use protocol::{BranchInfo, DaemonCommand, DaemonMessage}; diff --git a/makima/src/daemon/ws/protocol.rs b/makima/src/daemon/ws/protocol.rs new file mode 100644 index 0000000..e86a577 --- /dev/null +++ b/makima/src/daemon/ws/protocol.rs @@ -0,0 +1,658 @@ +//! Protocol types for daemon-server communication. +//! +//! These types mirror the server's protocol exactly for compatibility. + +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +/// Message from daemon to server. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum DaemonMessage { + /// Authentication request (first message required). + Authenticate { + #[serde(rename = "apiKey")] + api_key: String, + #[serde(rename = "machineId")] + machine_id: String, + hostname: String, + #[serde(rename = "maxConcurrentTasks")] + max_concurrent_tasks: i32, + }, + + /// Periodic heartbeat with current status. + Heartbeat { + #[serde(rename = "activeTasks")] + active_tasks: Vec<Uuid>, + }, + + /// Task output streaming (stdout/stderr from Claude Code). + TaskOutput { + #[serde(rename = "taskId")] + task_id: Uuid, + output: String, + #[serde(rename = "isPartial")] + is_partial: bool, + }, + + /// Task status change notification. + TaskStatusChange { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "oldStatus")] + old_status: String, + #[serde(rename = "newStatus")] + new_status: String, + }, + + /// Task progress update with summary. + TaskProgress { + #[serde(rename = "taskId")] + task_id: Uuid, + summary: String, + }, + + /// Task completion notification. + TaskComplete { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + error: Option<String>, + }, + + /// Register a tool key for orchestrator API access. + RegisterToolKey { + #[serde(rename = "taskId")] + task_id: Uuid, + /// The API key for this orchestrator to use when calling mesh endpoints. + key: String, + }, + + /// Revoke a tool key when task completes. + RevokeToolKey { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Authentication required - OAuth token expired, provides login URL. + AuthenticationRequired { + /// Task ID that triggered the auth error (if any). + #[serde(rename = "taskId")] + task_id: Option<Uuid>, + /// OAuth login URL for remote authentication. + #[serde(rename = "loginUrl")] + login_url: String, + /// Hostname of the daemon requiring auth. + hostname: Option<String>, + }, + + // ========================================================================= + // Merge Response Messages (sent by daemon after processing merge commands) + // ========================================================================= + + /// Response to ListBranches command. + BranchList { + #[serde(rename = "taskId")] + task_id: Uuid, + branches: Vec<BranchInfo>, + }, + + /// Response to MergeStatus command. + MergeStatusResponse { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "inProgress")] + in_progress: bool, + #[serde(rename = "sourceBranch")] + source_branch: Option<String>, + #[serde(rename = "conflictedFiles")] + conflicted_files: Vec<String>, + }, + + /// Response to merge operations (MergeStart, MergeResolve, MergeCommit, MergeAbort). + MergeResult { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + message: String, + #[serde(rename = "commitSha")] + commit_sha: Option<String>, + /// Present only when conflicts occurred. + conflicts: Option<Vec<String>>, + }, + + /// Response to CheckMergeComplete command. + MergeCompleteCheck { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "canComplete")] + can_complete: bool, + #[serde(rename = "unmergedBranches")] + unmerged_branches: Vec<String>, + #[serde(rename = "mergedCount")] + merged_count: u32, + #[serde(rename = "skippedCount")] + skipped_count: u32, + }, + + // ========================================================================= + // Completion Action Response Messages + // ========================================================================= + + /// Response to RetryCompletionAction command. + CompletionActionResult { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + message: String, + /// PR URL if action was "pr" and successful. + #[serde(rename = "prUrl")] + pr_url: Option<String>, + }, + + /// Report daemon's available directories for task output. + DaemonDirectories { + /// Current working directory of the daemon. + #[serde(rename = "workingDirectory")] + working_directory: String, + /// Path to ~/.makima/home directory (for cloning completed work). + #[serde(rename = "homeDirectory")] + home_directory: String, + /// Path to worktrees directory (~/.makima/worktrees). + #[serde(rename = "worktreesDirectory")] + worktrees_directory: String, + }, + + /// Response to CloneWorktree command. + CloneWorktreeResult { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + message: String, + /// The path where the worktree was cloned. + #[serde(rename = "targetDir")] + target_dir: Option<String>, + }, + + /// Response to CheckTargetExists command. + CheckTargetExistsResult { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Whether the target directory exists. + exists: bool, + /// The path that was checked. + #[serde(rename = "targetDir")] + target_dir: String, + }, + + // ========================================================================= + // Contract File Response Messages + // ========================================================================= + + /// Response to ReadRepoFile command. + RepoFileContent { + /// Request ID from the original command. + #[serde(rename = "requestId")] + request_id: Uuid, + /// Path to the file that was read. + #[serde(rename = "filePath")] + file_path: String, + /// File content (None if error occurred). + content: Option<String>, + /// Whether the operation succeeded. + success: bool, + /// Error message if operation failed. + error: Option<String>, + }, + + // ========================================================================= + // Supervisor Git Response Messages + // ========================================================================= + + /// Response to CreateBranch command. + BranchCreated { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + #[serde(rename = "branchName")] + branch_name: String, + message: String, + }, + + /// Response to MergeTaskToTarget command. + MergeToTargetResult { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + message: String, + #[serde(rename = "commitSha")] + commit_sha: Option<String>, + conflicts: Option<Vec<String>>, + }, + + /// Response to CreatePR command. + PRCreated { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + message: String, + #[serde(rename = "prUrl")] + pr_url: Option<String>, + #[serde(rename = "prNumber")] + pr_number: Option<i32>, + }, + + /// Response to GetTaskDiff command. + TaskDiff { + #[serde(rename = "taskId")] + task_id: Uuid, + success: bool, + diff: Option<String>, + error: Option<String>, + }, +} + +/// Information about a branch (used in BranchList message). +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BranchInfo { + /// Full branch name. + pub name: String, + /// Task ID extracted from branch name (if parseable). + #[serde(rename = "taskId")] + pub task_id: Option<Uuid>, + /// Whether this branch has been merged. + #[serde(rename = "isMerged")] + pub is_merged: bool, + /// Short SHA of the last commit. + #[serde(rename = "lastCommit")] + pub last_commit: String, + /// Subject line of the last commit. + #[serde(rename = "lastCommitMessage")] + pub last_commit_message: String, +} + +/// Command from server to daemon. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum DaemonCommand { + /// Confirm successful authentication. + Authenticated { + #[serde(rename = "daemonId")] + daemon_id: Uuid, + }, + + /// Spawn a new task in a container. + SpawnTask { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Human-readable task name (used for commit messages). + #[serde(rename = "taskName")] + task_name: String, + plan: String, + #[serde(rename = "repoUrl")] + repo_url: Option<String>, + #[serde(rename = "baseBranch")] + base_branch: Option<String>, + /// Target branch to merge into (used for completion actions). + #[serde(rename = "targetBranch")] + target_branch: Option<String>, + /// Parent task ID if this is a subtask. + #[serde(rename = "parentTaskId")] + parent_task_id: Option<Uuid>, + /// Depth in task hierarchy (0=top-level, 1=subtask, 2=sub-subtask). + depth: i32, + /// Whether this task should run as an orchestrator (true if depth==0 and has subtasks). + #[serde(rename = "isOrchestrator")] + is_orchestrator: bool, + /// Path to user's local repository (outside ~/.makima) for completion actions. + #[serde(rename = "targetRepoPath")] + target_repo_path: Option<String>, + /// Action on completion: "none", "branch", "merge", "pr". + #[serde(rename = "completionAction")] + completion_action: Option<String>, + /// Task ID to continue from (copy worktree from this task). + #[serde(rename = "continueFromTaskId")] + continue_from_task_id: Option<Uuid>, + /// Files to copy from parent task's worktree. + #[serde(rename = "copyFiles")] + copy_files: Option<Vec<String>>, + /// Contract ID if this task is associated with a contract. + #[serde(rename = "contractId")] + contract_id: Option<Uuid>, + /// Whether this task is a supervisor (long-running contract orchestrator). + #[serde(rename = "isSupervisor", default)] + is_supervisor: bool, + }, + + /// Pause a running task. + PauseTask { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Resume a paused task. + ResumeTask { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Interrupt a task (gracefully or forced). + InterruptTask { + #[serde(rename = "taskId")] + task_id: Uuid, + graceful: bool, + }, + + /// Send a message to a running task. + SendMessage { + #[serde(rename = "taskId")] + task_id: Uuid, + message: String, + }, + + /// Inject context about sibling task progress. + InjectSiblingContext { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "siblingTaskId")] + sibling_task_id: Uuid, + #[serde(rename = "siblingName")] + sibling_name: String, + #[serde(rename = "siblingStatus")] + sibling_status: String, + #[serde(rename = "progressSummary")] + progress_summary: Option<String>, + #[serde(rename = "changedFiles")] + changed_files: Vec<String>, + }, + + // ========================================================================= + // Merge Commands (for orchestrators to merge subtask branches) + // ========================================================================= + + /// List all subtask branches for a task. + ListBranches { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Start merging a subtask branch. + MergeStart { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "sourceBranch")] + source_branch: String, + }, + + /// Get current merge status. + MergeStatus { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Resolve a merge conflict. + MergeResolve { + #[serde(rename = "taskId")] + task_id: Uuid, + file: String, + /// "ours" or "theirs" + strategy: String, + }, + + /// Commit the current merge. + MergeCommit { + #[serde(rename = "taskId")] + task_id: Uuid, + message: String, + }, + + /// Abort the current merge. + MergeAbort { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Skip merging a subtask branch (mark as intentionally not merged). + MergeSkip { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "subtaskId")] + subtask_id: Uuid, + reason: String, + }, + + /// Check if all subtask branches have been merged or skipped (completion gate). + CheckMergeComplete { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + // ========================================================================= + // Completion Action Commands + // ========================================================================= + + /// Retry a completion action for a completed task. + RetryCompletionAction { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Human-readable task name (used for commit messages). + #[serde(rename = "taskName")] + task_name: String, + /// The action to execute: "branch", "merge", or "pr". + action: String, + /// Path to the target repository. + #[serde(rename = "targetRepoPath")] + target_repo_path: String, + /// Target branch to merge into (for merge/pr actions). + #[serde(rename = "targetBranch")] + target_branch: Option<String>, + }, + + /// Clone worktree to a target directory. + CloneWorktree { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Path to the target directory. + #[serde(rename = "targetDir")] + target_dir: String, + }, + + /// Check if a target directory exists. + CheckTargetExists { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Path to check. + #[serde(rename = "targetDir")] + target_dir: String, + }, + + // ========================================================================= + // Contract File Commands + // ========================================================================= + + /// Read a file from a repository linked to a contract. + ReadRepoFile { + /// Request ID for correlating response. + #[serde(rename = "requestId")] + request_id: Uuid, + /// Contract ID (used for logging/context). + #[serde(rename = "contractId")] + contract_id: Uuid, + /// Path to the file within the repository. + #[serde(rename = "filePath")] + file_path: String, + /// Full repository path on daemon's filesystem. + #[serde(rename = "repoPath")] + repo_path: String, + }, + + // ========================================================================= + // Supervisor Git Commands + // ========================================================================= + + /// Create a new branch in the supervisor's worktree. + CreateBranch { + #[serde(rename = "taskId")] + task_id: Uuid, + #[serde(rename = "branchName")] + branch_name: String, + /// Optional reference to create branch from (task_id or SHA). + #[serde(rename = "fromRef")] + from_ref: Option<String>, + }, + + /// Merge a task's changes to a target branch. + MergeTaskToTarget { + #[serde(rename = "taskId")] + task_id: Uuid, + /// Target branch to merge into (default: task's base branch). + #[serde(rename = "targetBranch")] + target_branch: Option<String>, + /// Whether to squash commits. + squash: bool, + }, + + /// Create a pull request for a task's changes. + CreatePR { + #[serde(rename = "taskId")] + task_id: Uuid, + title: String, + body: Option<String>, + /// Base branch for the PR (default: main). + #[serde(rename = "baseBranch")] + base_branch: String, + }, + + /// Get the diff for a task's changes. + GetTaskDiff { + #[serde(rename = "taskId")] + task_id: Uuid, + }, + + /// Error response. + Error { + code: String, + message: String, + }, +} + +impl DaemonMessage { + /// Create an authentication message. + pub fn authenticate( + api_key: &str, + machine_id: &str, + hostname: &str, + max_concurrent_tasks: i32, + ) -> Self { + Self::Authenticate { + api_key: api_key.to_string(), + machine_id: machine_id.to_string(), + hostname: hostname.to_string(), + max_concurrent_tasks, + } + } + + /// Create a heartbeat message. + pub fn heartbeat(active_tasks: Vec<Uuid>) -> Self { + Self::Heartbeat { active_tasks } + } + + /// Create a task output message. + pub fn task_output(task_id: Uuid, output: String, is_partial: bool) -> Self { + Self::TaskOutput { + task_id, + output, + is_partial, + } + } + + /// Create a task status change message. + pub fn task_status_change(task_id: Uuid, old_status: &str, new_status: &str) -> Self { + Self::TaskStatusChange { + task_id, + old_status: old_status.to_string(), + new_status: new_status.to_string(), + } + } + + /// Create a task progress message. + pub fn task_progress(task_id: Uuid, summary: String) -> Self { + Self::TaskProgress { task_id, summary } + } + + /// Create a task complete message. + pub fn task_complete(task_id: Uuid, success: bool, error: Option<String>) -> Self { + Self::TaskComplete { + task_id, + success, + error, + } + } + + /// Create a register tool key message. + pub fn register_tool_key(task_id: Uuid, key: String) -> Self { + Self::RegisterToolKey { task_id, key } + } + + /// Create a revoke tool key message. + pub fn revoke_tool_key(task_id: Uuid) -> Self { + Self::RevokeToolKey { task_id } + } +} + +#[cfg(test)] +mod tests { + use crate::daemon::*; + + #[test] + fn test_daemon_message_serialization() { + let msg = DaemonMessage::authenticate("key123", "machine-abc", "worker-1", 4); + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains("\"type\":\"authenticate\"")); + assert!(json.contains("\"apiKey\":\"key123\"")); + assert!(json.contains("\"machineId\":\"machine-abc\"")); + } + + #[test] + fn test_daemon_command_deserialization() { + let json = r#"{"type":"spawnTask","taskId":"550e8400-e29b-41d4-a716-446655440000","plan":"Build the feature","repoUrl":"https://github.com/test/repo","baseBranch":"main","parentTaskId":null,"depth":0,"isOrchestrator":false}"#; + let cmd: DaemonCommand = serde_json::from_str(json).unwrap(); + match cmd { + DaemonCommand::SpawnTask { + plan, + repo_url, + base_branch, + parent_task_id, + depth, + is_orchestrator, + .. + } => { + assert_eq!(plan, "Build the feature"); + assert_eq!(repo_url, Some("https://github.com/test/repo".to_string())); + assert_eq!(base_branch, Some("main".to_string())); + assert_eq!(parent_task_id, None); + assert_eq!(depth, 0); + assert!(!is_orchestrator); + } + _ => panic!("Expected SpawnTask"), + } + } + + #[test] + fn test_orchestrator_spawn_deserialization() { + let json = r#"{"type":"spawnTask","taskId":"550e8400-e29b-41d4-a716-446655440000","plan":"Coordinate subtasks","repoUrl":"https://github.com/test/repo","baseBranch":"main","parentTaskId":null,"depth":0,"isOrchestrator":true}"#; + let cmd: DaemonCommand = serde_json::from_str(json).unwrap(); + match cmd { + DaemonCommand::SpawnTask { + is_orchestrator, + depth, + .. + } => { + assert!(is_orchestrator); + assert_eq!(depth, 0); + } + _ => panic!("Expected SpawnTask"), + } + } +} |
