summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-02-14 02:39:57 +0000
committersoryu <soryu@soryu.co>2026-02-14 02:39:57 +0000
commit50ba4f81aff469b79868136e5b07dfb30f9782e6 (patch)
tree8a32f19dbdec7f6b1da6481bd950ab3cc18439de
parentc1e55ce4fec79f9909b957f86bd7fa8b76939746 (diff)
downloadsoryu-50ba4f81aff469b79868136e5b07dfb30f9782e6.tar.gz
soryu-50ba4f81aff469b79868136e5b07dfb30f9782e6.zip
WIP: heartbeat checkpoint
-rw-r--r--makima/migrations/20260214100000_directive_reconcile_mode.sql4
-rw-r--r--makima/src/db/models.rs8
-rw-r--r--makima/src/db/repository.rs9
-rw-r--r--makima/src/server/handlers/mesh_supervisor.rs79
-rw-r--r--makima/src/server/state.rs30
5 files changed, 109 insertions, 21 deletions
diff --git a/makima/migrations/20260214100000_directive_reconcile_mode.sql b/makima/migrations/20260214100000_directive_reconcile_mode.sql
new file mode 100644
index 0000000..a06e8f2
--- /dev/null
+++ b/makima/migrations/20260214100000_directive_reconcile_mode.sql
@@ -0,0 +1,4 @@
+-- Add reconcile_mode flag to directives table.
+-- When true, directive task questions pause execution indefinitely until answered.
+-- When false (default), questions timeout after 30 seconds.
+ALTER TABLE directives ADD COLUMN IF NOT EXISTS reconcile_mode BOOLEAN NOT NULL DEFAULT false;
diff --git a/makima/src/db/models.rs b/makima/src/db/models.rs
index 131dffc..360b99d 100644
--- a/makima/src/db/models.rs
+++ b/makima/src/db/models.rs
@@ -2714,6 +2714,8 @@ pub struct Directive {
pub pr_url: Option<String>,
pub pr_branch: Option<String>,
pub completion_task_id: Option<Uuid>,
+ /// Whether questions pause execution indefinitely until answered
+ pub reconcile_mode: bool,
pub goal_updated_at: DateTime<Utc>,
pub started_at: Option<DateTime<Utc>>,
pub version: i32,
@@ -2763,6 +2765,8 @@ pub struct DirectiveSummary {
pub orchestrator_task_id: Option<Uuid>,
pub pr_url: Option<String>,
pub completion_task_id: Option<Uuid>,
+ /// Whether questions pause execution indefinitely until answered
+ pub reconcile_mode: bool,
pub version: i32,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
@@ -2789,6 +2793,8 @@ pub struct CreateDirectiveRequest {
pub repository_url: Option<String>,
pub local_path: Option<String>,
pub base_branch: Option<String>,
+ /// Whether questions pause execution indefinitely until answered
+ pub reconcile_mode: Option<bool>,
}
/// Request to update a directive.
@@ -2804,6 +2810,8 @@ pub struct UpdateDirectiveRequest {
pub orchestrator_task_id: Option<Uuid>,
pub pr_url: Option<String>,
pub pr_branch: Option<String>,
+ /// Whether questions pause execution indefinitely until answered
+ pub reconcile_mode: Option<bool>,
pub version: Option<i32>,
}
diff --git a/makima/src/db/repository.rs b/makima/src/db/repository.rs
index d8168f6..9b476e6 100644
--- a/makima/src/db/repository.rs
+++ b/makima/src/db/repository.rs
@@ -4929,8 +4929,8 @@ pub async fn create_directive_for_owner(
) -> Result<Directive, sqlx::Error> {
sqlx::query_as::<_, Directive>(
r#"
- INSERT INTO directives (owner_id, title, goal, repository_url, local_path, base_branch)
- VALUES ($1, $2, $3, $4, $5, $6)
+ INSERT INTO directives (owner_id, title, goal, repository_url, local_path, base_branch, reconcile_mode)
+ VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
"#,
)
@@ -4940,6 +4940,7 @@ pub async fn create_directive_for_owner(
.bind(&req.repository_url)
.bind(&req.local_path)
.bind(&req.base_branch)
+ .bind(req.reconcile_mode.unwrap_or(false))
.fetch_one(pool)
.await
}
@@ -4992,6 +4993,7 @@ pub async fn list_directives_for_owner(
SELECT
d.id, d.owner_id, d.title, d.goal, d.status, d.repository_url,
d.orchestrator_task_id, d.pr_url, d.completion_task_id,
+ d.reconcile_mode,
d.version, d.created_at, d.updated_at,
COALESCE(s.total_steps, 0) as total_steps,
COALESCE(s.completed_steps, 0) as completed_steps,
@@ -5055,12 +5057,14 @@ pub async fn update_directive_for_owner(
let orchestrator_task_id = req.orchestrator_task_id.or(current.orchestrator_task_id);
let pr_url = req.pr_url.as_deref().or(current.pr_url.as_deref());
let pr_branch = req.pr_branch.as_deref().or(current.pr_branch.as_deref());
+ let reconcile_mode = req.reconcile_mode.unwrap_or(current.reconcile_mode);
let result = sqlx::query_as::<_, Directive>(
r#"
UPDATE directives
SET title = $3, goal = $4, status = $5, repository_url = $6, local_path = $7,
base_branch = $8, orchestrator_task_id = $9, pr_url = $10, pr_branch = $11,
+ reconcile_mode = $12,
version = version + 1, updated_at = NOW()
WHERE id = $1 AND owner_id = $2
RETURNING *
@@ -5077,6 +5081,7 @@ pub async fn update_directive_for_owner(
.bind(orchestrator_task_id)
.bind(pr_url)
.bind(pr_branch)
+ .bind(reconcile_mode)
.fetch_optional(pool)
.await
.map_err(RepositoryError::Database)?;
diff --git a/makima/src/server/handlers/mesh_supervisor.rs b/makima/src/server/handlers/mesh_supervisor.rs
index c9cb849..7d4ab46 100644
--- a/makima/src/server/handlers/mesh_supervisor.rs
+++ b/makima/src/server/handlers/mesh_supervisor.rs
@@ -129,6 +129,9 @@ pub struct PendingQuestionSummary {
pub question_id: Uuid,
pub task_id: Uuid,
pub contract_id: Uuid,
+ /// Directive this question relates to (if from a directive task)
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub directive_id: Option<Uuid>,
pub question: String,
pub choices: Vec<String>,
pub context: Option<String>,
@@ -1694,17 +1697,43 @@ pub async fn ask_question(
}
};
- let Some(contract_id) = supervisor.contract_id else {
+ // Determine context: contract or directive
+ let contract_id = supervisor.contract_id;
+ let directive_id = supervisor.directive_id;
+
+ if contract_id.is_none() && directive_id.is_none() {
return (
StatusCode::BAD_REQUEST,
- Json(ApiError::new("NO_CONTRACT", "Supervisor has no associated contract")),
+ Json(ApiError::new("NO_CONTEXT", "Supervisor has no associated contract or directive")),
).into_response();
+ }
+
+ let is_directive_context = directive_id.is_some() && contract_id.is_none();
+
+ // For directive context, check reconcile_mode to determine behavior
+ let directive_reconcile_mode = if let Some(did) = directive_id {
+ if is_directive_context {
+ match repository::get_directive_for_owner(pool, owner_id, did).await {
+ Ok(Some(d)) => d.reconcile_mode,
+ Ok(None) => false,
+ Err(e) => {
+ tracing::warn!(error = %e, "Failed to get directive for reconcile_mode check");
+ false
+ }
+ }
+ } else {
+ false
+ }
+ } else {
+ false
};
- // Add the question
- let question_id = state.add_supervisor_question(
+ // Add the question (use Uuid::nil() for contract_id in directive-only context)
+ let effective_contract_id = contract_id.unwrap_or(Uuid::nil());
+ let question_id = state.add_supervisor_question_with_directive(
supervisor_id,
- contract_id,
+ effective_contract_id,
+ directive_id,
owner_id,
request.question.clone(),
request.choices.clone(),
@@ -1714,15 +1743,18 @@ pub async fn ask_question(
);
// Save state: question asked is a key save point (Task 3.3)
- let pending_question = PendingQuestion {
- id: question_id,
- question: request.question.clone(),
- choices: request.choices.clone(),
- context: request.context.clone(),
- question_type: request.question_type.clone(),
- asked_at: chrono::Utc::now(),
- };
- save_state_on_question_asked(pool, contract_id, pending_question).await;
+ // Only for contract context — directive tasks don't use supervisor_states table
+ if let Some(cid) = contract_id {
+ let pending_question = PendingQuestion {
+ id: question_id,
+ question: request.question.clone(),
+ choices: request.choices.clone(),
+ context: request.context.clone(),
+ question_type: request.question_type.clone(),
+ asked_at: chrono::Utc::now(),
+ };
+ save_state_on_question_asked(pool, cid, pending_question).await;
+ }
// Broadcast question as task output entry for the task's chat
let question_data = serde_json::json!({
@@ -1775,9 +1807,10 @@ pub async fn ask_question(
).into_response();
}
- // If phaseguard is enabled, pause the supervisor task and return
+ // If phaseguard is enabled (or directive reconcile mode), pause the supervisor task and return
// The task will be auto-resumed when a message is sent to it (e.g., when user answers)
- if request.phaseguard {
+ let use_phaseguard = request.phaseguard || (is_directive_context && directive_reconcile_mode);
+ if use_phaseguard {
// Pause the supervisor task
if let Some(daemon_id) = supervisor.daemon_id {
let cmd = DaemonCommand::PauseTask { task_id: supervisor_id };
@@ -1808,7 +1841,13 @@ pub async fn ask_question(
}
// Poll for response with timeout
- let timeout_duration = std::time::Duration::from_secs(request.timeout_seconds.max(1) as u64);
+ // For directive tasks without reconcile mode, use 30s default timeout
+ let timeout_secs = if is_directive_context && !directive_reconcile_mode {
+ 30
+ } else {
+ request.timeout_seconds.max(1) as u64
+ };
+ let timeout_duration = std::time::Duration::from_secs(timeout_secs);
let start = std::time::Instant::now();
let poll_interval = std::time::Duration::from_millis(500);
@@ -1819,7 +1858,10 @@ pub async fn ask_question(
state.cleanup_question_response(question_id);
// Clear pending question from supervisor state (Task 3.3)
- clear_pending_question(pool, contract_id, question_id).await;
+ // Skip for directive context — no supervisor_states for directives
+ if let Some(cid) = contract_id {
+ clear_pending_question(pool, cid, question_id).await;
+ }
return (
StatusCode::OK,
@@ -1880,6 +1922,7 @@ pub async fn list_pending_questions(
question_id: q.question_id,
task_id: q.task_id,
contract_id: q.contract_id,
+ directive_id: q.directive_id,
question: q.question,
choices: q.choices,
context: q.context,
diff --git a/makima/src/server/state.rs b/makima/src/server/state.rs
index 58e8545..41c336e 100644
--- a/makima/src/server/state.rs
+++ b/makima/src/server/state.rs
@@ -142,8 +142,11 @@ pub struct SupervisorQuestionNotification {
pub question_id: Uuid,
/// Supervisor task that asked the question
pub task_id: Uuid,
- /// Contract this question relates to
+ /// Contract this question relates to (Uuid::nil() for directive context)
pub contract_id: Uuid,
+ /// Directive this question relates to (if from a directive task)
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub directive_id: Option<Uuid>,
/// Owner ID for data isolation
#[serde(skip)]
pub owner_id: Option<Uuid>,
@@ -170,6 +173,8 @@ pub struct PendingSupervisorQuestion {
pub question_id: Uuid,
pub task_id: Uuid,
pub contract_id: Uuid,
+ /// Directive this question relates to (if from a directive task)
+ pub directive_id: Option<Uuid>,
pub owner_id: Uuid,
pub question: String,
pub choices: Vec<String>,
@@ -819,6 +824,25 @@ impl AppState {
multi_select: bool,
question_type: String,
) -> Uuid {
+ self.add_supervisor_question_with_directive(
+ task_id, contract_id, None, owner_id,
+ question, choices, context, multi_select, question_type,
+ )
+ }
+
+ /// Add a pending supervisor question with optional directive context and broadcast it.
+ pub fn add_supervisor_question_with_directive(
+ &self,
+ task_id: Uuid,
+ contract_id: Uuid,
+ directive_id: Option<Uuid>,
+ owner_id: Uuid,
+ question: String,
+ choices: Vec<String>,
+ context: Option<String>,
+ multi_select: bool,
+ question_type: String,
+ ) -> Uuid {
let question_id = Uuid::new_v4();
let now = chrono::Utc::now();
@@ -829,6 +853,7 @@ impl AppState {
question_id,
task_id,
contract_id,
+ directive_id,
owner_id,
question: question.clone(),
choices: choices.clone(),
@@ -844,6 +869,7 @@ impl AppState {
question_id,
task_id,
contract_id,
+ directive_id,
owner_id: Some(owner_id),
question,
choices,
@@ -857,6 +883,7 @@ impl AppState {
question_id = %question_id,
task_id = %task_id,
contract_id = %contract_id,
+ directive_id = ?directive_id,
question_type = %question_type,
"Supervisor question added"
);
@@ -904,6 +931,7 @@ impl AppState {
question_id,
task_id: question.1.task_id,
contract_id: question.1.contract_id,
+ directive_id: question.1.directive_id,
owner_id: Some(question.1.owner_id),
question: question.1.question,
choices: question.1.choices,