summaryrefslogtreecommitdiff
path: root/makima/src
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-21 16:11:09 +0000
committersoryu <soryu@soryu.co>2026-01-21 16:11:09 +0000
commit279888491d367ee6307fc4e2b5335355d2e13ba4 (patch)
treea4e49cd9990886c830f5a6022104d9f26945cb01 /makima/src
parent242626b9d2532f716d6b1543f55aa6b16173d1b8 (diff)
downloadsoryu-279888491d367ee6307fc4e2b5335355d2e13ba4.tar.gz
soryu-279888491d367ee6307fc4e2b5335355d2e13ba4.zip
Add branch_task endpoint for task branching
Implement POST /api/v1/mesh/tasks/{id}/branch endpoint that: - Creates a new anonymous task from an existing task's conversation - Sets branched_from_task_id to track the source task - Optionally includes conversation history for context continuation - Automatically starts on available daemon if one is connected - Uses continue_from_task_id to inherit worktree from source - Records history event for branching operation Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Diffstat (limited to 'makima/src')
-rw-r--r--makima/src/server/handlers/mesh.rs257
1 files changed, 254 insertions, 3 deletions
diff --git a/makima/src/server/handlers/mesh.rs b/makima/src/server/handlers/mesh.rs
index 275dc3c..b6eadf1 100644
--- a/makima/src/server/handlers/mesh.rs
+++ b/makima/src/server/handlers/mesh.rs
@@ -9,9 +9,10 @@ use axum::{
use uuid::Uuid;
use crate::db::models::{
- CreateTaskRequest, DaemonDirectory, DaemonDirectoriesResponse, DaemonListResponse,
- SendMessageRequest, Task, TaskEventListResponse, TaskListResponse, TaskOutputEntry,
- TaskOutputResponse, TaskWithSubtasks, UpdateTaskRequest,
+ BranchTaskRequest, BranchTaskResponse, CreateTaskRequest, DaemonDirectory,
+ DaemonDirectoriesResponse, DaemonListResponse, SendMessageRequest, Task,
+ TaskEventListResponse, TaskListResponse, TaskOutputEntry, TaskOutputResponse,
+ TaskWithSubtasks, UpdateTaskRequest,
};
use crate::db::repository::{self, RepositoryError};
use crate::server::auth::Authenticated;
@@ -3304,3 +3305,253 @@ pub async fn branch_from_checkpoint(
)
.into_response()
}
+
+// =============================================================================
+// Task Branching
+// =============================================================================
+
+/// Branch a task, creating a new anonymous task from an existing task's conversation.
+///
+/// Creates a new task that:
+/// - Has no contract_id (anonymous task)
+/// - Has branched_from_task_id pointing to the source task
+/// - Optionally includes conversation history from the source task
+/// - Can be started on an available daemon
+#[utoipa::path(
+ post,
+ path = "/api/v1/mesh/tasks/{id}/branch",
+ params(
+ ("id" = Uuid, Path, description = "Source task ID to branch from")
+ ),
+ request_body = BranchTaskRequest,
+ responses(
+ (status = 201, description = "Task branched successfully", body = BranchTaskResponse),
+ (status = 400, description = "Invalid request", body = ApiError),
+ (status = 401, description = "Unauthorized", body = ApiError),
+ (status = 404, description = "Source task not found", body = ApiError),
+ (status = 503, description = "Database not configured", body = ApiError),
+ (status = 500, description = "Internal server error", body = ApiError),
+ ),
+ security(
+ ("bearer_auth" = []),
+ ("api_key" = [])
+ ),
+ tag = "Mesh"
+)]
+pub async fn branch_task(
+ State(state): State<SharedState>,
+ Authenticated(auth): Authenticated,
+ Path(source_task_id): Path<Uuid>,
+ Json(req): Json<BranchTaskRequest>,
+) -> impl IntoResponse {
+ let Some(ref pool) = state.db_pool else {
+ return (
+ StatusCode::SERVICE_UNAVAILABLE,
+ Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")),
+ )
+ .into_response();
+ };
+
+ // Get the source task (must belong to the same owner)
+ let source_task = match repository::get_task_for_owner(pool, source_task_id, auth.owner_id).await {
+ Ok(Some(task)) => task,
+ Ok(None) => {
+ return (
+ StatusCode::NOT_FOUND,
+ Json(ApiError::new("NOT_FOUND", "Source task not found")),
+ )
+ .into_response();
+ }
+ Err(e) => {
+ tracing::error!("Failed to get source task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Build conversation history if requested
+ let (conversation_history, message_count) = if req.include_conversation {
+ match repository::get_task_output(pool, source_task_id, Some(500)).await {
+ Ok(events) => {
+ let entries: Vec<TaskOutputEntry> = events
+ .into_iter()
+ .filter_map(TaskOutputEntry::from_task_event)
+ .collect();
+ let count = entries.len();
+
+ // Convert entries to a JSON array for conversation_history
+ let history_json = serde_json::to_value(&entries).unwrap_or(serde_json::Value::Null);
+ (Some(history_json), count)
+ }
+ Err(e) => {
+ tracing::warn!("Failed to get task output for branching: {}", e);
+ (None, 0)
+ }
+ }
+ } else {
+ (None, 0)
+ };
+
+ // Generate task name if not provided
+ let task_name = req.name.unwrap_or_else(|| {
+ format!("{} (branch)", source_task.name)
+ });
+
+ // Create the branched task (anonymous - no contract_id)
+ let create_req = CreateTaskRequest {
+ contract_id: None, // Anonymous task
+ name: task_name,
+ description: Some(format!("Branched from task: {}", source_task.name)),
+ plan: req.message,
+ parent_task_id: None,
+ is_supervisor: false,
+ priority: source_task.priority,
+ repository_url: source_task.repository_url.clone(),
+ base_branch: source_task.base_branch.clone(),
+ target_branch: None, // Branched tasks don't auto-merge
+ merge_mode: None,
+ target_repo_path: source_task.target_repo_path.clone(),
+ completion_action: Some("none".to_string()), // Don't auto-complete
+ continue_from_task_id: Some(source_task_id), // Continue from source task's worktree
+ copy_files: None,
+ checkpoint_sha: None,
+ branched_from_task_id: Some(source_task_id),
+ conversation_history,
+ };
+
+ let task = match repository::create_task_for_owner(pool, auth.owner_id, create_req).await {
+ Ok(task) => task,
+ Err(e) => {
+ tracing::error!("Failed to create branched task: {}", e);
+ return (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ Json(ApiError::new("DB_ERROR", e.to_string())),
+ )
+ .into_response();
+ }
+ };
+
+ // Record history event for task branching
+ let _ = repository::record_history_event(
+ pool,
+ auth.owner_id,
+ None, // No contract for anonymous tasks
+ Some(task.id),
+ "task",
+ Some("branched"),
+ None,
+ serde_json::json!({
+ "name": &task.name,
+ "sourceTaskId": source_task_id,
+ "sourceTaskName": &source_task.name,
+ "messageCount": message_count,
+ }),
+ ).await;
+
+ // Try to find an available daemon to start the task
+ let daemon_id = state.daemon_connections
+ .iter()
+ .find(|d| d.value().owner_id == auth.owner_id)
+ .map(|d| d.value().id);
+
+ // If a daemon is available, start the task
+ if let Some(target_daemon_id) = daemon_id {
+ // Update task with daemon assignment
+ let update_req = UpdateTaskRequest {
+ status: Some("starting".to_string()),
+ daemon_id: Some(target_daemon_id),
+ ..Default::default()
+ };
+
+ if let Ok(Some(updated_task)) = repository::update_task_for_owner(pool, task.id, auth.owner_id, update_req).await {
+ // Send SpawnTask command to daemon
+ let command = DaemonCommand::SpawnTask {
+ task_id: task.id,
+ task_name: updated_task.name.clone(),
+ plan: updated_task.plan.clone(),
+ repo_url: updated_task.repository_url.clone(),
+ base_branch: updated_task.base_branch.clone(),
+ target_branch: updated_task.target_branch.clone(),
+ parent_task_id: None,
+ depth: 0,
+ is_orchestrator: false,
+ target_repo_path: updated_task.target_repo_path.clone(),
+ completion_action: updated_task.completion_action.clone(),
+ continue_from_task_id: updated_task.continue_from_task_id,
+ copy_files: None,
+ contract_id: None,
+ is_supervisor: false,
+ resume_session: message_count > 0, // Resume if we have conversation history
+ conversation_history: updated_task.conversation_state.clone(),
+ };
+
+ if let Err(e) = state.send_daemon_command(target_daemon_id, command).await {
+ tracing::warn!(
+ task_id = %task.id,
+ daemon_id = %target_daemon_id,
+ error = %e,
+ "Failed to send SpawnTask command for branched task, task created but not started"
+ );
+ // Task was created but not started - return without daemon_id
+ return (
+ StatusCode::CREATED,
+ Json(BranchTaskResponse {
+ task,
+ message_count,
+ daemon_id: None,
+ }),
+ )
+ .into_response();
+ }
+
+ tracing::info!(
+ task_id = %task.id,
+ source_task_id = %source_task_id,
+ daemon_id = %target_daemon_id,
+ message_count = message_count,
+ "Branched task created and started"
+ );
+
+ // Broadcast task update notification
+ state.broadcast_task_update(TaskUpdateNotification {
+ task_id: task.id,
+ owner_id: Some(auth.owner_id),
+ version: updated_task.version,
+ status: "starting".to_string(),
+ updated_fields: vec!["status".to_string(), "daemon_id".to_string()],
+ updated_by: "system".to_string(),
+ });
+
+ return (
+ StatusCode::CREATED,
+ Json(BranchTaskResponse {
+ task: updated_task,
+ message_count,
+ daemon_id: Some(target_daemon_id),
+ }),
+ )
+ .into_response();
+ }
+ }
+
+ // No daemon available or failed to start - return task without daemon_id
+ tracing::info!(
+ task_id = %task.id,
+ source_task_id = %source_task_id,
+ message_count = message_count,
+ "Branched task created (no daemon available to start)"
+ );
+
+ (
+ StatusCode::CREATED,
+ Json(BranchTaskResponse {
+ task,
+ message_count,
+ daemon_id: None,
+ }),
+ )
+ .into_response()
+}