From 6cb7ad10bb0428865bed1b220845cf60bd4f56e0 Mon Sep 17 00:00:00 2001 From: soryu Date: Sun, 1 Feb 2026 02:01:02 +0000 Subject: [WIP] Heartbeat checkpoint - 2026-02-01 02:01:02 UTC --- makima/src/daemon/cli/monitor.rs | 444 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 444 insertions(+) create mode 100644 makima/src/daemon/cli/monitor.rs (limited to 'makima/src/daemon/cli/monitor.rs') diff --git a/makima/src/daemon/cli/monitor.rs b/makima/src/daemon/cli/monitor.rs new file mode 100644 index 0000000..062c236 --- /dev/null +++ b/makima/src/daemon/cli/monitor.rs @@ -0,0 +1,444 @@ +//! Monitor subcommand - real-time monitoring of contracts. + +use clap::Args; +use uuid::Uuid; + +/// Arguments for monitor command. +#[derive(Args, Debug)] +pub struct MonitorArgs { + /// API URL + #[arg(long, env = "MAKIMA_API_URL", default_value = "https://api.makima.jp")] + pub api_url: String, + + /// API key for authentication + #[arg(long, env = "MAKIMA_API_KEY")] + pub api_key: String, + + /// Contract IDs to monitor (if empty, monitors all active contracts) + #[arg(index = 1)] + pub contract_ids: Vec, + + /// Filter by status (active, completed, pending) + #[arg(long)] + pub status: Option, + + /// Only show stale contracts (no recent activity) + #[arg(long)] + pub stale: bool, + + /// Only show important events (suppress routine updates) + #[arg(long, short = 'q')] + pub quiet: bool, + + /// Output format: tui, text, json + #[arg(long, default_value = "text")] + pub format: OutputFormat, + + /// Poll interval in seconds + #[arg(long, default_value = "5")] + pub poll_interval: u64, + + /// Stale threshold in seconds (contracts with no activity in this time are considered stale) + #[arg(long, default_value = "300")] + pub stale_threshold: u64, +} + +/// Output format for the monitor command. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum OutputFormat { + /// Full terminal UI dashboard + Tui, + /// Plain text event stream + #[default] + Text, + /// JSON event stream (for scripting) + Json, +} + +impl std::str::FromStr for OutputFormat { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "tui" => Ok(Self::Tui), + "text" => Ok(Self::Text), + "json" => Ok(Self::Json), + _ => Err(format!( + "Invalid format '{}'. Expected: tui, text, json", + s + )), + } + } +} + +/// Contract monitoring state for display. +#[derive(Debug, Clone)] +pub struct ContractState { + pub id: Uuid, + pub name: String, + pub status: String, + pub phase: String, + pub supervisor_state: Option, + pub task_count: i32, + pub running_tasks: i32, + pub pending_tasks: i32, + pub completed_tasks: i32, + pub last_activity: Option, + pub current_activity: Option, + pub has_pending_question: bool, + pub pending_question: Option, +} + +impl ContractState { + /// Parse contract state from API response. + pub fn from_json(value: &serde_json::Value) -> Option { + let id = value + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok())?; + + let name = value + .get("name") + .and_then(|v| v.as_str()) + .unwrap_or("Unnamed") + .to_string(); + + let status = value + .get("status") + .and_then(|v| v.as_str()) + .unwrap_or("unknown") + .to_string(); + + let phase = value + .get("phase") + .and_then(|v| v.as_str()) + .unwrap_or("unknown") + .to_string(); + + // Extract supervisor state from tasks + let tasks = value.get("tasks").and_then(|v| v.as_array()); + let (supervisor_state, running_tasks, pending_tasks, completed_tasks, task_count) = + if let Some(tasks) = tasks { + let mut sv_state = None; + let mut running = 0; + let mut pending = 0; + let mut completed = 0; + + for task in tasks { + let task_status = task.get("status").and_then(|v| v.as_str()); + let role = task.get("role").and_then(|v| v.as_str()); + + if role == Some("supervisor") { + sv_state = task_status.map(|s| s.to_string()); + } + + match task_status { + Some("running") | Some("working") => running += 1, + Some("pending") | Some("queued") => pending += 1, + Some("done") | Some("completed") | Some("merged") => completed += 1, + _ => {} + } + } + + (sv_state, running, pending, completed, tasks.len() as i32) + } else { + (None, 0, 0, 0, 0) + }; + + // Extract last activity time + let last_activity = value + .get("updatedAt") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + // Check for pending questions + let pending_question_value = value.get("pendingQuestion"); + let has_pending_question = pending_question_value + .map(|v| !v.is_null()) + .unwrap_or(false); + let pending_question = pending_question_value + .and_then(|v| v.get("question")) + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + // Get current activity from supervisor or most recent task + let current_activity = tasks.and_then(|tasks| { + // Find the most recently updated running task + tasks + .iter() + .filter(|t| { + let status = t.get("status").and_then(|v| v.as_str()); + status == Some("running") || status == Some("working") + }) + .max_by_key(|t| t.get("updatedAt").and_then(|v| v.as_str())) + .and_then(|t| { + let name = t.get("name").and_then(|v| v.as_str())?; + let progress = t.get("progressSummary").and_then(|v| v.as_str()); + Some(match progress { + Some(p) if !p.is_empty() => format!("{}: {}", name, p), + _ => name.to_string(), + }) + }) + }); + + Some(Self { + id, + name, + status, + phase, + supervisor_state, + task_count, + running_tasks, + pending_tasks, + completed_tasks, + last_activity, + current_activity, + has_pending_question, + pending_question, + }) + } + + /// Check if this contract is stale (no recent activity). + pub fn is_stale(&self, threshold_secs: u64) -> bool { + if let Some(ref last) = self.last_activity { + if let Ok(last_time) = chrono::DateTime::parse_from_rfc3339(last) { + let now = chrono::Utc::now(); + let duration = now.signed_duration_since(last_time); + return duration.num_seconds() > threshold_secs as i64; + } + } + // If we can't parse the time, assume it's stale + true + } + + /// Get a status symbol for display. + pub fn status_symbol(&self) -> &'static str { + if self.has_pending_question { + "?" + } else if self.running_tasks > 0 { + "*" + } else if self.status == "completed" { + "+" + } else if self.status == "active" { + "-" + } else { + " " + } + } + + /// Get a colored status for terminal output. + pub fn status_color(&self) -> &'static str { + if self.has_pending_question { + "\x1b[33m" // Yellow + } else if self.running_tasks > 0 { + "\x1b[32m" // Green + } else if self.status == "completed" { + "\x1b[34m" // Blue + } else if self.status == "active" { + "\x1b[0m" // Default + } else { + "\x1b[90m" // Gray + } + } +} + +/// Event emitted during monitoring. +#[derive(Debug, Clone, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum MonitorEvent { + /// Initial snapshot of contract state + Snapshot { + contracts: Vec, + timestamp: String, + }, + /// Contract status changed + StatusChange { + contract_id: Uuid, + contract_name: String, + old_status: String, + new_status: String, + timestamp: String, + }, + /// Task started + TaskStarted { + contract_id: Uuid, + contract_name: String, + task_id: Uuid, + task_name: String, + timestamp: String, + }, + /// Task completed + TaskCompleted { + contract_id: Uuid, + contract_name: String, + task_id: Uuid, + task_name: String, + result: String, + timestamp: String, + }, + /// Question pending (needs user input) + QuestionPending { + contract_id: Uuid, + contract_name: String, + question: String, + timestamp: String, + }, + /// Phase changed + PhaseChange { + contract_id: Uuid, + contract_name: String, + old_phase: String, + new_phase: String, + timestamp: String, + }, + /// Contract became stale + ContractStale { + contract_id: Uuid, + contract_name: String, + last_activity: String, + timestamp: String, + }, + /// Error during monitoring + Error { + message: String, + timestamp: String, + }, +} + +/// Snapshot of contract state for events. +#[derive(Debug, Clone, serde::Serialize)] +pub struct ContractSnapshot { + pub id: Uuid, + pub name: String, + pub status: String, + pub phase: String, + pub running_tasks: i32, + pub pending_tasks: i32, + pub completed_tasks: i32, + pub has_pending_question: bool, +} + +impl From<&ContractState> for ContractSnapshot { + fn from(state: &ContractState) -> Self { + Self { + id: state.id, + name: state.name.clone(), + status: state.status.clone(), + phase: state.phase.clone(), + running_tasks: state.running_tasks, + pending_tasks: state.pending_tasks, + completed_tasks: state.completed_tasks, + has_pending_question: state.has_pending_question, + } + } +} + +impl MonitorEvent { + /// Format event for text output. + pub fn to_text(&self) -> String { + match self { + Self::Snapshot { contracts, timestamp } => { + let mut lines = vec![format!("[{}] Monitoring {} contracts:", timestamp, contracts.len())]; + for c in contracts { + let status_indicator = if c.has_pending_question { + "?" + } else if c.running_tasks > 0 { + "*" + } else { + "-" + }; + lines.push(format!( + " {} {} ({}) - {} | tasks: {} running, {} pending, {} done", + status_indicator, + c.name, + c.phase, + c.status, + c.running_tasks, + c.pending_tasks, + c.completed_tasks + )); + } + lines.join("\n") + } + Self::StatusChange { + contract_name, + old_status, + new_status, + timestamp, + .. + } => { + format!( + "[{}] {} status: {} -> {}", + timestamp, contract_name, old_status, new_status + ) + } + Self::TaskStarted { + contract_name, + task_name, + timestamp, + .. + } => { + format!("[{}] {} task started: {}", timestamp, contract_name, task_name) + } + Self::TaskCompleted { + contract_name, + task_name, + result, + timestamp, + .. + } => { + format!( + "[{}] {} task completed: {} ({})", + timestamp, contract_name, task_name, result + ) + } + Self::QuestionPending { + contract_name, + question, + timestamp, + .. + } => { + format!( + "[{}] {} NEEDS INPUT: {}", + timestamp, contract_name, question + ) + } + Self::PhaseChange { + contract_name, + old_phase, + new_phase, + timestamp, + .. + } => { + format!( + "[{}] {} phase: {} -> {}", + timestamp, contract_name, old_phase, new_phase + ) + } + Self::ContractStale { + contract_name, + last_activity, + timestamp, + .. + } => { + format!( + "[{}] {} is STALE (last activity: {})", + timestamp, contract_name, last_activity + ) + } + Self::Error { message, timestamp } => { + format!("[{}] ERROR: {}", timestamp, message) + } + } + } + + /// Format event for JSON output. + pub fn to_json(&self) -> String { + serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string()) + } +} + +/// Get current timestamp in ISO format. +pub fn now_timestamp() -> String { + chrono::Utc::now().to_rfc3339() +} -- cgit v1.2.3