diff options
| author | soryu <soryu@soryu.co> | 2026-02-01 02:01:02 +0000 |
|---|---|---|
| committer | soryu <soryu@soryu.co> | 2026-02-01 02:01:02 +0000 |
| commit | 6cb7ad10bb0428865bed1b220845cf60bd4f56e0 (patch) | |
| tree | f295b760974c8a75a2489b7d19974ccf6a4ffe8d | |
| parent | 7567153e6281b94e39e52be5d060b381ed69597d (diff) | |
| download | soryu-makima/task-task-275e8de2-275e8de2.tar.gz soryu-makima/task-task-275e8de2-275e8de2.zip | |
[WIP] Heartbeat checkpoint - 2026-02-01 02:01:02 UTCmakima/task-task-275e8de2-275e8de2
| -rw-r--r-- | makima/src/bin/makima.rs | 244 | ||||
| -rw-r--r-- | makima/src/daemon/cli/mod.rs | 22 | ||||
| -rw-r--r-- | makima/src/daemon/cli/monitor.rs | 444 |
3 files changed, 709 insertions, 1 deletions
diff --git a/makima/src/bin/makima.rs b/makima/src/bin/makima.rs index ac577b8..d09a1bd 100644 --- a/makima/src/bin/makima.rs +++ b/makima/src/bin/makima.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use makima::daemon::api::{ApiClient, CreateContractRequest}; use makima::daemon::cli::{ - Cli, CliConfig, Commands, ConfigCommand, ContractCommand, RedTeamCommand, SupervisorCommand, ViewArgs, + Cli, CliConfig, Commands, ConfigCommand, ContractCommand, ContractsCommand, RedTeamCommand, SupervisorCommand, ViewArgs, }; use makima::daemon::tui::{self, Action, App, ListItem, ViewType, TuiWsClient, WsEvent, OutputLine, OutputMessageType, WsConnectionState, RepositorySuggestion}; use makima::daemon::config::{DaemonConfig, RepoEntry}; @@ -31,6 +31,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { Commands::View(args) => run_view(args).await, Commands::Config(cmd) => run_config(cmd).await, Commands::RedTeam(cmd) => run_red_team(cmd).await, + Commands::Contracts(cmd) => run_contracts(cmd).await, } } @@ -805,6 +806,247 @@ async fn run_red_team(cmd: RedTeamCommand) -> Result<(), Box<dyn std::error::Err } } +/// Run contracts commands (multi-contract operations). +async fn run_contracts(cmd: ContractsCommand) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { + use makima::daemon::cli::monitor::{ContractState, MonitorEvent, OutputFormat, now_timestamp, ContractSnapshot}; + use std::collections::HashMap; + + match cmd { + ContractsCommand::Monitor(args) => { + let client = ApiClient::new(args.api_url, args.api_key)?; + let poll_interval = std::time::Duration::from_secs(args.poll_interval); + let stale_threshold = args.stale_threshold; + + // Track previous state for change detection + let mut prev_states: HashMap<uuid::Uuid, ContractState> = HashMap::new(); + let mut first_run = true; + + eprintln!("Starting contract monitor (poll interval: {}s, Ctrl+C to stop)...", args.poll_interval); + + loop { + // Fetch contracts + let result = match client.list_contracts().await { + Ok(r) => r, + Err(e) => { + let event = MonitorEvent::Error { + message: format!("Failed to fetch contracts: {}", e), + timestamp: now_timestamp(), + }; + emit_event(&event, args.format); + tokio::time::sleep(poll_interval).await; + continue; + } + }; + + let contracts = result.0.get("contracts") + .and_then(|v| v.as_array()) + .cloned() + .unwrap_or_default(); + + // Parse contract states + let mut current_states: Vec<ContractState> = Vec::new(); + for contract_json in &contracts { + // If specific contract IDs were provided, filter to those + if !args.contract_ids.is_empty() { + let contract_id = contract_json + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()); + if let Some(id) = contract_id { + if !args.contract_ids.contains(&id) { + continue; + } + } else { + continue; + } + } + + // For full state, we need to fetch individual contract details + let contract_id = match contract_json + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| uuid::Uuid::parse_str(s).ok()) + { + Some(id) => id, + None => continue, + }; + + // Get full contract details including tasks + let full_contract = match client.get_contract(contract_id).await { + Ok(r) => r.0, + Err(_) => contract_json.clone(), + }; + + if let Some(state) = ContractState::from_json(&full_contract) { + // Apply filters + if let Some(ref status_filter) = args.status { + if state.status != *status_filter { + continue; + } + } + + if args.stale && !state.is_stale(stale_threshold) { + continue; + } + + current_states.push(state); + } + } + + // On first run, emit a snapshot + if first_run { + let snapshots: Vec<ContractSnapshot> = current_states + .iter() + .map(ContractSnapshot::from) + .collect(); + let event = MonitorEvent::Snapshot { + contracts: snapshots, + timestamp: now_timestamp(), + }; + emit_event(&event, args.format); + first_run = false; + + // Store initial states + for state in ¤t_states { + prev_states.insert(state.id, state.clone()); + } + } else { + // Detect changes and emit events + for state in ¤t_states { + if let Some(prev) = prev_states.get(&state.id) { + // Check for status change + if prev.status != state.status { + let event = MonitorEvent::StatusChange { + contract_id: state.id, + contract_name: state.name.clone(), + old_status: prev.status.clone(), + new_status: state.status.clone(), + timestamp: now_timestamp(), + }; + emit_event(&event, args.format); + } + + // Check for phase change + if prev.phase != state.phase { + let event = MonitorEvent::PhaseChange { + contract_id: state.id, + contract_name: state.name.clone(), + old_phase: prev.phase.clone(), + new_phase: state.phase.clone(), + timestamp: now_timestamp(), + }; + emit_event(&event, args.format); + } + + // Check for new pending question + if !prev.has_pending_question && state.has_pending_question { + if let Some(ref question) = state.pending_question { + let event = MonitorEvent::QuestionPending { + contract_id: state.id, + contract_name: state.name.clone(), + question: question.clone(), + timestamp: now_timestamp(), + }; + emit_event(&event, args.format); + } + } + + // Check for task count changes (detect started/completed tasks) + if state.running_tasks > prev.running_tasks && !args.quiet { + // A task started + let event = MonitorEvent::TaskStarted { + contract_id: state.id, + contract_name: state.name.clone(), + task_id: state.id, // We don't have the actual task ID here + task_name: state.current_activity.clone().unwrap_or_else(|| "Unknown".to_string()), + timestamp: now_timestamp(), + }; + emit_event(&event, args.format); + } + + if state.completed_tasks > prev.completed_tasks && !args.quiet { + // A task completed + let event = MonitorEvent::TaskCompleted { + contract_id: state.id, + contract_name: state.name.clone(), + task_id: state.id, + task_name: "Task".to_string(), + result: "completed".to_string(), + timestamp: now_timestamp(), + }; + emit_event(&event, args.format); + } + + // Check for staleness + if !prev.is_stale(stale_threshold) && state.is_stale(stale_threshold) { + let event = MonitorEvent::ContractStale { + contract_id: state.id, + contract_name: state.name.clone(), + last_activity: state.last_activity.clone().unwrap_or_else(|| "unknown".to_string()), + timestamp: now_timestamp(), + }; + emit_event(&event, args.format); + } + } else { + // New contract appeared - emit as snapshot of just this contract + if !args.quiet { + let event = MonitorEvent::Snapshot { + contracts: vec![ContractSnapshot::from(state)], + timestamp: now_timestamp(), + }; + emit_event(&event, args.format); + } + } + + // Update state + prev_states.insert(state.id, state.clone()); + } + } + + // Wait for next poll + tokio::time::sleep(poll_interval).await; + } + } + } +} + +/// Emit a monitor event in the specified format. +fn emit_event(event: &makima::daemon::cli::monitor::MonitorEvent, format: makima::daemon::cli::monitor::OutputFormat) { + use makima::daemon::cli::monitor::OutputFormat; + match format { + OutputFormat::Text => { + println!("{}", event.to_text()); + } + OutputFormat::Json => { + println!("{}", event.to_json()); + } + OutputFormat::Tui => { + // For now, TUI mode falls back to text with colors + let text = event.to_text(); + // Add color based on event type + let colored = match event { + makima::daemon::cli::monitor::MonitorEvent::QuestionPending { .. } => { + format!("\x1b[33m{}\x1b[0m", text) // Yellow + } + makima::daemon::cli::monitor::MonitorEvent::Error { .. } => { + format!("\x1b[31m{}\x1b[0m", text) // Red + } + makima::daemon::cli::monitor::MonitorEvent::ContractStale { .. } => { + format!("\x1b[90m{}\x1b[0m", text) // Gray + } + makima::daemon::cli::monitor::MonitorEvent::TaskStarted { .. } => { + format!("\x1b[32m{}\x1b[0m", text) // Green + } + makima::daemon::cli::monitor::MonitorEvent::TaskCompleted { .. } => { + format!("\x1b[34m{}\x1b[0m", text) // Blue + } + _ => text, + }; + println!("{}", colored); + } + } +} + /// Load contracts from API async fn load_contracts(client: &ApiClient) -> Result<Vec<ListItem>, Box<dyn std::error::Error + Send + Sync>> { let result = client.list_contracts().await?; diff --git a/makima/src/daemon/cli/mod.rs b/makima/src/daemon/cli/mod.rs index c848e8e..624a02f 100644 --- a/makima/src/daemon/cli/mod.rs +++ b/makima/src/daemon/cli/mod.rs @@ -3,6 +3,7 @@ pub mod config; pub mod contract; pub mod daemon; +pub mod monitor; pub mod red_team; pub mod server; pub mod supervisor; @@ -14,6 +15,7 @@ use uuid::Uuid; pub use config::CliConfig; pub use contract::ContractArgs; pub use daemon::DaemonArgs; +pub use monitor::MonitorArgs; pub use red_team::handle_notify; pub use server::ServerArgs; pub use supervisor::SupervisorArgs; @@ -65,6 +67,10 @@ pub enum Commands { /// Red team commands for adversarial monitoring #[command(name = "red-team", subcommand)] RedTeam(RedTeamCommand), + + /// Commands for monitoring and managing multiple contracts + #[command(subcommand)] + Contracts(ContractsCommand), } /// Config subcommands for CLI configuration. @@ -211,6 +217,22 @@ pub enum RedTeamCommand { Notify(RedTeamNotifyArgs), } +/// Contracts subcommands for monitoring and managing multiple contracts. +#[derive(Subcommand, Debug)] +pub enum ContractsCommand { + /// Real-time monitoring of contracts + /// + /// Polls for contract changes and displays updates in real-time. + /// Supports filtering by contract IDs, status, and staleness. + /// + /// Examples: + /// makima contracts monitor # Monitor all active contracts + /// makima contracts monitor abc-123 def-456 # Monitor specific contracts + /// makima contracts monitor --stale # Only show stale contracts + /// makima contracts monitor --format json # JSON output for scripting + Monitor(MonitorArgs), +} + /// Arguments for red-team notify command. #[derive(Args, Debug)] pub struct RedTeamNotifyArgs { diff --git a/makima/src/daemon/cli/monitor.rs b/makima/src/daemon/cli/monitor.rs new file mode 100644 index 0000000..062c236 --- /dev/null +++ b/makima/src/daemon/cli/monitor.rs @@ -0,0 +1,444 @@ +//! Monitor subcommand - real-time monitoring of contracts. + +use clap::Args; +use uuid::Uuid; + +/// Arguments for monitor command. +#[derive(Args, Debug)] +pub struct MonitorArgs { + /// API URL + #[arg(long, env = "MAKIMA_API_URL", default_value = "https://api.makima.jp")] + pub api_url: String, + + /// API key for authentication + #[arg(long, env = "MAKIMA_API_KEY")] + pub api_key: String, + + /// Contract IDs to monitor (if empty, monitors all active contracts) + #[arg(index = 1)] + pub contract_ids: Vec<Uuid>, + + /// Filter by status (active, completed, pending) + #[arg(long)] + pub status: Option<String>, + + /// Only show stale contracts (no recent activity) + #[arg(long)] + pub stale: bool, + + /// Only show important events (suppress routine updates) + #[arg(long, short = 'q')] + pub quiet: bool, + + /// Output format: tui, text, json + #[arg(long, default_value = "text")] + pub format: OutputFormat, + + /// Poll interval in seconds + #[arg(long, default_value = "5")] + pub poll_interval: u64, + + /// Stale threshold in seconds (contracts with no activity in this time are considered stale) + #[arg(long, default_value = "300")] + pub stale_threshold: u64, +} + +/// Output format for the monitor command. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum OutputFormat { + /// Full terminal UI dashboard + Tui, + /// Plain text event stream + #[default] + Text, + /// JSON event stream (for scripting) + Json, +} + +impl std::str::FromStr for OutputFormat { + type Err = String; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_lowercase().as_str() { + "tui" => Ok(Self::Tui), + "text" => Ok(Self::Text), + "json" => Ok(Self::Json), + _ => Err(format!( + "Invalid format '{}'. Expected: tui, text, json", + s + )), + } + } +} + +/// Contract monitoring state for display. +#[derive(Debug, Clone)] +pub struct ContractState { + pub id: Uuid, + pub name: String, + pub status: String, + pub phase: String, + pub supervisor_state: Option<String>, + pub task_count: i32, + pub running_tasks: i32, + pub pending_tasks: i32, + pub completed_tasks: i32, + pub last_activity: Option<String>, + pub current_activity: Option<String>, + pub has_pending_question: bool, + pub pending_question: Option<String>, +} + +impl ContractState { + /// Parse contract state from API response. + pub fn from_json(value: &serde_json::Value) -> Option<Self> { + let id = value + .get("id") + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok())?; + + let name = value + .get("name") + .and_then(|v| v.as_str()) + .unwrap_or("Unnamed") + .to_string(); + + let status = value + .get("status") + .and_then(|v| v.as_str()) + .unwrap_or("unknown") + .to_string(); + + let phase = value + .get("phase") + .and_then(|v| v.as_str()) + .unwrap_or("unknown") + .to_string(); + + // Extract supervisor state from tasks + let tasks = value.get("tasks").and_then(|v| v.as_array()); + let (supervisor_state, running_tasks, pending_tasks, completed_tasks, task_count) = + if let Some(tasks) = tasks { + let mut sv_state = None; + let mut running = 0; + let mut pending = 0; + let mut completed = 0; + + for task in tasks { + let task_status = task.get("status").and_then(|v| v.as_str()); + let role = task.get("role").and_then(|v| v.as_str()); + + if role == Some("supervisor") { + sv_state = task_status.map(|s| s.to_string()); + } + + match task_status { + Some("running") | Some("working") => running += 1, + Some("pending") | Some("queued") => pending += 1, + Some("done") | Some("completed") | Some("merged") => completed += 1, + _ => {} + } + } + + (sv_state, running, pending, completed, tasks.len() as i32) + } else { + (None, 0, 0, 0, 0) + }; + + // Extract last activity time + let last_activity = value + .get("updatedAt") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + // Check for pending questions + let pending_question_value = value.get("pendingQuestion"); + let has_pending_question = pending_question_value + .map(|v| !v.is_null()) + .unwrap_or(false); + let pending_question = pending_question_value + .and_then(|v| v.get("question")) + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + // Get current activity from supervisor or most recent task + let current_activity = tasks.and_then(|tasks| { + // Find the most recently updated running task + tasks + .iter() + .filter(|t| { + let status = t.get("status").and_then(|v| v.as_str()); + status == Some("running") || status == Some("working") + }) + .max_by_key(|t| t.get("updatedAt").and_then(|v| v.as_str())) + .and_then(|t| { + let name = t.get("name").and_then(|v| v.as_str())?; + let progress = t.get("progressSummary").and_then(|v| v.as_str()); + Some(match progress { + Some(p) if !p.is_empty() => format!("{}: {}", name, p), + _ => name.to_string(), + }) + }) + }); + + Some(Self { + id, + name, + status, + phase, + supervisor_state, + task_count, + running_tasks, + pending_tasks, + completed_tasks, + last_activity, + current_activity, + has_pending_question, + pending_question, + }) + } + + /// Check if this contract is stale (no recent activity). + pub fn is_stale(&self, threshold_secs: u64) -> bool { + if let Some(ref last) = self.last_activity { + if let Ok(last_time) = chrono::DateTime::parse_from_rfc3339(last) { + let now = chrono::Utc::now(); + let duration = now.signed_duration_since(last_time); + return duration.num_seconds() > threshold_secs as i64; + } + } + // If we can't parse the time, assume it's stale + true + } + + /// Get a status symbol for display. + pub fn status_symbol(&self) -> &'static str { + if self.has_pending_question { + "?" + } else if self.running_tasks > 0 { + "*" + } else if self.status == "completed" { + "+" + } else if self.status == "active" { + "-" + } else { + " " + } + } + + /// Get a colored status for terminal output. + pub fn status_color(&self) -> &'static str { + if self.has_pending_question { + "\x1b[33m" // Yellow + } else if self.running_tasks > 0 { + "\x1b[32m" // Green + } else if self.status == "completed" { + "\x1b[34m" // Blue + } else if self.status == "active" { + "\x1b[0m" // Default + } else { + "\x1b[90m" // Gray + } + } +} + +/// Event emitted during monitoring. +#[derive(Debug, Clone, serde::Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum MonitorEvent { + /// Initial snapshot of contract state + Snapshot { + contracts: Vec<ContractSnapshot>, + timestamp: String, + }, + /// Contract status changed + StatusChange { + contract_id: Uuid, + contract_name: String, + old_status: String, + new_status: String, + timestamp: String, + }, + /// Task started + TaskStarted { + contract_id: Uuid, + contract_name: String, + task_id: Uuid, + task_name: String, + timestamp: String, + }, + /// Task completed + TaskCompleted { + contract_id: Uuid, + contract_name: String, + task_id: Uuid, + task_name: String, + result: String, + timestamp: String, + }, + /// Question pending (needs user input) + QuestionPending { + contract_id: Uuid, + contract_name: String, + question: String, + timestamp: String, + }, + /// Phase changed + PhaseChange { + contract_id: Uuid, + contract_name: String, + old_phase: String, + new_phase: String, + timestamp: String, + }, + /// Contract became stale + ContractStale { + contract_id: Uuid, + contract_name: String, + last_activity: String, + timestamp: String, + }, + /// Error during monitoring + Error { + message: String, + timestamp: String, + }, +} + +/// Snapshot of contract state for events. +#[derive(Debug, Clone, serde::Serialize)] +pub struct ContractSnapshot { + pub id: Uuid, + pub name: String, + pub status: String, + pub phase: String, + pub running_tasks: i32, + pub pending_tasks: i32, + pub completed_tasks: i32, + pub has_pending_question: bool, +} + +impl From<&ContractState> for ContractSnapshot { + fn from(state: &ContractState) -> Self { + Self { + id: state.id, + name: state.name.clone(), + status: state.status.clone(), + phase: state.phase.clone(), + running_tasks: state.running_tasks, + pending_tasks: state.pending_tasks, + completed_tasks: state.completed_tasks, + has_pending_question: state.has_pending_question, + } + } +} + +impl MonitorEvent { + /// Format event for text output. + pub fn to_text(&self) -> String { + match self { + Self::Snapshot { contracts, timestamp } => { + let mut lines = vec![format!("[{}] Monitoring {} contracts:", timestamp, contracts.len())]; + for c in contracts { + let status_indicator = if c.has_pending_question { + "?" + } else if c.running_tasks > 0 { + "*" + } else { + "-" + }; + lines.push(format!( + " {} {} ({}) - {} | tasks: {} running, {} pending, {} done", + status_indicator, + c.name, + c.phase, + c.status, + c.running_tasks, + c.pending_tasks, + c.completed_tasks + )); + } + lines.join("\n") + } + Self::StatusChange { + contract_name, + old_status, + new_status, + timestamp, + .. + } => { + format!( + "[{}] {} status: {} -> {}", + timestamp, contract_name, old_status, new_status + ) + } + Self::TaskStarted { + contract_name, + task_name, + timestamp, + .. + } => { + format!("[{}] {} task started: {}", timestamp, contract_name, task_name) + } + Self::TaskCompleted { + contract_name, + task_name, + result, + timestamp, + .. + } => { + format!( + "[{}] {} task completed: {} ({})", + timestamp, contract_name, task_name, result + ) + } + Self::QuestionPending { + contract_name, + question, + timestamp, + .. + } => { + format!( + "[{}] {} NEEDS INPUT: {}", + timestamp, contract_name, question + ) + } + Self::PhaseChange { + contract_name, + old_phase, + new_phase, + timestamp, + .. + } => { + format!( + "[{}] {} phase: {} -> {}", + timestamp, contract_name, old_phase, new_phase + ) + } + Self::ContractStale { + contract_name, + last_activity, + timestamp, + .. + } => { + format!( + "[{}] {} is STALE (last activity: {})", + timestamp, contract_name, last_activity + ) + } + Self::Error { message, timestamp } => { + format!("[{}] ERROR: {}", timestamp, message) + } + } + } + + /// Format event for JSON output. + pub fn to_json(&self) -> String { + serde_json::to_string(self).unwrap_or_else(|_| "{}".to_string()) + } +} + +/// Get current timestamp in ISO format. +pub fn now_timestamp() -> String { + chrono::Utc::now().to_rfc3339() +} |
