From 50ba4f81aff469b79868136e5b07dfb30f9782e6 Mon Sep 17 00:00:00 2001 From: soryu Date: Sat, 14 Feb 2026 02:39:57 +0000 Subject: WIP: heartbeat checkpoint --- .../20260214100000_directive_reconcile_mode.sql | 4 ++ makima/src/db/models.rs | 8 +++ makima/src/db/repository.rs | 9 ++- makima/src/server/handlers/mesh_supervisor.rs | 79 +++++++++++++++++----- makima/src/server/state.rs | 30 +++++++- 5 files changed, 109 insertions(+), 21 deletions(-) create mode 100644 makima/migrations/20260214100000_directive_reconcile_mode.sql diff --git a/makima/migrations/20260214100000_directive_reconcile_mode.sql b/makima/migrations/20260214100000_directive_reconcile_mode.sql new file mode 100644 index 0000000..a06e8f2 --- /dev/null +++ b/makima/migrations/20260214100000_directive_reconcile_mode.sql @@ -0,0 +1,4 @@ +-- Add reconcile_mode flag to directives table. +-- When true, directive task questions pause execution indefinitely until answered. +-- When false (default), questions timeout after 30 seconds. +ALTER TABLE directives ADD COLUMN IF NOT EXISTS reconcile_mode BOOLEAN NOT NULL DEFAULT false; diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs index 131dffc..360b99d 100644 --- a/makima/src/db/models.rs +++ b/makima/src/db/models.rs @@ -2714,6 +2714,8 @@ pub struct Directive { pub pr_url: Option, pub pr_branch: Option, pub completion_task_id: Option, + /// Whether questions pause execution indefinitely until answered + pub reconcile_mode: bool, pub goal_updated_at: DateTime, pub started_at: Option>, pub version: i32, @@ -2763,6 +2765,8 @@ pub struct DirectiveSummary { pub orchestrator_task_id: Option, pub pr_url: Option, pub completion_task_id: Option, + /// Whether questions pause execution indefinitely until answered + pub reconcile_mode: bool, pub version: i32, pub created_at: DateTime, pub updated_at: DateTime, @@ -2789,6 +2793,8 @@ pub struct CreateDirectiveRequest { pub repository_url: Option, pub local_path: Option, pub base_branch: Option, + /// Whether questions pause execution indefinitely until answered + pub reconcile_mode: Option, } /// Request to update a directive. @@ -2804,6 +2810,8 @@ pub struct UpdateDirectiveRequest { pub orchestrator_task_id: Option, pub pr_url: Option, pub pr_branch: Option, + /// Whether questions pause execution indefinitely until answered + pub reconcile_mode: Option, pub version: Option, } diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs index d8168f6..9b476e6 100644 --- a/makima/src/db/repository.rs +++ b/makima/src/db/repository.rs @@ -4929,8 +4929,8 @@ pub async fn create_directive_for_owner( ) -> Result { sqlx::query_as::<_, Directive>( r#" - INSERT INTO directives (owner_id, title, goal, repository_url, local_path, base_branch) - VALUES ($1, $2, $3, $4, $5, $6) + INSERT INTO directives (owner_id, title, goal, repository_url, local_path, base_branch, reconcile_mode) + VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING * "#, ) @@ -4940,6 +4940,7 @@ pub async fn create_directive_for_owner( .bind(&req.repository_url) .bind(&req.local_path) .bind(&req.base_branch) + .bind(req.reconcile_mode.unwrap_or(false)) .fetch_one(pool) .await } @@ -4992,6 +4993,7 @@ pub async fn list_directives_for_owner( SELECT d.id, d.owner_id, d.title, d.goal, d.status, d.repository_url, d.orchestrator_task_id, d.pr_url, d.completion_task_id, + d.reconcile_mode, d.version, d.created_at, d.updated_at, COALESCE(s.total_steps, 0) as total_steps, COALESCE(s.completed_steps, 0) as completed_steps, @@ -5055,12 +5057,14 @@ pub async fn update_directive_for_owner( let orchestrator_task_id = req.orchestrator_task_id.or(current.orchestrator_task_id); let pr_url = req.pr_url.as_deref().or(current.pr_url.as_deref()); let pr_branch = req.pr_branch.as_deref().or(current.pr_branch.as_deref()); + let reconcile_mode = req.reconcile_mode.unwrap_or(current.reconcile_mode); let result = sqlx::query_as::<_, Directive>( r#" UPDATE directives SET title = $3, goal = $4, status = $5, repository_url = $6, local_path = $7, base_branch = $8, orchestrator_task_id = $9, pr_url = $10, pr_branch = $11, + reconcile_mode = $12, version = version + 1, updated_at = NOW() WHERE id = $1 AND owner_id = $2 RETURNING * @@ -5077,6 +5081,7 @@ pub async fn update_directive_for_owner( .bind(orchestrator_task_id) .bind(pr_url) .bind(pr_branch) + .bind(reconcile_mode) .fetch_optional(pool) .await .map_err(RepositoryError::Database)?; diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs index c9cb849..7d4ab46 100644 --- a/makima/src/server/handlers/mesh_supervisor.rs +++ b/makima/src/server/handlers/mesh_supervisor.rs @@ -129,6 +129,9 @@ pub struct PendingQuestionSummary { pub question_id: Uuid, pub task_id: Uuid, pub contract_id: Uuid, + /// Directive this question relates to (if from a directive task) + #[serde(skip_serializing_if = "Option::is_none")] + pub directive_id: Option, pub question: String, pub choices: Vec, pub context: Option, @@ -1694,17 +1697,43 @@ pub async fn ask_question( } }; - let Some(contract_id) = supervisor.contract_id else { + // Determine context: contract or directive + let contract_id = supervisor.contract_id; + let directive_id = supervisor.directive_id; + + if contract_id.is_none() && directive_id.is_none() { return ( StatusCode::BAD_REQUEST, - Json(ApiError::new("NO_CONTRACT", "Supervisor has no associated contract")), + Json(ApiError::new("NO_CONTEXT", "Supervisor has no associated contract or directive")), ).into_response(); + } + + let is_directive_context = directive_id.is_some() && contract_id.is_none(); + + // For directive context, check reconcile_mode to determine behavior + let directive_reconcile_mode = if let Some(did) = directive_id { + if is_directive_context { + match repository::get_directive_for_owner(pool, owner_id, did).await { + Ok(Some(d)) => d.reconcile_mode, + Ok(None) => false, + Err(e) => { + tracing::warn!(error = %e, "Failed to get directive for reconcile_mode check"); + false + } + } + } else { + false + } + } else { + false }; - // Add the question - let question_id = state.add_supervisor_question( + // Add the question (use Uuid::nil() for contract_id in directive-only context) + let effective_contract_id = contract_id.unwrap_or(Uuid::nil()); + let question_id = state.add_supervisor_question_with_directive( supervisor_id, - contract_id, + effective_contract_id, + directive_id, owner_id, request.question.clone(), request.choices.clone(), @@ -1714,15 +1743,18 @@ pub async fn ask_question( ); // Save state: question asked is a key save point (Task 3.3) - let pending_question = PendingQuestion { - id: question_id, - question: request.question.clone(), - choices: request.choices.clone(), - context: request.context.clone(), - question_type: request.question_type.clone(), - asked_at: chrono::Utc::now(), - }; - save_state_on_question_asked(pool, contract_id, pending_question).await; + // Only for contract context — directive tasks don't use supervisor_states table + if let Some(cid) = contract_id { + let pending_question = PendingQuestion { + id: question_id, + question: request.question.clone(), + choices: request.choices.clone(), + context: request.context.clone(), + question_type: request.question_type.clone(), + asked_at: chrono::Utc::now(), + }; + save_state_on_question_asked(pool, cid, pending_question).await; + } // Broadcast question as task output entry for the task's chat let question_data = serde_json::json!({ @@ -1775,9 +1807,10 @@ pub async fn ask_question( ).into_response(); } - // If phaseguard is enabled, pause the supervisor task and return + // If phaseguard is enabled (or directive reconcile mode), pause the supervisor task and return // The task will be auto-resumed when a message is sent to it (e.g., when user answers) - if request.phaseguard { + let use_phaseguard = request.phaseguard || (is_directive_context && directive_reconcile_mode); + if use_phaseguard { // Pause the supervisor task if let Some(daemon_id) = supervisor.daemon_id { let cmd = DaemonCommand::PauseTask { task_id: supervisor_id }; @@ -1808,7 +1841,13 @@ pub async fn ask_question( } // Poll for response with timeout - let timeout_duration = std::time::Duration::from_secs(request.timeout_seconds.max(1) as u64); + // For directive tasks without reconcile mode, use 30s default timeout + let timeout_secs = if is_directive_context && !directive_reconcile_mode { + 30 + } else { + request.timeout_seconds.max(1) as u64 + }; + let timeout_duration = std::time::Duration::from_secs(timeout_secs); let start = std::time::Instant::now(); let poll_interval = std::time::Duration::from_millis(500); @@ -1819,7 +1858,10 @@ pub async fn ask_question( state.cleanup_question_response(question_id); // Clear pending question from supervisor state (Task 3.3) - clear_pending_question(pool, contract_id, question_id).await; + // Skip for directive context — no supervisor_states for directives + if let Some(cid) = contract_id { + clear_pending_question(pool, cid, question_id).await; + } return ( StatusCode::OK, @@ -1880,6 +1922,7 @@ pub async fn list_pending_questions( question_id: q.question_id, task_id: q.task_id, contract_id: q.contract_id, + directive_id: q.directive_id, question: q.question, choices: q.choices, context: q.context, diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs index 58e8545..41c336e 100644 --- a/makima/src/server/state.rs +++ b/makima/src/server/state.rs @@ -142,8 +142,11 @@ pub struct SupervisorQuestionNotification { pub question_id: Uuid, /// Supervisor task that asked the question pub task_id: Uuid, - /// Contract this question relates to + /// Contract this question relates to (Uuid::nil() for directive context) pub contract_id: Uuid, + /// Directive this question relates to (if from a directive task) + #[serde(skip_serializing_if = "Option::is_none")] + pub directive_id: Option, /// Owner ID for data isolation #[serde(skip)] pub owner_id: Option, @@ -170,6 +173,8 @@ pub struct PendingSupervisorQuestion { pub question_id: Uuid, pub task_id: Uuid, pub contract_id: Uuid, + /// Directive this question relates to (if from a directive task) + pub directive_id: Option, pub owner_id: Uuid, pub question: String, pub choices: Vec, @@ -818,6 +823,25 @@ impl AppState { context: Option, multi_select: bool, question_type: String, + ) -> Uuid { + self.add_supervisor_question_with_directive( + task_id, contract_id, None, owner_id, + question, choices, context, multi_select, question_type, + ) + } + + /// Add a pending supervisor question with optional directive context and broadcast it. + pub fn add_supervisor_question_with_directive( + &self, + task_id: Uuid, + contract_id: Uuid, + directive_id: Option, + owner_id: Uuid, + question: String, + choices: Vec, + context: Option, + multi_select: bool, + question_type: String, ) -> Uuid { let question_id = Uuid::new_v4(); let now = chrono::Utc::now(); @@ -829,6 +853,7 @@ impl AppState { question_id, task_id, contract_id, + directive_id, owner_id, question: question.clone(), choices: choices.clone(), @@ -844,6 +869,7 @@ impl AppState { question_id, task_id, contract_id, + directive_id, owner_id: Some(owner_id), question, choices, @@ -857,6 +883,7 @@ impl AppState { question_id = %question_id, task_id = %task_id, contract_id = %contract_id, + directive_id = ?directive_id, question_type = %question_type, "Supervisor question added" ); @@ -904,6 +931,7 @@ impl AppState { question_id, task_id: question.1.task_id, contract_id: question.1.contract_id, + directive_id: question.1.directive_id, owner_id: Some(question.1.owner_id), question: question.1.question, choices: question.1.choices, -- cgit v1.2.3