summaryrefslogtreecommitdiff
path: root/makima/src/server/handlers/mesh.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-21 17:31:46 +0000
committerGitHub <noreply@github.com>2026-01-21 17:31:46 +0000
commit94e5604e770d6589f786ea71e51738e21492f301 (patch)
tree6c9b0f32a8d77464bc1a5131ba0828d252851abc /makima/src/server/handlers/mesh.rs
parentda246c4c4e23c9ad976705f9a3fa80e0d75b4425 (diff)
downloadsoryu-94e5604e770d6589f786ea71e51738e21492f301.tar.gz
soryu-94e5604e770d6589f786ea71e51738e21492f301.zip
Add task branching feature (#15)
Diffstat (limited to 'makima/src/server/handlers/mesh.rs')
-rw-r--r--makima/src/server/handlers/mesh.rs269
1 files changed, 263 insertions, 6 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 275dc3c..99c3d9d 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -9,9 +9,10 @@ use axum::{
use uuid::Uuid;
use crate::db::models::{
- CreateTaskRequest, DaemonDirectory, DaemonDirectoriesResponse, DaemonListResponse,
- SendMessageRequest, Task, TaskEventListResponse, TaskListResponse, TaskOutputEntry,
- TaskOutputResponse, TaskWithSubtasks, UpdateTaskRequest,
+ BranchTaskRequest, BranchTaskResponse, CreateTaskRequest, DaemonDirectory,
+ DaemonDirectoriesResponse, DaemonListResponse, SendMessageRequest, Task,
+ TaskEventListResponse, TaskListResponse, TaskOutputEntry, TaskOutputResponse,
+ TaskWithSubtasks, UpdateTaskRequest,
};
use crate::db::repository::{self, RepositoryError};
use crate::server::auth::Authenticated;
@@ -2196,7 +2197,7 @@ pub async fn reassign_task(
// Create a NEW task with the conversation context
let create_req = CreateTaskRequest {
- contract_id: task.contract_id.unwrap_or(Uuid::nil()),
+ contract_id: task.contract_id,
name: format!("{} (resumed)", task.name),
description: task.description.clone(),
plan: updated_plan.clone(),
@@ -2212,6 +2213,8 @@ pub async fn reassign_task(
continue_from_task_id: Some(id), // Continue from the old task's worktree if possible
copy_files: None,
checkpoint_sha: task.last_checkpoint_sha.clone(),
+ branched_from_task_id: None,
+ conversation_history: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
@@ -2913,7 +2916,7 @@ pub async fn fork_task(
// Create the new forked task
let create_req = CreateTaskRequest {
- contract_id: task.contract_id.unwrap_or(Uuid::nil()),
+ contract_id: task.contract_id,
name: req.new_task_name.clone(),
description: task.description.clone(),
plan: req.new_task_plan.clone(),
@@ -2929,6 +2932,8 @@ pub async fn fork_task(
continue_from_task_id: None,
copy_files: None,
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
+ branched_from_task_id: None,
+ conversation_history: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
@@ -3068,7 +3073,7 @@ pub async fn resume_from_checkpoint(
});
let create_req = CreateTaskRequest {
- contract_id: task.contract_id.unwrap_or(Uuid::nil()),
+ contract_id: task.contract_id,
name: task_name,
description: task.description.clone(),
plan: req.plan,
@@ -3084,6 +3089,8 @@ pub async fn resume_from_checkpoint(
continue_from_task_id: Some(task_id), // Copy worktree from original task
copy_files: None,
checkpoint_sha: Some(checkpoint.commit_sha.clone()),
+ branched_from_task_id: None,
+ conversation_history: None,
};
let new_task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
@@ -3304,3 +3311,253 @@ pub async fn branch_from_checkpoint(
)
.into_response()
}
+
+// =============================================================================
+// Task Branching
+// =============================================================================
+
+/// Branch a task, creating a new anonymous task from an existing task's conversation.
+///
+/// Creates a new task that:
+/// - Has no contract_id (anonymous task)
+/// - Has branched_from_task_id pointing to the source task
+/// - Optionally includes conversation history from the source task
+/// - Can be started on an available daemon
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/branch",
+ params(
+ ("id" = Uuid, Path, description = "Source task ID to branch from")
+ ),
+ request_body = BranchTaskRequest,
+ responses(
+ (status = 201, description = "Task branched successfully", body = BranchTaskResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Source task not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn branch_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(source_task_id): Path<Uuid>,
+ Json(req): Json<BranchTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the source task (must belong to the same owner)
+ let source_task = match repository::get_task_for_owner(pool, source_task_id, auth.owner_id).await {
+ Ok(Some(task)) => task,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Source task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get source task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Build conversation history if requested
+ let (conversation_history, message_count) = if req.include_conversation {
+ match repository::get_task_output(pool, source_task_id, Some(500)).await {
+ Ok(events) => {
+ let entries: Vec<TaskOutputEntry> = events
+ .into_iter()
+ .filter_map(TaskOutputEntry::from_task_event)
+ .collect();
+ let count = entries.len();
+
+ // Convert entries to a JSON array for conversation_history
+ let history_json = serde_json::to_value(&entries).unwrap_or(serde_json::Value::Null);
+ (Some(history_json), count)
+ }
+ Err(e) => {
+ tracing::warn!("Failed to get task output for branching: {}", e);
+ (None, 0)
+ }
+ }
+ } else {
+ (None, 0)
+ };
+
+ // Generate task name if not provided
+ let task_name = req.name.unwrap_or_else(|| {
+ format!("{} (branch)", source_task.name)
+ });
+
+ // Create the branched task (anonymous - no contract_id)
+ let create_req = CreateTaskRequest {
+ contract_id: None, // Anonymous task
+ name: task_name,
+ description: Some(format!("Branched from task: {}", source_task.name)),
+ plan: req.message,
+ parent_task_id: None,
+ is_supervisor: false,
+ priority: source_task.priority,
+ repository_url: source_task.repository_url.clone(),
+ base_branch: source_task.base_branch.clone(),
+ target_branch: None, // Branched tasks don't auto-merge
+ merge_mode: None,
+ target_repo_path: source_task.target_repo_path.clone(),
+ completion_action: Some("none".to_string()), // Don't auto-complete
+ continue_from_task_id: Some(source_task_id), // Continue from source task's worktree
+ copy_files: None,
+ checkpoint_sha: None,
+ branched_from_task_id: Some(source_task_id),
+ conversation_history,
+ };
+
+ let task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
+ Ok(task) => task,
+ Err(e) => {
+ tracing::error!("Failed to create branched task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Record history event for task branching
+ let _ = repository::record_history_event(
+ pool,
+ auth.owner_id,
+ None, // No contract for anonymous tasks
+ Some(task.id),
+ "task",
+ Some("branched"),
+ None,
+ serde_json::json!({
+ "name": &task.name,
+ "sourceTaskId": source_task_id,
+ "sourceTaskName": &source_task.name,
+ "messageCount": message_count,
+ }),
+ ).await;
+
+ // Try to find an available daemon to start the task
+ let daemon_id = state.daemon_connections
+ .iter()
+ .find(|d| d.value().owner_id == auth.owner_id)
+ .map(|d| d.value().id);
+
+ // If a daemon is available, start the task
+ if let Some(target_daemon_id) = daemon_id {
+ // Update task with daemon assignment
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(target_daemon_id),
+ ..Default::default()
+ };
+
+ if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, task.id, auth.owner_id, update_req).await {
+ // Send SpawnTask command to daemon
+ let command = DaemonCommand::SpawnTask {
+ task_id: task.id,
+ task_name: updated_task.name.clone(),
+ plan: updated_task.plan.clone(),
+ repo_url: updated_task.repository_url.clone(),
+ base_branch: updated_task.base_branch.clone(),
+ target_branch: updated_task.target_branch.clone(),
+ parent_task_id: None,
+ depth: 0,
+ is_orchestrator: false,
+ target_repo_path: updated_task.target_repo_path.clone(),
+ completion_action: updated_task.completion_action.clone(),
+ continue_from_task_id: updated_task.continue_from_task_id,
+ copy_files: None,
+ contract_id: None,
+ is_supervisor: false,
+ resume_session: message_count > 0, // Resume if we have conversation history
+ conversation_history: updated_task.conversation_state.clone(),
+ };
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ tracing::warn!(
+ task_id = %task.id,
+ daemon_id = %target_daemon_id,
+ error = %e,
+ "Failed to send SpawnTask command for branched task, task created but not started"
+ );
+ // Task was created but not started - return without daemon_id
+ return (
+ StatusCode::CREATED,
+ Json(BranchTaskResponse {
+ task,
+ message_count,
+ daemon_id: None,
+ }),
+ )
+ .into_response();
+ }
+
+ tracing::info!(
+ task_id = %task.id,
+ source_task_id = %source_task_id,
+ daemon_id = %target_daemon_id,
+ message_count = message_count,
+ "Branched task created and started"
+ );
+
+ // Broadcast task update notification
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: task.id,
+ owner_id: Some(auth.owner_id),
+ version: updated_task.version,
+ status: "starting".to_string(),
+ updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
+ updated_by: "system".to_string(),
+ });
+
+ return (
+ StatusCode::CREATED,
+ Json(BranchTaskResponse {
+ task: updated_task,
+ message_count,
+ daemon_id: Some(target_daemon_id),
+ }),
+ )
+ .into_response();
+ }
+ }
+
+ // No daemon available or failed to start - return task without daemon_id
+ tracing::info!(
+ task_id = %task.id,
+ source_task_id = %source_task_id,
+ message_count = message_count,
+ "Branched task created (no daemon available to start)"
+ );
+
+ (
+ StatusCode::CREATED,
+ Json(BranchTaskResponse {
+ task,
+ message_count,
+ daemon_id: None,
+ }),
+ )
+ .into_response()
+}