diff options
| author | soryu <soryu@soryu.co> | 2026-01-11 05:52:14 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-01-15 00:21:16 +0000 |
| commit | 87044a747b47bd83249d61a45842c7f7b2eae56d (patch) | |
| tree | ef2000ce79ffcc2723ef841acef5aa1deb1d5378 /makima/src/daemon/db | |
| parent | 077820c4167c168072d217a1b01df840463a12a8 (diff) | |
| download | soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.tar.gz soryu-87044a747b47bd83249d61a45842c7f7b2eae56d.zip | |
Contract system
Diffstat (limited to 'makima/src/daemon/db')
| -rw-r--r-- | makima/src/daemon/db/local.rs | 391 | ||||
| -rw-r--r-- | makima/src/daemon/db/mod.rs | 5 |
2 files changed, 396 insertions, 0 deletions
diff --git a/makima/src/daemon/db/local.rs b/makima/src/daemon/db/local.rs new file mode 100644 index 0000000..f3ed45a --- /dev/null +++ b/makima/src/daemon/db/local.rs @@ -0,0 +1,391 @@ +//! Local SQLite database for crash recovery and state persistence. + +use std::path::Path; + +use chrono::{DateTime, Utc}; +use rusqlite::{params, Connection, Result as SqliteResult}; +use uuid::Uuid; + +use crate::daemon::task::TaskState; + +/// Local task record for persistence. +#[derive(Debug, Clone)] +pub struct LocalTask { + pub id: Uuid, + pub server_task_id: Uuid, + pub state: TaskState, + pub container_id: Option<String>, + pub overlay_path: Option<String>, + pub repo_url: Option<String>, + pub base_branch: Option<String>, + pub plan: String, + pub created_at: DateTime<Utc>, + pub started_at: Option<DateTime<Utc>>, + pub completed_at: Option<DateTime<Utc>>, + pub error_message: Option<String>, +} + +/// Buffered output for reliable delivery. +#[derive(Debug, Clone)] +pub struct BufferedOutput { + pub id: i64, + pub task_id: Uuid, + pub output: String, + pub is_partial: bool, + pub timestamp: DateTime<Utc>, +} + +/// Local database for daemon state persistence. +pub struct LocalDb { + conn: Connection, +} + +impl LocalDb { + /// Open or create the local database. + pub fn open(path: &Path) -> SqliteResult<Self> { + // Create parent directory if needed + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).ok(); + } + + let conn = Connection::open(path)?; + + // Initialize schema + conn.execute_batch(Self::schema())?; + + Ok(Self { conn }) + } + + /// Open an in-memory database (for testing). + #[cfg(test)] + pub fn open_memory() -> SqliteResult<Self> { + let conn = Connection::open_in_memory()?; + conn.execute_batch(Self::schema())?; + Ok(Self { conn }) + } + + /// Database schema. + fn schema() -> &'static str { + r#" + -- Local task state for crash recovery + CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, + server_task_id TEXT NOT NULL, + state TEXT NOT NULL, + container_id TEXT, + overlay_path TEXT, + repo_url TEXT, + base_branch TEXT, + plan TEXT NOT NULL, + created_at TEXT NOT NULL, + started_at TEXT, + completed_at TEXT, + error_message TEXT + ); + + -- Buffered output for reliable delivery + CREATE TABLE IF NOT EXISTS output_buffer ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL, + output TEXT NOT NULL, + is_partial INTEGER NOT NULL, + timestamp TEXT NOT NULL, + sent INTEGER NOT NULL DEFAULT 0 + ); + + -- Daemon state key-value store + CREATE TABLE IF NOT EXISTS daemon_state ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + -- Indexes + CREATE INDEX IF NOT EXISTS idx_tasks_state ON tasks(state); + CREATE INDEX IF NOT EXISTS idx_output_buffer_sent ON output_buffer(sent, id); + CREATE INDEX IF NOT EXISTS idx_output_buffer_task ON output_buffer(task_id); + "# + } + + /// Save a task. + pub fn save_task(&self, task: &LocalTask) -> SqliteResult<()> { + self.conn.execute( + r#" + INSERT OR REPLACE INTO tasks + (id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12) + "#, + params![ + task.id.to_string(), + task.server_task_id.to_string(), + task.state.as_str(), + task.container_id, + task.overlay_path, + task.repo_url, + task.base_branch, + task.plan, + task.created_at.to_rfc3339(), + task.started_at.map(|t| t.to_rfc3339()), + task.completed_at.map(|t| t.to_rfc3339()), + task.error_message, + ], + )?; + Ok(()) + } + + /// Get a task by ID. + pub fn get_task(&self, id: Uuid) -> SqliteResult<Option<LocalTask>> { + let mut stmt = self.conn.prepare( + "SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message FROM tasks WHERE id = ?1", + )?; + + let mut rows = stmt.query(params![id.to_string()])?; + + if let Some(row) = rows.next()? { + Ok(Some(Self::task_from_row(row)?)) + } else { + Ok(None) + } + } + + /// Get all running/active tasks (for recovery). + pub fn get_active_tasks(&self) -> SqliteResult<Vec<LocalTask>> { + let mut stmt = self.conn.prepare( + r#" + SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message + FROM tasks + WHERE state IN ('initializing', 'starting', 'running', 'paused', 'blocked') + "#, + )?; + + let rows = stmt.query_map([], |row| Self::task_from_row(row))?; + + rows.collect() + } + + /// Delete a task. + pub fn delete_task(&self, id: Uuid) -> SqliteResult<()> { + self.conn.execute( + "DELETE FROM tasks WHERE id = ?1", + params![id.to_string()], + )?; + Ok(()) + } + + /// Update task state. + pub fn update_task_state(&self, id: Uuid, state: TaskState) -> SqliteResult<()> { + self.conn.execute( + "UPDATE tasks SET state = ?2 WHERE id = ?1", + params![id.to_string(), state.as_str()], + )?; + Ok(()) + } + + /// Buffer output for reliable delivery. + pub fn buffer_output(&self, task_id: Uuid, output: &str, is_partial: bool) -> SqliteResult<i64> { + self.conn.execute( + r#" + INSERT INTO output_buffer (task_id, output, is_partial, timestamp, sent) + VALUES (?1, ?2, ?3, datetime('now'), 0) + "#, + params![task_id.to_string(), output, is_partial as i32], + )?; + Ok(self.conn.last_insert_rowid()) + } + + /// Get unsent outputs. + pub fn get_unsent_outputs(&self, limit: i64) -> SqliteResult<Vec<BufferedOutput>> { + let mut stmt = self.conn.prepare( + r#" + SELECT id, task_id, output, is_partial, timestamp + FROM output_buffer + WHERE sent = 0 + ORDER BY id + LIMIT ?1 + "#, + )?; + + let rows = stmt.query_map(params![limit], |row| { + let id: i64 = row.get(0)?; + let task_id_str: String = row.get(1)?; + let task_id = Uuid::parse_str(&task_id_str).unwrap_or_default(); + let output: String = row.get(2)?; + let is_partial: i32 = row.get(3)?; + let timestamp_str: String = row.get(4)?; + let timestamp = DateTime::parse_from_rfc3339(×tamp_str) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(|_| Utc::now()); + + Ok(BufferedOutput { + id, + task_id, + output, + is_partial: is_partial != 0, + timestamp, + }) + })?; + + rows.collect() + } + + /// Mark outputs as sent. + pub fn mark_outputs_sent(&self, ids: &[i64]) -> SqliteResult<()> { + if ids.is_empty() { + return Ok(()); + } + + let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect(); + let sql = format!( + "UPDATE output_buffer SET sent = 1 WHERE id IN ({})", + placeholders.join(",") + ); + + let params: Vec<rusqlite::types::Value> = ids + .iter() + .map(|id| rusqlite::types::Value::Integer(*id)) + .collect(); + + self.conn.execute(&sql, rusqlite::params_from_iter(params))?; + Ok(()) + } + + /// Clean up old sent outputs. + pub fn cleanup_sent_outputs(&self, older_than_hours: i64) -> SqliteResult<usize> { + let result = self.conn.execute( + r#" + DELETE FROM output_buffer + WHERE sent = 1 AND timestamp < datetime('now', ?1 || ' hours') + "#, + params![format!("-{}", older_than_hours)], + )?; + Ok(result) + } + + /// Get daemon state value. + pub fn get_state(&self, key: &str) -> SqliteResult<Option<String>> { + let mut stmt = self.conn.prepare( + "SELECT value FROM daemon_state WHERE key = ?1", + )?; + + let mut rows = stmt.query(params![key])?; + + if let Some(row) = rows.next()? { + let value: String = row.get(0)?; + Ok(Some(value)) + } else { + Ok(None) + } + } + + /// Set daemon state value. + pub fn set_state(&self, key: &str, value: &str) -> SqliteResult<()> { + self.conn.execute( + r#" + INSERT OR REPLACE INTO daemon_state (key, value, updated_at) + VALUES (?1, ?2, datetime('now')) + "#, + params![key, value], + )?; + Ok(()) + } + + /// Parse a task from a database row. + fn task_from_row(row: &rusqlite::Row) -> SqliteResult<LocalTask> { + let id_str: String = row.get(0)?; + let server_task_id_str: String = row.get(1)?; + let state_str: String = row.get(2)?; + let container_id: Option<String> = row.get(3)?; + let overlay_path: Option<String> = row.get(4)?; + let repo_url: Option<String> = row.get(5)?; + let base_branch: Option<String> = row.get(6)?; + let plan: String = row.get(7)?; + let created_at_str: String = row.get(8)?; + let started_at_str: Option<String> = row.get(9)?; + let completed_at_str: Option<String> = row.get(10)?; + let error_message: Option<String> = row.get(11)?; + + let id = Uuid::parse_str(&id_str).unwrap_or_default(); + let server_task_id = Uuid::parse_str(&server_task_id_str).unwrap_or_default(); + let state = TaskState::from_str(&state_str).unwrap_or_default(); + let created_at = DateTime::parse_from_rfc3339(&created_at_str) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(|_| Utc::now()); + let started_at = started_at_str + .and_then(|s| DateTime::parse_from_rfc3339(&s).ok()) + .map(|dt| dt.with_timezone(&Utc)); + let completed_at = completed_at_str + .and_then(|s| DateTime::parse_from_rfc3339(&s).ok()) + .map(|dt| dt.with_timezone(&Utc)); + + Ok(LocalTask { + id, + server_task_id, + state, + container_id, + overlay_path, + repo_url, + base_branch, + plan, + created_at, + started_at, + completed_at, + error_message, + }) + } +} + +#[cfg(test)] +mod tests { + use crate::daemon::*; + + #[test] + fn test_open_memory() { + let db = LocalDb::open_memory().unwrap(); + assert!(db.get_active_tasks().unwrap().is_empty()); + } + + #[test] + fn test_save_and_get_task() { + let db = LocalDb::open_memory().unwrap(); + + let task = LocalTask { + id: Uuid::new_v4(), + server_task_id: Uuid::new_v4(), + state: TaskState::Running, + container_id: Some("abc123".to_string()), + overlay_path: Some("/tmp/overlay".to_string()), + repo_url: Some("https://github.com/test/repo".to_string()), + base_branch: Some("main".to_string()), + plan: "Build the feature".to_string(), + created_at: Utc::now(), + started_at: Some(Utc::now()), + completed_at: None, + error_message: None, + }; + + db.save_task(&task).unwrap(); + + let loaded = db.get_task(task.id).unwrap().unwrap(); + assert_eq!(loaded.id, task.id); + assert_eq!(loaded.state, TaskState::Running); + assert_eq!(loaded.plan, "Build the feature"); + } + + #[test] + fn test_output_buffer() { + let db = LocalDb::open_memory().unwrap(); + let task_id = Uuid::new_v4(); + + db.buffer_output(task_id, "line 1", false).unwrap(); + db.buffer_output(task_id, "line 2", false).unwrap(); + + let unsent = db.get_unsent_outputs(10).unwrap(); + assert_eq!(unsent.len(), 2); + + let ids: Vec<i64> = unsent.iter().map(|o| o.id).collect(); + db.mark_outputs_sent(&ids).unwrap(); + + let unsent = db.get_unsent_outputs(10).unwrap(); + assert!(unsent.is_empty()); + } +} diff --git a/makima/src/daemon/db/mod.rs b/makima/src/daemon/db/mod.rs new file mode 100644 index 0000000..2c6e0f3 --- /dev/null +++ b/makima/src/daemon/db/mod.rs @@ -0,0 +1,5 @@ +//! Local database for daemon state persistence. + +pub mod local; + +pub use local::{BufferedOutput, LocalDb, LocalTask}; |
