summaryrefslogtreecommitdiff
path: root/makima/src/daemon/db/local.rs
blob: f3ed45ad0851661012787089b0e47ffd7c46649e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
//! Local SQLite database for crash recovery and state persistence.

use std::path::Path;

use chrono::{DateTime, Utc};
use rusqlite::{params, Connection, Result as SqliteResult};
use uuid::Uuid;

use crate::daemon::task::TaskState;

/// Local task record for persistence.
///
/// One value corresponds to one row of the `tasks` table: `LocalDb::save_task`
/// writes every field and `LocalDb::task_from_row` reads them back in the same
/// column order.
#[derive(Debug, Clone)]
pub struct LocalTask {
    /// Primary key of the row; persisted as the UUID's text form.
    pub id: Uuid,
    /// Persisted as text in the `server_task_id` column (NOT NULL).
    /// NOTE(review): presumably the ID the server uses for this task — confirm.
    pub server_task_id: Uuid,
    /// Lifecycle state; persisted via `TaskState::as_str`.
    pub state: TaskState,
    /// Optional; maps to the nullable `container_id` column.
    pub container_id: Option<String>,
    /// Optional; maps to the nullable `overlay_path` column.
    pub overlay_path: Option<String>,
    /// Optional; maps to the nullable `repo_url` column.
    pub repo_url: Option<String>,
    /// Optional; maps to the nullable `base_branch` column.
    pub base_branch: Option<String>,
    /// Required text; maps to the NOT NULL `plan` column.
    pub plan: String,
    /// Persisted as an RFC 3339 string.
    pub created_at: DateTime<Utc>,
    /// Persisted as an RFC 3339 string, or NULL when `None`.
    pub started_at: Option<DateTime<Utc>>,
    /// Persisted as an RFC 3339 string, or NULL when `None`.
    pub completed_at: Option<DateTime<Utc>>,
    /// Optional; maps to the nullable `error_message` column.
    pub error_message: Option<String>,
}

/// Buffered output for reliable delivery.
///
/// Mirrors the columns of an `output_buffer` row that
/// `LocalDb::get_unsent_outputs` selects (the `sent` flag is not exposed).
#[derive(Debug, Clone)]
pub struct BufferedOutput {
    /// Row id from the `output_buffer` table (`INTEGER PRIMARY KEY AUTOINCREMENT`).
    pub id: i64,
    /// Task the output belongs to; stored as the UUID's text form.
    pub task_id: Uuid,
    /// The buffered output text.
    pub output: String,
    /// Stored as an integer column; any non-zero value reads back as `true`.
    pub is_partial: bool,
    /// Value derived from the row's `timestamp` column.
    pub timestamp: DateTime<Utc>,
}

/// Local database for daemon state persistence.
///
/// Wraps a single SQLite connection holding the `tasks`, `output_buffer` and
/// `daemon_state` tables.
pub struct LocalDb {
    /// The underlying SQLite connection; every method goes through it.
    conn: Connection,
}

impl LocalDb {
    /// Opens the database file at `path`, creating it if necessary, and makes
    /// sure the schema exists.
    ///
    /// The parent directory is created on a best-effort basis: any error from
    /// that step is discarded, and the result of opening the connection is what
    /// the caller sees.
    ///
    /// # Errors
    ///
    /// Returns the SQLite error if the connection cannot be opened or the
    /// schema batch fails.
    pub fn open(path: &Path) -> SqliteResult<Self> {
        if let Some(dir) = path.parent() {
            // Result intentionally dropped; see the doc comment above.
            let _ = std::fs::create_dir_all(dir);
        }

        let db = Self {
            conn: Connection::open(path)?,
        };

        // Every statement in the schema is `IF NOT EXISTS`, so this is safe to
        // run on each start-up.
        db.conn.execute_batch(Self::schema())?;

        Ok(db)
    }

    /// Creates a throw-away in-memory database with the schema applied.
    ///
    /// Only compiled for tests.
    #[cfg(test)]
    pub fn open_memory() -> SqliteResult<Self> {
        let db = Self {
            conn: Connection::open_in_memory()?,
        };
        db.conn.execute_batch(Self::schema())?;
        Ok(db)
    }

    /// Database schema.
    ///
    /// Returns the SQL batch that `open` and `open_memory` pass to
    /// `execute_batch`. Every statement uses `IF NOT EXISTS`, so running the
    /// batch against an already-initialised database leaves existing tables
    /// and indexes in place.
    ///
    /// Tables: `tasks` (one row per `LocalTask`), `output_buffer` (rows carry a
    /// `sent` flag defaulting to 0) and `daemon_state` (key/value pairs).
    fn schema() -> &'static str {
        r#"
        -- Local task state for crash recovery
        CREATE TABLE IF NOT EXISTS tasks (
            id TEXT PRIMARY KEY,
            server_task_id TEXT NOT NULL,
            state TEXT NOT NULL,
            container_id TEXT,
            overlay_path TEXT,
            repo_url TEXT,
            base_branch TEXT,
            plan TEXT NOT NULL,
            created_at TEXT NOT NULL,
            started_at TEXT,
            completed_at TEXT,
            error_message TEXT
        );

        -- Buffered output for reliable delivery
        CREATE TABLE IF NOT EXISTS output_buffer (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_id TEXT NOT NULL,
            output TEXT NOT NULL,
            is_partial INTEGER NOT NULL,
            timestamp TEXT NOT NULL,
            sent INTEGER NOT NULL DEFAULT 0
        );

        -- Daemon state key-value store
        CREATE TABLE IF NOT EXISTS daemon_state (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );

        -- Indexes
        CREATE INDEX IF NOT EXISTS idx_tasks_state ON tasks(state);
        CREATE INDEX IF NOT EXISTS idx_output_buffer_sent ON output_buffer(sent, id);
        CREATE INDEX IF NOT EXISTS idx_output_buffer_task ON output_buffer(task_id);
        "#
    }

    /// Inserts `task`, or replaces the existing row with the same `id`.
    ///
    /// UUIDs are stored in their text form, the state via `TaskState::as_str`,
    /// and timestamps as RFC 3339 strings (`NULL` for absent optional ones).
    ///
    /// # Errors
    ///
    /// Returns the SQLite error if the statement fails.
    pub fn save_task(&self, task: &LocalTask) -> SqliteResult<()> {
        // Convert the non-text fields up front so the parameter list below
        // reads in plain column order.
        let id = task.id.to_string();
        let server_task_id = task.server_task_id.to_string();
        let created_at = task.created_at.to_rfc3339();
        let started_at = task.started_at.as_ref().map(|ts| ts.to_rfc3339());
        let completed_at = task.completed_at.as_ref().map(|ts| ts.to_rfc3339());

        let outcome = self.conn.execute(
            r#"
            INSERT OR REPLACE INTO tasks
            (id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message)
            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
            "#,
            params![
                id,
                server_task_id,
                task.state.as_str(),
                task.container_id,
                task.overlay_path,
                task.repo_url,
                task.base_branch,
                task.plan,
                created_at,
                started_at,
                completed_at,
                task.error_message,
            ],
        );

        // The affected-row count is of no interest to callers.
        outcome.map(|_| ())
    }

    /// Looks up a single task by its local `id`.
    ///
    /// Returns `Ok(None)` when no row has that id.
    ///
    /// # Errors
    ///
    /// Returns the SQLite error if the query fails or a column cannot be read.
    pub fn get_task(&self, id: Uuid) -> SqliteResult<Option<LocalTask>> {
        let mut stmt = self.conn.prepare(
            "SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message FROM tasks WHERE id = ?1",
        )?;

        let key = id.to_string();
        let mut rows = stmt.query(params![key])?;

        // `id` is the primary key, so at most one row can match.
        match rows.next()? {
            Some(row) => Self::task_from_row(row).map(Some),
            None => Ok(None),
        }
    }

    /// Returns every task whose stored state is one of `initializing`,
    /// `starting`, `running`, `paused` or `blocked` (used for recovery).
    ///
    /// # Errors
    ///
    /// Returns the SQLite error if the query fails or any row cannot be read.
    pub fn get_active_tasks(&self) -> SqliteResult<Vec<LocalTask>> {
        let mut stmt = self.conn.prepare(
            r#"
            SELECT id, server_task_id, state, container_id, overlay_path, repo_url, base_branch, plan, created_at, started_at, completed_at, error_message
            FROM tasks
            WHERE state IN ('initializing', 'starting', 'running', 'paused', 'blocked')
            "#,
        )?;

        let mut active = Vec::new();
        for task in stmt.query_map([], Self::task_from_row)? {
            // Stop at the first row that fails to decode, as `collect` would.
            active.push(task?);
        }
        Ok(active)
    }

    /// Removes the task row with the given `id`.
    ///
    /// Succeeds even when no such row exists; rows in `output_buffer` are not
    /// touched.
    ///
    /// # Errors
    ///
    /// Returns the SQLite error if the statement fails.
    pub fn delete_task(&self, id: Uuid) -> SqliteResult<()> {
        let key = id.to_string();
        self.conn
            .execute("DELETE FROM tasks WHERE id = ?1", params![key])
            .map(|_| ())
    }

    /// Overwrites the stored state of the task with the given `id`.
    ///
    /// Succeeds even when no row matches; the number of updated rows is not
    /// reported.
    ///
    /// # Errors
    ///
    /// Returns the SQLite error if the statement fails.
    pub fn update_task_state(&self, id: Uuid, state: TaskState) -> SqliteResult<()> {
        let key = id.to_string();
        let new_state = state.as_str();
        self.conn
            .execute(
                "UPDATE tasks SET state = ?2 WHERE id = ?1",
                params![key, new_state],
            )
            .map(|_| ())
    }

    /// Appends one chunk of task output to the buffer, marked as not yet sent.
    ///
    /// The row's timestamp is filled in by SQLite (`datetime('now')`).
    ///
    /// # Returns
    ///
    /// The row id of the newly inserted buffer entry.
    ///
    /// # Errors
    ///
    /// Returns the SQLite error if the insert fails.
    pub fn buffer_output(&self, task_id: Uuid, output: &str, is_partial: bool) -> SqliteResult<i64> {
        let task_key = task_id.to_string();
        // Stored as 0/1 in the INTEGER `is_partial` column.
        let partial_flag = i32::from(is_partial);

        self.conn.execute(
            r#"
            INSERT INTO output_buffer (task_id, output, is_partial, timestamp, sent)
            VALUES (?1, ?2, ?3, datetime('now'), 0)
            "#,
            params![task_key, output, partial_flag],
        )?;

        let row_id = self.conn.last_insert_rowid();
        Ok(row_id)
    }

    /// Get unsent outputs.
    pub fn get_unsent_outputs(&self, limit: i64) -> SqliteResult<Vec<BufferedOutput>> {
        let mut stmt = self.conn.prepare(
            r#"
            SELECT id, task_id, output, is_partial, timestamp
            FROM output_buffer
            WHERE sent = 0
            ORDER BY id
            LIMIT ?1
            "#,
        )?;

        let rows = stmt.query_map(params![limit], |row| {
            let id: i64 = row.get(0)?;
            let task_id_str: String = row.get(1)?;
            let task_id = Uuid::parse_str(&task_id_str).unwrap_or_default();
            let output: String = row.get(2)?;
            let is_partial: i32 = row.get(3)?;
            let timestamp_str: String = row.get(4)?;
            let timestamp = DateTime::parse_from_rfc3339(&timestamp_str)
                .map(|dt| dt.with_timezone(&Utc))
                .unwrap_or_else(|_| Utc::now());

            Ok(BufferedOutput {
                id,
                task_id,
                output,
                is_partial: is_partial != 0,
                timestamp,
            })
        })?;

        rows.collect()
    }

    /// Flags the buffer rows with the given `ids` as sent.
    ///
    /// An empty slice is a no-op. Large slices are processed in several
    /// statements so that no single statement exceeds SQLite's limit on bound
    /// parameters. The batches are not wrapped in a transaction: if one fails,
    /// earlier batches stay marked. Marking is idempotent, so the caller can
    /// simply retry.
    ///
    /// # Errors
    ///
    /// Returns the SQLite error from the first batch that fails.
    pub fn mark_outputs_sent(&self, ids: &[i64]) -> SqliteResult<()> {
        // Comfortably below the 999-parameter default of older SQLite builds.
        const MAX_IDS_PER_STATEMENT: usize = 500;

        // `chunks` yields nothing for an empty slice, which covers the no-op case.
        for batch in ids.chunks(MAX_IDS_PER_STATEMENT) {
            let placeholders = vec!["?"; batch.len()].join(",");
            let sql = format!(
                "UPDATE output_buffer SET sent = 1 WHERE id IN ({})",
                placeholders
            );

            self.conn
                .execute(&sql, rusqlite::params_from_iter(batch))?;
        }

        Ok(())
    }

    /// Deletes buffer rows that are marked sent and whose timestamp is more
    /// than `older_than_hours` hours before the current time.
    ///
    /// # Returns
    ///
    /// The number of rows deleted.
    ///
    /// # Errors
    ///
    /// Returns the SQLite error if the statement fails.
    pub fn cleanup_sent_outputs(&self, older_than_hours: i64) -> SqliteResult<usize> {
        // SQLite appends " hours" to this, giving a modifier such as "-24 hours".
        let hours_back = format!("-{}", older_than_hours);

        self.conn.execute(
            r#"
            DELETE FROM output_buffer
            WHERE sent = 1 AND timestamp < datetime('now', ?1 || ' hours')
            "#,
            params![hours_back],
        )
    }

    /// Reads the value stored under `key` in the daemon key-value table.
    ///
    /// Returns `Ok(None)` when the key has never been set.
    ///
    /// # Errors
    ///
    /// Returns the SQLite error if the query fails or the value cannot be read
    /// as text.
    pub fn get_state(&self, key: &str) -> SqliteResult<Option<String>> {
        let mut stmt = self.conn.prepare(
            "SELECT value FROM daemon_state WHERE key = ?1",
        )?;

        let mut rows = stmt.query(params![key])?;

        // `key` is the primary key, so only the first row matters.
        match rows.next()? {
            Some(row) => row.get::<_, String>(0).map(Some),
            None => Ok(None),
        }
    }

    /// Stores `value` under `key`, replacing any previous value, and stamps the
    /// row with SQLite's current time.
    ///
    /// # Errors
    ///
    /// Returns the SQLite error if the statement fails.
    pub fn set_state(&self, key: &str, value: &str) -> SqliteResult<()> {
        let outcome = self.conn.execute(
            r#"
            INSERT OR REPLACE INTO daemon_state (key, value, updated_at)
            VALUES (?1, ?2, datetime('now'))
            "#,
            params![key, value],
        );

        // Callers only care about success or failure.
        outcome.map(|_| ())
    }

    /// Builds a `LocalTask` from a row whose columns are, in order: `id`,
    /// `server_task_id`, `state`, `container_id`, `overlay_path`, `repo_url`,
    /// `base_branch`, `plan`, `created_at`, `started_at`, `completed_at`,
    /// `error_message`.
    ///
    /// Decoding is lenient: an unparsable UUID or state becomes that type's
    /// default, an unparsable `created_at` becomes the current time, and an
    /// unparsable optional timestamp becomes `None`.
    ///
    /// # Errors
    ///
    /// Returns the SQLite error if a column is missing or has the wrong type.
    fn task_from_row(row: &rusqlite::Row) -> SqliteResult<LocalTask> {
        // Shared lenient decoders for the text-encoded columns.
        let uuid_or_nil = |raw: String| Uuid::parse_str(&raw).unwrap_or_default();
        let rfc3339_utc = |raw: String| {
            DateTime::parse_from_rfc3339(&raw)
                .ok()
                .map(|dt| dt.with_timezone(&Utc))
        };

        // Columns are read in index order, so the first unreadable one decides
        // which error is returned.
        let id = uuid_or_nil(row.get(0)?);
        let server_task_id = uuid_or_nil(row.get(1)?);
        let state_str: String = row.get(2)?;
        let state = TaskState::from_str(&state_str).unwrap_or_default();
        let container_id: Option<String> = row.get(3)?;
        let overlay_path: Option<String> = row.get(4)?;
        let repo_url: Option<String> = row.get(5)?;
        let base_branch: Option<String> = row.get(6)?;
        let plan: String = row.get(7)?;
        let created_at = rfc3339_utc(row.get(8)?).unwrap_or_else(Utc::now);
        let started_at = row.get::<_, Option<String>>(9)?.and_then(&rfc3339_utc);
        let completed_at = row.get::<_, Option<String>>(10)?.and_then(&rfc3339_utc);
        let error_message: Option<String> = row.get(11)?;

        Ok(LocalTask {
            id,
            server_task_id,
            state,
            container_id,
            overlay_path,
            repo_url,
            base_branch,
            plan,
            created_at,
            started_at,
            completed_at,
            error_message,
        })
    }
}

#[cfg(test)]
mod tests {
    // `super::*` brings in `LocalDb` and `LocalTask` together with this file's
    // own imports (`Uuid`, `Utc`, `TaskState`), all of which the tests use.
    use super::*;

    /// Builds a fully populated running task with fresh random ids.
    fn sample_task() -> LocalTask {
        LocalTask {
            id: Uuid::new_v4(),
            server_task_id: Uuid::new_v4(),
            state: TaskState::Running,
            container_id: Some("abc123".to_string()),
            overlay_path: Some("/tmp/overlay".to_string()),
            repo_url: Some("https://github.com/test/repo".to_string()),
            base_branch: Some("main".to_string()),
            plan: "Build the feature".to_string(),
            created_at: Utc::now(),
            started_at: Some(Utc::now()),
            completed_at: None,
            error_message: None,
        }
    }

    #[test]
    fn test_open_memory() {
        let db = LocalDb::open_memory().unwrap();
        assert!(db.get_active_tasks().unwrap().is_empty());
    }

    #[test]
    fn test_save_and_get_task() {
        let db = LocalDb::open_memory().unwrap();

        let task = sample_task();

        db.save_task(&task).unwrap();

        let loaded = db.get_task(task.id).unwrap().unwrap();
        assert_eq!(loaded.id, task.id);
        assert_eq!(loaded.state, TaskState::Running);
        assert_eq!(loaded.plan, "Build the feature");
    }

    #[test]
    fn test_delete_task() {
        let db = LocalDb::open_memory().unwrap();

        let task = sample_task();
        db.save_task(&task).unwrap();
        assert!(db.get_task(task.id).unwrap().is_some());

        db.delete_task(task.id).unwrap();
        assert!(db.get_task(task.id).unwrap().is_none());
    }

    #[test]
    fn test_output_buffer() {
        let db = LocalDb::open_memory().unwrap();
        let task_id = Uuid::new_v4();

        db.buffer_output(task_id, "line 1", false).unwrap();
        db.buffer_output(task_id, "line 2", false).unwrap();

        let unsent = db.get_unsent_outputs(10).unwrap();
        assert_eq!(unsent.len(), 2);

        let ids: Vec<i64> = unsent.iter().map(|o| o.id).collect();
        db.mark_outputs_sent(&ids).unwrap();

        let unsent = db.get_unsent_outputs(10).unwrap();
        assert!(unsent.is_empty());
    }

    #[test]
    fn test_daemon_state_roundtrip() {
        let db = LocalDb::open_memory().unwrap();

        assert_eq!(db.get_state("missing").unwrap(), None);

        db.set_state("cursor", "1").unwrap();
        assert_eq!(db.get_state("cursor").unwrap().as_deref(), Some("1"));

        // A second write to the same key replaces the value.
        db.set_state("cursor", "2").unwrap();
        assert_eq!(db.get_state("cursor").unwrap().as_deref(), Some("2"));
    }
}