summaryrefslogtreecommitdiff
path: root/makima/src/daemon/db/local.rs
diff options
context:
space:
mode:
authorsoryu <soryu@soryu.co>2026-01-11 05:52:14 +0000
committersoryu <soryu@soryu.co>2026-01-15 00:21:16 +0000
commit87044a747b47bd83249d61a45842c7f7b2eae56d (patch)
treeef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/src/daemon/db/local.rs
parent077820c4167c168072d217a1b01df840463a12a8 (diff)
downloadsoryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz
soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip
Contract system
Diffstat (limited to 'makima/src/daemon/db/local.rs')
-rw-r--r--makima/src/daemon/db/local.rs391
1 files changed, 391 insertions, 0 deletions
diff --git a/makima/src/daemon/db/local.rs b/makima/src/daemon/db/local.rs
new file mode 100644
index 0000000..f3ed45a
--- /dev/null
+++ b/makima/src/daemon/db/local.rs
@@ -0,0 +1,391 @@
+//! Local SQLite database for crash recovery and state persistence.
+
+use std::path::Path;
+
+use chrono::{DateTime, Utc};
+use rusqlite::{params, Connection, Result as SqliteResult};
+use uuid::Uuid;
+
+use crate::daemon::task::TaskState;
+
+/// Local task record for persistence.
+#[derive(Debug, Clone)]
+pub struct LocalTask {
+ pub id: Uuid,
+ pub server_task_id: Uuid,
+ pub state: TaskState,
+ pub container_id: Option<String>,
+ pub overlay_path: Option<String>,
+ pub repo_url: Option<String>,
+ pub base_branch: Option<String>,
+ pub plan: String,
+ pub created_at: DateTime<Utc>,
+ pub started_at: Option<DateTime<Utc>>,
+ pub completed_at: Option<DateTime<Utc>>,
+ pub error_message: Option<String>,
+}
+
+/// Buffered output for reliable delivery.
+#[derive(Debug, Clone)]
+pub struct BufferedOutput {
+ pub id: i64,
+ pub task_id: Uuid,
+ pub output: String,
+ pub is_partial: bool,
+ pub timestamp: DateTime<Utc>,
+}
+
+/// Local database for daemon state persistence.
+pub struct LocalDb {
+ conn: Connection,
+}
+
+impl LocalDb {
+ /// Open or create the local database.
+ pub fn open(path: &Path) -> SqliteResult<Self> {
+ // Create parent directory if needed
+ if let Some(parent) = path.parent() {
+ std::fs::create_dir_all(parent).ok();
+ }
+
+ let conn = Connection::open(path)?;
+
+ // Initialize schema
+ conn.execute_batch(Self::schema())?;
+
+ Ok(Self { conn })
+ }
+
+ /// Open an in-memory database (for testing).
+ #[cfg(test)]
+ pub fn open_memory() -> SqliteResult<Self> {
+ let conn = Connection::open_in_memory()?;
+ conn.execute_batch(Self::schema())?;
+ Ok(Self { conn })
+ }
+
+ /// Database schema.
+ fn schema() -> &'static str {
+ r#"
+ -- Local task state for crash recovery
+ CREATE TABLE IF NOT EXISTS tasks (
+ id TEXT PRIMARY KEY,
+ server_task_id TEXT NOT NULL,
+ state TEXT NOT NULL,
+ container_id TEXT,
+ overlay_path TEXT,
+ repo_url TEXT,
+ base_branch TEXT,
+ plan TEXT NOT NULL,
+ created_at TEXT NOT NULL,
+ started_at TEXT,
+ completed_at TEXT,
+ error_message TEXT
+ );
+
+ -- Buffered output for reliable delivery
+ CREATE TABLE IF NOT EXISTS output_buffer (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ task_id TEXT NOT NULL,
+ output TEXT NOT NULL,
+ is_partial INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ sent INTEGER NOT NULL DEFAULT 0
+ );
+
+ -- Daemon state key-value store
+ CREATE TABLE IF NOT EXISTS daemon_state (
+ key TEXT PRIMARY KEY,
+ value TEXT NOT NULL,
+ updated_at TEXT NOT NULL
+ );
+
+ -- Indexes
+ CREATE INDEX IF NOT EXISTS idx_tasks_state ON tasks(state);
+ CREATE INDEX IF NOT EXISTS idx_output_buffer_sent ON output_buffer(sent, id);
+ CREATE INDEX IF NOT EXISTS idx_output_buffer_task ON output_buffer(task_id);
+ "#
+ }
+
+ /// Save a task.
+ pub fn save_task(&self, task: &LocalTask) -> SqliteResult<()> {
+ self.conn.execute(
+ r#"
+ INSERT OR REPLACE INTO tasks
+ (id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message)
+ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
+ "#,
+ params![
+ task.id.to_string(),
+ task.server_task_id.to_string(),
+ task.state.as_str(),
+ task.container_id,
+ task.overlay_path,
+ task.repo_url,
+ task.base_branch,
+ task.plan,
+ task.created_at.to_rfc3339(),
+ task.started_at.map(|t| t.to_rfc3339()),
+ task.completed_at.map(|t| t.to_rfc3339()),
+ task.error_message,
+ ],
+ )?;
+ Ok(())
+ }
+
+ /// Get a task by ID.
+ pub fn get_task(&self, id: Uuid) -> SqliteResult<Option<LocalTask>> {
+ let mut stmt = self.conn.prepare(
+ "SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message FROM tasks WHERE id = ?1",
+ )?;
+
+ let mut rows = stmt.query(params![id.to_string()])?;
+
+ if let Some(row) = rows.next()? {
+ Ok(Some(Self::task_from_row(row)?))
+ } else {
+ Ok(None)
+ }
+ }
+
+ /// Get all running/active tasks (for recovery).
+ pub fn get_active_tasks(&self) -> SqliteResult<Vec<LocalTask>> {
+ let mut stmt = self.conn.prepare(
+ r#"
+ SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message
+ FROM tasks
+ WHERE state IN ('initializing', 'starting', 'running', 'paused', 'blocked')
+ "#,
+ )?;
+
+ let rows = stmt.query_map([], |row| Self::task_from_row(row))?;
+
+ rows.collect()
+ }
+
+ /// Delete a task.
+ pub fn delete_task(&self, id: Uuid) -> SqliteResult<()> {
+ self.conn.execute(
+ "DELETE FROM tasks WHERE id = ?1",
+ params![id.to_string()],
+ )?;
+ Ok(())
+ }
+
+ /// Update task state.
+ pub fn update_task_state(&self, id: Uuid, state: TaskState) -> SqliteResult<()> {
+ self.conn.execute(
+ "UPDATE tasks SET state = ?2 WHERE id = ?1",
+ params![id.to_string(), state.as_str()],
+ )?;
+ Ok(())
+ }
+
+ /// Buffer output for reliable delivery.
+ pub fn buffer_output(&self, task_id: Uuid, output: &str, is_partial: bool) -> SqliteResult<i64> {
+ self.conn.execute(
+ r#"
+ INSERT INTO output_buffer (task_id, output, is_partial, timestamp, sent)
+ VALUES (?1, ?2, ?3, datetime('now'), 0)
+ "#,
+ params![task_id.to_string(), output, is_partial as i32],
+ )?;
+ Ok(self.conn.last_insert_rowid())
+ }
+
+ /// Get unsent outputs.
+ pub fn get_unsent_outputs(&self, limit: i64) -> SqliteResult<Vec<BufferedOutput>> {
+ let mut stmt = self.conn.prepare(
+ r#"
+ SELECT id, task_id, output, is_partial, timestamp
+ FROM output_buffer
+ WHERE sent = 0
+ ORDER BY id
+ LIMIT ?1
+ "#,
+ )?;
+
+ let rows = stmt.query_map(params![limit], |row| {
+ let id: i64 = row.get(0)?;
+ let task_id_str: String = row.get(1)?;
+ let task_id = Uuid::parse_str(&task_id_str).unwrap_or_default();
+ let output: String = row.get(2)?;
+ let is_partial: i32 = row.get(3)?;
+ let timestamp_str: String = row.get(4)?;
+ let timestamp = DateTime::parse_from_rfc3339(&timestamp_str)
+ .map(|dt| dt.with_timezone(&Utc))
+ .unwrap_or_else(|_| Utc::now());
+
+ Ok(BufferedOutput {
+ id,
+ task_id,
+ output,
+ is_partial: is_partial != 0,
+ timestamp,
+ })
+ })?;
+
+ rows.collect()
+ }
+
+ /// Mark outputs as sent.
+ pub fn mark_outputs_sent(&self, ids: &[i64]) -> SqliteResult<()> {
+ if ids.is_empty() {
+ return Ok(());
+ }
+
+ let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
+ let sql = format!(
+ "UPDATE output_buffer SET sent = 1 WHERE id IN ({})",
+ placeholders.join(",")
+ );
+
+ let params: Vec<rusqlite::types::Value> = ids
+ .iter()
+ .map(|id| rusqlite::types::Value::Integer(*id))
+ .collect();
+
+ self.conn.execute(&sql, rusqlite::params_from_iter(params))?;
+ Ok(())
+ }
+
+ /// Clean up old sent outputs.
+ pub fn cleanup_sent_outputs(&self, older_than_hours: i64) -> SqliteResult<usize> {
+ let result = self.conn.execute(
+ r#"
+ DELETE FROM output_buffer
+ WHERE sent = 1 AND timestamp < datetime('now', ?1 || ' hours')
+ "#,
+ params![format!("-{}", older_than_hours)],
+ )?;
+ Ok(result)
+ }
+
+ /// Get daemon state value.
+ pub fn get_state(&self, key: &str) -> SqliteResult<Option<String>> {
+ let mut stmt = self.conn.prepare(
+ "SELECT value FROM daemon_state WHERE key = ?1",
+ )?;
+
+ let mut rows = stmt.query(params![key])?;
+
+ if let Some(row) = rows.next()? {
+ let value: String = row.get(0)?;
+ Ok(Some(value))
+ } else {
+ Ok(None)
+ }
+ }
+
+ /// Set daemon state value.
+ pub fn set_state(&self, key: &str, value: &str) -> SqliteResult<()> {
+ self.conn.execute(
+ r#"
+ INSERT OR REPLACE INTO daemon_state (key, value, updated_at)
+ VALUES (?1, ?2, datetime('now'))
+ "#,
+ params![key, value],
+ )?;
+ Ok(())
+ }
+
+ /// Parse a task from a database row.
+ fn task_from_row(row: &rusqlite::Row) -> SqliteResult<LocalTask> {
+ let id_str: String = row.get(0)?;
+ let server_task_id_str: String = row.get(1)?;
+ let state_str: String = row.get(2)?;
+ let container_id: Option<String> = row.get(3)?;
+ let overlay_path: Option<String> = row.get(4)?;
+ let repo_url: Option<String> = row.get(5)?;
+ let base_branch: Option<String> = row.get(6)?;
+ let plan: String = row.get(7)?;
+ let created_at_str: String = row.get(8)?;
+ let started_at_str: Option<String> = row.get(9)?;
+ let completed_at_str: Option<String> = row.get(10)?;
+ let error_message: Option<String> = row.get(11)?;
+
+ let id = Uuid::parse_str(&id_str).unwrap_or_default();
+ let server_task_id = Uuid::parse_str(&server_task_id_str).unwrap_or_default();
+ let state = TaskState::from_str(&state_str).unwrap_or_default();
+ let created_at = DateTime::parse_from_rfc3339(&created_at_str)
+ .map(|dt| dt.with_timezone(&Utc))
+ .unwrap_or_else(|_| Utc::now());
+ let started_at = started_at_str
+ .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
+ .map(|dt| dt.with_timezone(&Utc));
+ let completed_at = completed_at_str
+ .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
+ .map(|dt| dt.with_timezone(&Utc));
+
+ Ok(LocalTask {
+ id,
+ server_task_id,
+ state,
+ container_id,
+ overlay_path,
+ repo_url,
+ base_branch,
+ plan,
+ created_at,
+ started_at,
+ completed_at,
+ error_message,
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::daemon::*;
+
+ #[test]
+ fn test_open_memory() {
+ let db = LocalDb::open_memory().unwrap();
+ assert!(db.get_active_tasks().unwrap().is_empty());
+ }
+
+ #[test]
+ fn test_save_and_get_task() {
+ let db = LocalDb::open_memory().unwrap();
+
+ let task = LocalTask {
+ id: Uuid::new_v4(),
+ server_task_id: Uuid::new_v4(),
+ state: TaskState::Running,
+ container_id: Some("abc123".to_string()),
+ overlay_path: Some("/tmp/overlay".to_string()),
+ repo_url: Some("https://github.com/test/repo".to_string()),
+ base_branch: Some("main".to_string()),
+ plan: "Build the feature".to_string(),
+ created_at: Utc::now(),
+ started_at: Some(Utc::now()),
+ completed_at: None,
+ error_message: None,
+ };
+
+ db.save_task(&task).unwrap();
+
+ let loaded = db.get_task(task.id).unwrap().unwrap();
+ assert_eq!(loaded.id, task.id);
+ assert_eq!(loaded.state, TaskState::Running);
+ assert_eq!(loaded.plan, "Build the feature");
+ }
+
+ #[test]
+ fn test_output_buffer() {
+ let db = LocalDb::open_memory().unwrap();
+ let task_id = Uuid::new_v4();
+
+ db.buffer_output(task_id, "line 1", false).unwrap();
+ db.buffer_output(task_id, "line 2", false).unwrap();
+
+ let unsent = db.get_unsent_outputs(10).unwrap();
+ assert_eq!(unsent.len(), 2);
+
+ let ids: Vec<i64> = unsent.iter().map(|o| o.id).collect();
+ db.mark_outputs_sent(&ids).unwrap();
+
+ let unsent = db.get_unsent_outputs(10).unwrap();
+ assert!(unsent.is_empty());
+ }
+}