From 2efd88d3a76842deea263a8f90b1bcd433d3b5ce Mon Sep 17 00:00:00 2001 From: soryu Date: Fri, 13 Feb 2026 20:48:13 +0000 Subject: WIP: heartbeat checkpoint --- .../components/SupervisorQuestionNotification.tsx | 89 +++--- .../src/components/directives/DirectiveDetail.tsx | 31 +++ makima/frontend/src/hooks/useDirectives.ts | 9 +- makima/frontend/src/lib/api.ts | 23 +- makima/frontend/src/routes/directives.tsx | 3 +- makima/src/server/handlers/directives.rs | 298 +++++++++++++++++++++ makima/src/server/mod.rs | 2 + 7 files changed, 420 insertions(+), 35 deletions(-) diff --git a/makima/frontend/src/components/SupervisorQuestionNotification.tsx b/makima/frontend/src/components/SupervisorQuestionNotification.tsx index b1cbacc..1038b6b 100644 --- a/makima/frontend/src/components/SupervisorQuestionNotification.tsx +++ b/makima/frontend/src/components/SupervisorQuestionNotification.tsx @@ -19,42 +19,69 @@ export function SupervisorQuestionNotification() { navigate(`/mesh/${taskId}`); }; + const handleGoToDirective = (questionId: string, contractId: string) => { + dismissNotification(questionId); + // For directive questions, contractId holds the directive ID + navigate(`/directives/${contractId}`); + }; + return (
- {filteredQuestions.map((question) => ( -
- {/* Header */} -
-
- ? - - Task needs input - + {filteredQuestions.map((question) => { + const isDirective = question.questionType === "directive"; + return ( +
+ {/* Header */} +
+
+ ? + + {isDirective ? "Directive needs input" : "Task needs input"} + +
+
+ {isDirective && ( + + )} + +
- -
- {/* Question preview */} -
- {question.context && ( -
- {question.context} -
- )} -

- {question.question} -

+ {/* Question preview */} +
+ {question.context && ( +
+ {question.context} +
+ )} +

+ {question.question} +

+
-
- ))} + ); + })}
); } diff --git a/makima/frontend/src/components/directives/DirectiveDetail.tsx b/makima/frontend/src/components/directives/DirectiveDetail.tsx index 369cdaa..3b245f4 100644 --- a/makima/frontend/src/components/directives/DirectiveDetail.tsx +++ b/makima/frontend/src/components/directives/DirectiveDetail.tsx @@ -37,6 +37,7 @@ interface DirectiveDetailProps { onDelete: () => void; onRefresh: () => void; onCleanupTasks: () => void; + onToggleReconcileMode: (enabled: boolean) => void; } export function DirectiveDetail({ @@ -51,6 +52,7 @@ export function DirectiveDetail({ onDelete, onRefresh, onCleanupTasks, + onToggleReconcileMode, }: DirectiveDetailProps) { const [editingGoal, setEditingGoal] = useState(false); const [goalText, setGoalText] = useState(directive.goal); @@ -296,6 +298,35 @@ export function DirectiveDetail({ Delete
+ + {/* Reconcile Mode Toggle */} +
+ + + Reconcile Mode + + {directive.reconcileMode && ( + + Tasks pause on questions until you answer + + )} +
{/* Goal */} diff --git a/makima/frontend/src/hooks/useDirectives.ts b/makima/frontend/src/hooks/useDirectives.ts index e67733c..0a40338 100644 --- a/makima/frontend/src/hooks/useDirectives.ts +++ b/makima/frontend/src/hooks/useDirectives.ts @@ -20,6 +20,7 @@ import { skipDirectiveStep, updateDirectiveGoal, cleanupDirectiveTasks, + setDirectiveReconcileMode, } from "../lib/api"; export function useDirectives() { @@ -160,11 +161,17 @@ export function useDirective(id: string | undefined) { await refresh(); }, [id, refresh]); + const toggleReconcileMode = useCallback(async (enabled: boolean) => { + if (!id) return; + await setDirectiveReconcileMode(id, enabled); + await refresh(); + }, [id, refresh]); + return { directive, loading, error, refresh, update, addStep, removeStep, start, pause, advance, completeStep, failStep, skipStep, - updateGoal, cleanupTasks, + updateGoal, cleanupTasks, toggleReconcileMode, }; } diff --git a/makima/frontend/src/lib/api.ts b/makima/frontend/src/lib/api.ts index 9d9cb1c..e71dc07 100644 --- a/makima/frontend/src/lib/api.ts +++ b/makima/frontend/src/lib/api.ts @@ -2247,8 +2247,8 @@ export interface PendingQuestion { createdAt: string; /** Whether multiple choices can be selected */ multiSelect?: boolean; - /** Question type - "general" for regular questions, "phase_confirmation" for phase transitions, "contract_complete" for contract completion */ - questionType?: "general" | "phase_confirmation" | "contract_complete"; + /** Question type - "general" for regular questions, "phase_confirmation" for phase transitions, "contract_complete" for contract completion, "directive" for directive task questions */ + questionType?: "general" | "phase_confirmation" | "contract_complete" | "directive"; /** Phase confirmation specific data (when questionType is "phase_confirmation") */ phaseConfirmation?: { currentPhase: ContractPhase; @@ -3025,6 +3025,8 @@ export interface Directive { completionTaskId: string | null; /** Whether the memory system is enabled for this directive */ memoryEnabled: boolean; + /** When true, directive task questions use phaseguard (indefinite pause) instead of timeout */ + reconcileMode: boolean; goalUpdatedAt: string; startedAt: string | null; version: number; @@ -3064,6 +3066,8 @@ export interface DirectiveSummary { completionTaskId: string | null; /** Whether the memory system is enabled for this directive */ memoryEnabled: boolean; + /** When true, directive task questions use phaseguard (indefinite pause) instead of timeout */ + reconcileMode: boolean; version: number; createdAt: string; updatedAt: string; @@ -3086,6 +3090,8 @@ export interface CreateDirectiveRequest { baseBranch?: string; /** Enable the memory system for this directive (default: false) */ memoryEnabled?: boolean; + /** Enable reconcile mode — questions pause tasks indefinitely until answered (default: false) */ + reconcileMode?: boolean; } export interface UpdateDirectiveRequest { @@ -3098,6 +3104,8 @@ export interface UpdateDirectiveRequest { orchestratorTaskId?: string; /** Enable or disable the memory system for this directive */ memoryEnabled?: boolean; + /** Enable or disable reconcile mode for this directive */ + reconcileMode?: boolean; version?: number; } @@ -3246,6 +3254,17 @@ export async function cleanupDirectiveTasks(id: string): Promise<{ deleted: numb return res.json(); } +/** Set reconcile mode for a directive. When enabled, task questions pause indefinitely until answered. */ +export async function setDirectiveReconcileMode(id: string, enabled: boolean): Promise { + const res = await authFetch(`${API_BASE}/api/v1/directives/${id}/reconcile-mode`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ enabled }), + }); + if (!res.ok) throw new Error(`Failed to set reconcile mode: ${res.statusText}`); + return res.json(); +} + // ============================================================================= // Directive Memory Types & API // ============================================================================= diff --git a/makima/frontend/src/routes/directives.tsx b/makima/frontend/src/routes/directives.tsx index ca4437c..5f9b3a8 100644 --- a/makima/frontend/src/routes/directives.tsx +++ b/makima/frontend/src/routes/directives.tsx @@ -12,7 +12,7 @@ export default function DirectivesPage() { const navigate = useNavigate(); const { id: selectedId } = useParams<{ id: string }>(); const { directives, loading: listLoading, create, remove } = useDirectives(); - const { directive, refresh: refreshDetail, start, pause, advance, completeStep, failStep, skipStep, updateGoal, cleanupTasks } = useDirective(selectedId); + const { directive, refresh: refreshDetail, start, pause, advance, completeStep, failStep, skipStep, updateGoal, cleanupTasks, toggleReconcileMode } = useDirective(selectedId); const [showCreate, setShowCreate] = useState(false); const [newTitle, setNewTitle] = useState(""); @@ -210,6 +210,7 @@ export default function DirectivesPage() { onDelete={handleDelete} onRefresh={refreshDetail} onCleanupTasks={cleanupTasks} + onToggleReconcileMode={toggleReconcileMode} /> ) : (
diff --git a/makima/src/server/handlers/directives.rs b/makima/src/server/handlers/directives.rs index 1aa935b..f11155c 100644 --- a/makima/src/server/handlers/directives.rs +++ b/makima/src/server/handlers/directives.rs @@ -1291,3 +1291,301 @@ pub async fn cleanup_tasks( } } } + +// ============================================================================= +// Reconcile Mode +// ============================================================================= + +/// Request to set reconcile mode. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SetReconcileModeRequest { + pub enabled: bool, +} + +/// Toggle reconcile mode for a directive. +/// When enabled, directive task questions use phaseguard (indefinite pause) instead of timeout. +#[utoipa::path( + post, + path = "/api/v1/directives/{id}/reconcile-mode", + params(("id" = Uuid, Path, description = "Directive ID")), + request_body = SetReconcileModeRequest, + responses( + (status = 200, description = "Reconcile mode updated", body = Directive), + (status = 404, description = "Not found", body = ApiError), + (status = 503, description = "Database not configured", body = ApiError), + ), + security(("bearer_auth" = []), ("api_key" = [])), + tag = "Directives" +)] +pub async fn set_reconcile_mode( + State(state): State, + Authenticated(auth): Authenticated, + Path(id): Path, + Json(req): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + let update = UpdateDirectiveRequest { + reconcile_mode: Some(req.enabled), + ..Default::default() + }; + + match repository::update_directive_for_owner(pool, auth.owner_id, id, update).await { + Ok(Some(directive)) => Json(directive).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(), + Err(e) => { + tracing::error!("Failed to set reconcile mode: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("UPDATE_FAILED", &e.to_string())), + ) + .into_response() + } + } +} + +// ============================================================================= +// Directive Question Asking (for directive tasks) +// ============================================================================= + +/// Request for a directive task to ask a question. +#[derive(Debug, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DirectiveAskQuestionRequest { + /// The question to ask the user + pub question: String, + /// Optional choices (if empty, free-form text response) + #[serde(default)] + pub choices: Vec, + /// Optional context about what this relates to + pub context: Option, + /// When true, allow selecting multiple choices + #[serde(default)] + pub multi_select: bool, +} + +/// Response from asking a directive question. +#[derive(Debug, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct DirectiveAskQuestionResponse { + /// The question ID for tracking + pub question_id: Uuid, + /// The user's response (None if timed out or reconcile mode) + pub response: Option, + /// Whether the question timed out + pub timed_out: bool, + /// Whether reconcile mode paused the task + pub reconcile_paused: bool, +} + +/// Ask a question from a directive task. +/// Uses tool-key authentication (called by directive step tasks via their tool key). +/// +/// Behaviour depends on the directive's reconcile_mode: +/// - reconcile_mode=false (default): polls with 60-second timeout +/// - reconcile_mode=true: uses phaseguard — pauses the task indefinitely until user responds +#[utoipa::path( + post, + path = "/api/v1/directives/{id}/ask-question", + params(("id" = Uuid, Path, description = "Directive ID")), + request_body = DirectiveAskQuestionRequest, + responses( + (status = 200, description = "Question submitted/answered", body = DirectiveAskQuestionResponse), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Directive not found"), + (status = 408, description = "Question timed out", body = DirectiveAskQuestionResponse), + (status = 503, description = "Database not configured"), + ), + security(("bearer_auth" = []), ("api_key" = [])), + tag = "Directives" +)] +pub async fn ask_directive_question( + State(state): State, + Authenticated(auth): Authenticated, + Path(directive_id): Path, + headers: HeaderMap, + Json(request): Json, +) -> impl IntoResponse { + let Some(ref pool) = state.db_pool else { + return ( + StatusCode::SERVICE_UNAVAILABLE, + Json(ApiError::new("DB_UNAVAILABLE", "Database not configured")), + ) + .into_response(); + }; + + // Determine task_id from auth: try tool key first, fall back to owner-based auth + let task_id = match extract_auth(&state, &headers) { + AuthSource::ToolKey(tid) => tid, + _ => { + return ( + StatusCode::UNAUTHORIZED, + Json(ApiError::new("UNAUTHORIZED", "Directive question endpoints require tool key auth from a directive task")), + ) + .into_response(); + } + }; + + // Verify the directive exists and belongs to this owner + let directive = match repository::get_directive_for_owner(pool, auth.owner_id, directive_id).await { + Ok(Some(d)) => d, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(ApiError::new("NOT_FOUND", "Directive not found")), + ) + .into_response(); + } + Err(e) => { + tracing::error!("Failed to get directive: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ApiError::new("DB_ERROR", "Failed to get directive")), + ) + .into_response(); + } + }; + + let reconcile_mode = directive.reconcile_mode; + + // Add the question to the shared state (reuses existing supervisor question infrastructure) + // We use directive_id as the "contract_id" field for grouping purposes + let question_id = state.add_supervisor_question( + task_id, + directive_id, // use directive_id in place of contract_id + auth.owner_id, + request.question.clone(), + request.choices.clone(), + request.context.clone(), + request.multi_select, + "directive".to_string(), // question_type = "directive" + ); + + // Broadcast question as task output entry for the task's chat + let question_data = serde_json::json!({ + "question_id": question_id.to_string(), + "choices": request.choices, + "context": request.context, + "multi_select": request.multi_select, + "question_type": "directive", + "directive_id": directive_id.to_string(), + "reconcile_mode": reconcile_mode, + }); + state.broadcast_task_output(TaskOutputNotification { + task_id, + owner_id: Some(auth.owner_id), + message_type: "supervisor_question".to_string(), + content: request.question.clone(), + tool_name: None, + tool_input: Some(question_data.clone()), + is_error: None, + cost_usd: None, + duration_ms: None, + is_partial: false, + }); + + // Persist to database so it appears when reloading the page + let event_data = serde_json::json!({ + "messageType": "supervisor_question", + "content": request.question, + "toolInput": question_data, + }); + let _ = repository::create_task_event( + pool, + task_id, + "output", + None, + None, + Some(event_data), + ).await; + + tracing::info!( + task_id = %task_id, + directive_id = %directive_id, + question_id = %question_id, + reconcile_mode = reconcile_mode, + "Directive task asked question" + ); + + // If reconcile mode: pause the task indefinitely (phaseguard behaviour) + if reconcile_mode { + // Get the task to find its daemon + if let Ok(Some(task)) = repository::get_task_for_owner(pool, task_id, auth.owner_id).await { + if let Some(daemon_id) = task.daemon_id { + let cmd = crate::server::state::DaemonCommand::PauseTask { task_id }; + if let Err(e) = state.send_daemon_command(daemon_id, cmd).await { + tracing::warn!(task_id = %task_id, error = %e, "Failed to pause directive task for reconcile mode"); + } else { + tracing::info!(task_id = %task_id, "Paused directive task for reconcile-mode question"); + } + } + + // Update task status to paused in DB + let update = crate::db::models::UpdateTaskRequest { + status: Some("paused".to_string()), + ..Default::default() + }; + let _ = repository::update_task_for_owner(pool, task_id, auth.owner_id, update).await; + } + + return ( + StatusCode::OK, + Json(DirectiveAskQuestionResponse { + question_id, + response: None, + timed_out: false, + reconcile_paused: true, + }), + ).into_response(); + } + + // Default mode: poll with 60-second timeout + let timeout_duration = std::time::Duration::from_secs(60); + let start = std::time::Instant::now(); + let poll_interval = std::time::Duration::from_millis(500); + + loop { + // Check if response has been submitted + if let Some(response) = state.get_question_response(question_id) { + state.cleanup_question_response(question_id); + + return ( + StatusCode::OK, + Json(DirectiveAskQuestionResponse { + question_id, + response: Some(response.response), + timed_out: false, + reconcile_paused: false, + }), + ).into_response(); + } + + // Check timeout + if start.elapsed() >= timeout_duration { + state.remove_pending_question(question_id); + + return ( + StatusCode::REQUEST_TIMEOUT, + Json(DirectiveAskQuestionResponse { + question_id, + response: None, + timed_out: true, + reconcile_paused: false, + }), + ).into_response(); + } + + tokio::time::sleep(poll_interval).await; + } +} diff --git a/makima/src/server/mod.rs b/makima/src/server/mod.rs index 7110ef8..12d6027 100644 --- a/makima/src/server/mod.rs +++ b/makima/src/server/mod.rs @@ -238,6 +238,8 @@ pub fn make_router(state: SharedState) -> Router { .route("/directives/{id}/steps/{step_id}/skip", post(directives::skip_step)) .route("/directives/{id}/goal", put(directives::update_goal)) .route("/directives/{id}/cleanup-tasks", post(directives::cleanup_tasks)) + .route("/directives/{id}/reconcile-mode", post(directives::set_reconcile_mode)) + .route("/directives/{id}/ask-question", post(directives::ask_directive_question)) // Directive memory endpoints .route("/directives/{id}/memories", get(directives::list_memories).post(directives::set_memory).delete(directives::clear_memories)) .route("/directives/{id}/memories/batch", post(directives::batch_set_memories)) -- cgit v1.2.3