From a054152e36978d6b5801ef1715117a957e4cd65c Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 12 May 2026 04:17:47 +0000 Subject: [PATCH 1/3] feat(parquet): one-off event annotations on recordings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds support for attaching one-off events (restarts, config changes, anomalies, deploys, ...) to parquet recordings as a JSON blob in the file footer. Events are self-describing — each carries optional source/node/instance scope rather than inheriting from file-level metadata — so combined files keep them at the top level instead of nesting under per_source_metadata. `rezolus parquet annotate` gains: - --add-events FILE (also '-' for stdin; accepts JSON, JSON array, or JSONL) - --event key=value,... (inline shorthand, repeatable) - --clear-events (composes with --add-events for replace) Inputs accept RFC3339 timestamps (including the seconds-omitted short form) and humantime durations; canonical storage is u64 nanoseconds. `parquet combine` concatenates events from all inputs, sorts by timestamp, and dedupes by stable `id`. No viewer/UI work yet; this is the schema + annotate pipeline only. --- crates/dashboard/src/events.rs | 185 ++++++++ crates/dashboard/src/lib.rs | 2 + src/parquet_metadata.rs | 10 + src/parquet_tools/annotate.rs | 26 +- src/parquet_tools/combine.rs | 28 ++ src/parquet_tools/events.rs | 810 +++++++++++++++++++++++++++++++++ src/parquet_tools/mod.rs | 26 ++ src/viewer/mod.rs | 2 +- 8 files changed, 1085 insertions(+), 4 deletions(-) create mode 100644 crates/dashboard/src/events.rs create mode 100644 src/parquet_tools/events.rs diff --git a/crates/dashboard/src/events.rs b/crates/dashboard/src/events.rs new file mode 100644 index 000000000..62feea517 --- /dev/null +++ b/crates/dashboard/src/events.rs @@ -0,0 +1,185 @@ +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +/// A one-off event annotation attached to a parquet recording. Events mark +/// key moments (restarts, config changes, deploys, anomalies, ...) on top of +/// existing time-series metrics. They are stored as a JSON blob in the +/// parquet footer under [`KEY_EVENTS`](crate::events) and are self-describing +/// — every event carries its own optional `source` / `node` / `instance` +/// scope rather than inheriting from file-level metadata. +/// +/// Field semantics: +/// - `timestamp` is nanoseconds since the Unix epoch. Required. +/// - `description` is a short title rendered inline next to the marker. +/// - `kind` is a free-form tag (`restart`, `config_change`, `deploy`, +/// `incident`, `anomaly`, `marker`, `note`, ...). Conventions only, not +/// validated, so users may invent their own without a release. +/// - `details` is longer optional text (e.g. a paragraph of context). +/// - `source` / `node` / `instance` scope the event to a specific +/// recording stream within a (possibly combined) file. When all three are +/// absent the event is global. +/// - `labels` is an open map for arbitrary user tags. +/// - `duration_ns` lets an event span an interval rather than a point — +/// when absent the event renders as a vertical line, when present as a +/// band. +/// - `id` is an optional stable identifier used to dedupe across merges. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct Event { + pub timestamp: u64, + pub description: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub kind: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub details: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub node: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub instance: Option, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub labels: BTreeMap, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub duration_ns: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub id: Option, +} + +/// Wrapper for the JSON payload stored in the parquet footer. Lives as an +/// object with a single `events` array so future fields (schema version, +/// global labels) can be added without breaking parsers. +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] +pub struct Events { + #[serde(default)] + pub events: Vec, +} + +impl Events { + pub fn new(events: Vec) -> Self { + Self { events } + } + + /// Sort events by timestamp and drop later duplicates that share the + /// same non-empty `id`. Stable: earlier occurrences of a given id win. + pub fn normalize(&mut self) { + self.events + .sort_by_key(|e| (e.timestamp, e.id.clone().unwrap_or_default())); + let mut seen = std::collections::HashSet::new(); + self.events.retain(|e| match &e.id { + Some(id) if !id.is_empty() => seen.insert(id.clone()), + _ => true, + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn round_trips_minimal_event() { + let json = r#"{"timestamp":1700000000000000000,"description":"restart"}"#; + let e: Event = serde_json::from_str(json).unwrap(); + assert_eq!(e.timestamp, 1_700_000_000_000_000_000); + assert_eq!(e.description, "restart"); + assert!(e.kind.is_none()); + // Optional fields are skipped on serialize + assert_eq!(serde_json::to_string(&e).unwrap(), json); + } + + #[test] + fn round_trips_full_event() { + let e = Event { + timestamp: 1, + description: "d".into(), + kind: Some("restart".into()), + details: Some("long".into()), + source: Some("vllm".into()), + node: Some("gpu01".into()), + instance: Some("0".into()), + labels: BTreeMap::from([("reason".into(), "OOM".into())]), + duration_ns: Some(1000), + id: Some("evt-1".into()), + }; + let json = serde_json::to_string(&e).unwrap(); + let parsed: Event = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed, e); + } + + #[test] + fn normalize_sorts_and_dedupes_by_id() { + let mut events = Events::new(vec![ + Event { + timestamp: 30, + description: "later".into(), + id: Some("a".into()), + kind: None, + details: None, + source: None, + node: None, + instance: None, + labels: BTreeMap::new(), + duration_ns: None, + }, + Event { + timestamp: 10, + description: "first".into(), + id: Some("a".into()), + kind: None, + details: None, + source: None, + node: None, + instance: None, + labels: BTreeMap::new(), + duration_ns: None, + }, + Event { + timestamp: 20, + description: "no id, kept".into(), + id: None, + kind: None, + details: None, + source: None, + node: None, + instance: None, + labels: BTreeMap::new(), + duration_ns: None, + }, + ]); + events.normalize(); + let descs: Vec<&str> = events + .events + .iter() + .map(|e| e.description.as_str()) + .collect(); + assert_eq!(descs, vec!["first", "no id, kept"]); + } + + #[test] + fn normalize_keeps_all_when_ids_missing_or_empty() { + let make = |ts: u64, id: Option<&str>| Event { + timestamp: ts, + description: ts.to_string(), + id: id.map(str::to_string), + kind: None, + details: None, + source: None, + node: None, + instance: None, + labels: BTreeMap::new(), + duration_ns: None, + }; + let mut events = Events::new(vec![ + make(2, None), + make(1, Some("")), + make(3, Some("")), + ]); + events.normalize(); + // Empty id is treated as no-id; nothing gets deduped. + assert_eq!(events.events.len(), 3); + assert_eq!( + events.events.iter().map(|e| e.timestamp).collect::>(), + vec![1, 2, 3] + ); + } +} diff --git a/crates/dashboard/src/lib.rs b/crates/dashboard/src/lib.rs index 0d58aaed7..a5d9774a6 100644 --- a/crates/dashboard/src/lib.rs +++ b/crates/dashboard/src/lib.rs @@ -1,7 +1,9 @@ pub mod dashboard; +pub mod events; mod plot; mod service_extension; +pub use events::{Event, Events}; pub use metriken_query::Tsdb; pub use plot::*; pub use service_extension::{CategoryExtension, Kpi, ServiceExtension, TemplateRegistry}; diff --git a/src/parquet_metadata.rs b/src/parquet_metadata.rs index 547eac189..aae451846 100644 --- a/src/parquet_metadata.rs +++ b/src/parquet_metadata.rs @@ -95,3 +95,13 @@ pub const NESTED_INSTANCE: &str = "instance"; /// The default rezolus node to display when the viewer opens a combined /// file with multiple nodes. Set by `parquet combine --pinned `. pub const KEY_PINNED_NODE: &str = "pinned_node"; + +// ── One-off event annotations ─────────────────────────────────────── + +/// JSON payload of one-off events attached to the recording (restarts, +/// config changes, anomalies, ...). Value is an object `{"events": [...]}` +/// where each entry conforms to `dashboard::events::Event`. Each event +/// carries its own optional `source` / `node` / `instance` scope rather +/// than inheriting from file-level metadata, so the payload stays +/// self-describing across combine. +pub const KEY_EVENTS: &str = "events"; diff --git a/src/parquet_tools/annotate.rs b/src/parquet_tools/annotate.rs index a025a8d5a..56b77c8e5 100644 --- a/src/parquet_tools/annotate.rs +++ b/src/parquet_tools/annotate.rs @@ -17,6 +17,17 @@ pub(super) fn run(args: &ArgMatches, registry: &TemplateRegistry) { let sysinfo_path = args.get_one::("systeminfo"); let overwrite = args.get_flag("overwrite"); + let event_files: Vec<&Path> = args + .get_many::("add-events") + .map(|it| it.map(PathBuf::as_path).collect()) + .unwrap_or_default(); + let inline_events: Vec = args + .get_many::("event") + .map(|it| it.cloned().collect()) + .unwrap_or_default(); + let clear_events = args.get_flag("clear-events"); + let events_requested = clear_events || !event_files.is_empty() || !inline_events.is_empty(); + if let Some(n) = node { set_node_metadata(path, n).unwrap_or_else(|e| { eprintln!("error: failed to set node metadata: {e}"); @@ -42,11 +53,20 @@ pub(super) fn run(args: &ArgMatches, registry: &TemplateRegistry) { println!("Set source={:?} on {:?}", src, path); } + if events_requested { + super::events::run(path, &event_files, &inline_events, clear_events).unwrap_or_else( + |e| { + eprintln!("error: {e}"); + std::process::exit(1); + }, + ); + } + let custom_file = args.get_one::("queries"); - // If only individual edits (--node/--source/--systeminfo) were requested, - // don't also auto-apply a default service template. - if (node.is_some() || new_source.is_some() || sysinfo_path.is_some()) + // If only individual edits (--node/--source/--systeminfo/events) were + // requested, don't also auto-apply a default service template. + if (node.is_some() || new_source.is_some() || sysinfo_path.is_some() || events_requested) && custom_file.is_none() && !args.get_flag("filter") { diff --git a/src/parquet_tools/combine.rs b/src/parquet_tools/combine.rs index 215cddee0..60dc88870 100644 --- a/src/parquet_tools/combine.rs +++ b/src/parquet_tools/combine.rs @@ -880,6 +880,34 @@ fn merge_metadata(inputs: &[InputFile]) -> Result, Box(raw) { + Ok(mut e) => events.events.append(&mut e.events), + Err(e) => eprintln!( + "warning: ignoring malformed events payload in input file: {e}" + ), + } + } + } + if !events.events.is_empty() { + events.normalize(); + result.push(KeyValue { + key: KEY_EVENTS.to_string(), + value: Some(serde_json::to_string(&events)?), + }); + } + Ok(result) } diff --git a/src/parquet_tools/events.rs b/src/parquet_tools/events.rs new file mode 100644 index 000000000..98f5c4c41 --- /dev/null +++ b/src/parquet_tools/events.rs @@ -0,0 +1,810 @@ +//! Event-annotation helpers for `rezolus parquet annotate`. +//! +//! Events are stored as a JSON blob in the parquet footer under +//! [`KEY_EVENTS`](crate::parquet_metadata::KEY_EVENTS). On-disk shape: +//! `{"events":[ {"timestamp":, "description":..., ...} ]}`. Every +//! event is self-describing — it carries its own optional +//! `source`/`node`/`instance` scope rather than inheriting from +//! file-level metadata — so combined files don't need a `per_source_metadata` +//! indirection for them. +//! +//! Input paths: +//! - `--add-events FILE` (also `-` for stdin): JSON or JSONL. JSON form is +//! either `{"events":[...]}` or a bare array. +//! - `--event key=value,...`: inline shorthand, repeatable. `description` +//! values may be quoted to embed commas. +//! - `--clear-events`: wipe existing events. Combined with `--add-events` +//! yields "replace": clear is applied before add. +//! +//! Both file and CLI inputs accept `timestamp` as either nanoseconds (int) +//! or an RFC3339 string. `duration_ns` accepts nanoseconds (int) or a +//! humantime string (e.g. `30s`, `1m30s`). + +use chrono::DateTime; +use parquet::file::metadata::KeyValue; +use std::collections::BTreeMap; +use std::path::Path; + +use crate::parquet_metadata::KEY_EVENTS; +use crate::viewer::{Event, Events}; + +/// Apply event-related operations to a parquet file in the order +/// `clear → add file → add inline`. +/// +/// Returns `Ok(true)` if anything changed (and the file was rewritten), +/// `Ok(false)` if no event operations were requested. +pub(super) fn run( + path: &Path, + add_files: &[&Path], + inline: &[String], + clear: bool, +) -> Result> { + if !clear && add_files.is_empty() && inline.is_empty() { + return Ok(false); + } + + let mut events = if clear { + Events::default() + } else { + read_events(path)?.unwrap_or_default() + }; + + let original_existing_ids: std::collections::HashSet = events + .events + .iter() + .filter_map(|e| e.id.clone()) + .filter(|id| !id.is_empty()) + .collect(); + + let mut added = 0usize; + for file in add_files { + let new_events = read_input_file(file)?; + added += new_events.len(); + events.events.extend(new_events); + } + for s in inline { + let event = parse_inline_event(s)?; + added += 1; + events.events.push(event); + } + + let before = events.events.len(); + events.normalize(); + let dropped = before - events.events.len(); + + write_events(path, &events)?; + + if clear { + println!("Cleared existing events from {:?}", path); + } + if added > 0 { + println!( + "Annotated {:?} with {} event(s) ({} total{})", + path, + added, + events.events.len(), + if dropped > 0 { + format!(", {dropped} deduped by id") + } else { + String::new() + }, + ); + } else if !clear { + println!("No events to add to {:?}", path); + } + + // Heads-up on inline/file events whose id collides with one that was + // already in the file — normalize() keeps the earlier (existing) entry, + // which is rarely what a user wants from a new CLI invocation. + let new_ids: Vec = add_files + .iter() + .filter_map(|p| read_input_file(p).ok()) + .flatten() + .chain(inline.iter().filter_map(|s| parse_inline_event(s).ok())) + .filter_map(|e| e.id) + .filter(|id| !id.is_empty()) + .collect(); + for id in &new_ids { + if original_existing_ids.contains(id) { + eprintln!( + "warning: event id={id:?} already existed in file; kept original (use --clear-events to replace)" + ); + } + } + + Ok(true) +} + +/// Read the existing `events` payload from a parquet file's footer. +pub(super) fn read_events(path: &Path) -> Result, Box> { + let kv_meta = super::read_file_metadata(path)?; + let Some(raw) = kv_meta + .iter() + .find(|kv| kv.key == KEY_EVENTS) + .and_then(|kv| kv.value.as_deref()) + else { + return Ok(None); + }; + let events: Events = serde_json::from_str(raw) + .map_err(|e| format!("invalid existing events payload in {path:?}: {e}"))?; + Ok(Some(events)) +} + +fn write_events(path: &Path, events: &Events) -> Result<(), Box> { + let mut kv_meta = super::read_file_metadata(path)?; + kv_meta.retain(|kv| kv.key != KEY_EVENTS); + if !events.events.is_empty() { + kv_meta.push(KeyValue { + key: KEY_EVENTS.to_string(), + value: Some(serde_json::to_string(events)?), + }); + } + let buf = super::rewrite_parquet(path, kv_meta, None)?; + std::fs::write(path, &buf)?; + Ok(()) +} + +/// Read events from `path`, or from stdin if `path == "-"`. Accepts: +/// - A JSON object `{"events":[...]}` +/// - A bare JSON array `[...]` +/// - A single JSON object `{...}` (one event) +/// - JSONL (one event per line) when `path` ends in `.jsonl` or when the +/// input doesn't start with `{` / `[` after trimming. +pub(super) fn read_input_file(path: &Path) -> Result, Box> { + let content = if path.as_os_str() == "-" { + let mut buf = String::new(); + std::io::Read::read_to_string(&mut std::io::stdin(), &mut buf) + .map_err(|e| format!("failed to read events from stdin: {e}"))?; + buf + } else { + std::fs::read_to_string(path) + .map_err(|e| format!("failed to read events from {path:?}: {e}"))? + }; + + let is_jsonl = path + .extension() + .and_then(|s| s.to_str()) + .map(|s| s.eq_ignore_ascii_case("jsonl") || s.eq_ignore_ascii_case("ndjson")) + .unwrap_or(false); + + parse_events_payload(&content, is_jsonl) + .map_err(|e| format!("failed to parse events from {path:?}: {e}").into()) +} + +fn parse_events_payload(content: &str, force_jsonl: bool) -> Result, String> { + let trimmed = content.trim_start(); + if force_jsonl || !(trimmed.starts_with('{') || trimmed.starts_with('[')) { + return parse_jsonl(content); + } + + // Detect JSONL when the first character is `{` but there are multiple + // top-level objects (no enclosing array/object wrapper). + if trimmed.starts_with('{') && looks_like_jsonl(content) { + return parse_jsonl(content); + } + + let mut value: serde_json::Value = + serde_json::from_str(content).map_err(|e| format!("invalid JSON: {e}"))?; + + let array = match &mut value { + serde_json::Value::Object(obj) => obj + .remove("events") + .ok_or_else(|| "expected an `events` array in object form".to_string())?, + serde_json::Value::Array(_) => value, + _ => return Err("expected an object, array, or JSONL".into()), + }; + + let serde_json::Value::Array(items) = array else { + return Err("`events` must be an array".into()); + }; + + items + .into_iter() + .enumerate() + .map(|(i, v)| value_to_event(v).map_err(|e| format!("event #{i}: {e}"))) + .collect() +} + +fn parse_jsonl(content: &str) -> Result, String> { + let mut out = Vec::new(); + for (i, line) in content.lines().enumerate() { + let line = line.trim(); + if line.is_empty() || line.starts_with('#') { + continue; + } + let v: serde_json::Value = serde_json::from_str(line) + .map_err(|e| format!("line {}: invalid JSON: {e}", i + 1))?; + let event = value_to_event(v).map_err(|e| format!("line {}: {e}", i + 1))?; + out.push(event); + } + Ok(out) +} + +/// Heuristic: JSONL when we see `}` followed by a newline followed by `{` +/// at the top level. We don't do brace-counting; misidentified inputs +/// surface as JSON errors with line numbers, which is fine for a CLI tool. +fn looks_like_jsonl(content: &str) -> bool { + let mut lines = content.lines().filter(|l| !l.trim().is_empty()); + let Some(first) = lines.next() else { + return false; + }; + let trimmed_first = first.trim_start(); + if !trimmed_first.starts_with('{') { + return false; + } + // Two consecutive lines that each start with `{` at column 0-ish ⇒ JSONL. + lines + .next() + .map(|l| l.trim_start().starts_with('{')) + .unwrap_or(false) +} + +/// Convert a `serde_json::Value` representing a single event into our +/// canonical [`Event`], applying timestamp/duration normalization for the +/// human-friendly input forms (RFC3339 timestamps, humantime durations). +fn value_to_event(mut v: serde_json::Value) -> Result { + let serde_json::Value::Object(map) = &mut v else { + return Err("event must be a JSON object".into()); + }; + + if let Some(ts) = map.get_mut("timestamp") { + normalize_timestamp(ts)?; + } + if let Some(d) = map.get_mut("duration_ns") { + normalize_duration(d)?; + } + // Also accept `duration` as an alias for `duration_ns` so users can + // write `"duration": "30s"` in JSON files. + if let Some(d) = map.remove("duration") { + if !map.contains_key("duration_ns") { + let mut d = d; + normalize_duration(&mut d)?; + map.insert("duration_ns".into(), d); + } + } + + serde_json::from_value::(v).map_err(|e| e.to_string()) +} + +fn normalize_timestamp(v: &mut serde_json::Value) -> Result<(), String> { + match v { + serde_json::Value::Number(_) => Ok(()), + serde_json::Value::String(s) => { + let ns = parse_timestamp_str(s)?; + *v = serde_json::Value::Number(ns.into()); + Ok(()) + } + _ => Err("timestamp must be a number (ns) or an RFC3339 string".into()), + } +} + +fn normalize_duration(v: &mut serde_json::Value) -> Result<(), String> { + match v { + serde_json::Value::Number(_) => Ok(()), + serde_json::Value::String(s) => { + let ns = parse_duration_str(s)?; + *v = serde_json::Value::Number(ns.into()); + Ok(()) + } + serde_json::Value::Null => Ok(()), + _ => Err("duration must be a number (ns) or a humantime string like \"30s\"".into()), + } +} + +/// Parse a timestamp string. Accepts RFC3339 (with or without timezone; +/// naive strings are treated as UTC), the short `YYYY-MM-DDTHH:MMZ` +/// "seconds-omitted" form, and bare integer strings (interpreted as +/// nanoseconds). +pub(super) fn parse_timestamp_str(s: &str) -> Result { + if let Ok(n) = s.parse::() { + return Ok(n); + } + if let Ok(dt) = DateTime::parse_from_rfc3339(s) { + let ns = dt + .timestamp_nanos_opt() + .ok_or_else(|| format!("timestamp {s:?} is out of range"))?; + if ns < 0 { + return Err(format!("timestamp {s:?} predates the Unix epoch")); + } + return Ok(ns as u64); + } + // Tolerate the seconds-omitted forms users commonly type by hand + // (e.g. "2026-05-12T15:23Z" or "2026-05-12T15:23+02:00") by patching + // ":00" into the time portion and retrying. + if let Some(patched) = try_patch_seconds(s) { + if let Ok(dt) = DateTime::parse_from_rfc3339(&patched) { + let ns = dt + .timestamp_nanos_opt() + .ok_or_else(|| format!("timestamp {s:?} is out of range"))?; + return Ok(ns as u64); + } + } + // Try naive forms without timezone, treated as UTC. + for fmt in &["%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"] { + if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, fmt) { + let ns = dt + .and_utc() + .timestamp_nanos_opt() + .ok_or_else(|| format!("timestamp {s:?} is out of range"))?; + return Ok(ns as u64); + } + } + Err(format!("could not parse {s:?} as RFC3339 or ns integer")) +} + +/// If `s` is an RFC3339-ish string whose time field omits seconds +/// (e.g. `2026-05-12T15:23Z`), return a version with `:00` inserted before +/// the timezone designator. Returns `None` otherwise — including when the +/// time already has seconds. +fn try_patch_seconds(s: &str) -> Option { + let t_pos = s.find('T')?; + let after_t = &s[t_pos + 1..]; + // Locate where the time portion ends: 'Z', '+', or '-' (the last '-' + // since the date itself contains dashes; but `after_t` strips those). + let tz_offset_in_after_t = after_t.find(['Z', '+', '-'])?; + let time_part = &after_t[..tz_offset_in_after_t]; + // Patch only when the time has exactly one colon (HH:MM, no seconds). + if time_part.matches(':').count() != 1 { + return None; + } + let mut out = String::with_capacity(s.len() + 3); + out.push_str(&s[..t_pos + 1 + tz_offset_in_after_t]); + out.push_str(":00"); + out.push_str(&after_t[tz_offset_in_after_t..]); + Some(out) +} + +/// Parse a duration string. Accepts bare integer strings (ns) and +/// humantime strings like `30s`, `1m30s`, `2h`. +fn parse_duration_str(s: &str) -> Result { + if let Ok(n) = s.parse::() { + return Ok(n); + } + let d = humantime::parse_duration(s).map_err(|e| format!("invalid duration {s:?}: {e}"))?; + u64::try_from(d.as_nanos()).map_err(|_| format!("duration {s:?} overflows u64 ns")) +} + +/// Parse a single inline `--event key=value,key=value` shorthand. +/// +/// Quoting: a value may be wrapped in double quotes (`description="foo, bar"`) +/// to embed commas. Backslash escapes are supported inside quoted strings. +/// Unknown keys are rejected so typos surface immediately. +fn parse_inline_event(spec: &str) -> Result { + let pairs = split_kv_pairs(spec)?; + let mut map = serde_json::Map::new(); + let mut labels = BTreeMap::::new(); + + for (key, value) in pairs { + match key.as_str() { + "time" | "timestamp" => { + map.insert("timestamp".into(), serde_json::Value::String(value)); + } + "description" | "desc" => { + map.insert("description".into(), serde_json::Value::String(value)); + } + "kind" => { + map.insert("kind".into(), serde_json::Value::String(value)); + } + "details" => { + map.insert("details".into(), serde_json::Value::String(value)); + } + "source" => { + map.insert("source".into(), serde_json::Value::String(value)); + } + "node" => { + map.insert("node".into(), serde_json::Value::String(value)); + } + "instance" => { + map.insert("instance".into(), serde_json::Value::String(value)); + } + "id" => { + map.insert("id".into(), serde_json::Value::String(value)); + } + "duration" | "duration_ns" => { + map.insert("duration_ns".into(), serde_json::Value::String(value)); + } + other => { + if let Some(label) = other.strip_prefix("label.") { + labels.insert(label.to_string(), value); + } else { + return Err(format!("unknown event key {other:?}")); + } + } + } + } + + if !labels.is_empty() { + map.insert( + "labels".into(), + serde_json::to_value(&labels).map_err(|e| e.to_string())?, + ); + } + + if !map.contains_key("timestamp") { + return Err("event is missing required `time=`/`timestamp=`".into()); + } + if !map.contains_key("description") { + return Err("event is missing required `description=`".into()); + } + + value_to_event(serde_json::Value::Object(map)) +} + +/// Split `--event` payload `key=value,key=value` honoring double-quoted +/// values (so values may contain commas and `=`). Returns key/value pairs +/// in source order. +fn split_kv_pairs(spec: &str) -> Result, String> { + let mut pairs = Vec::new(); + let mut chars = spec.chars().peekable(); + + loop { + // Skip leading whitespace and separator commas. + while matches!(chars.peek(), Some(c) if c.is_whitespace() || *c == ',') { + chars.next(); + } + if chars.peek().is_none() { + break; + } + + // key + let mut key = String::new(); + while let Some(&c) = chars.peek() { + if c == '=' || c == ',' { + break; + } + key.push(c); + chars.next(); + } + let key = key.trim().to_string(); + if key.is_empty() { + return Err("empty event key".into()); + } + if chars.next() != Some('=') { + return Err(format!("event key {key:?} is missing '='")); + } + + // value: optionally quoted + let mut value = String::new(); + if chars.peek() == Some(&'"') { + chars.next(); + loop { + match chars.next() { + Some('\\') => match chars.next() { + Some(c) => value.push(c), + None => return Err("trailing backslash in event value".into()), + }, + Some('"') => break, + Some(c) => value.push(c), + None => return Err(format!("unterminated quoted value for key {key:?}")), + } + } + } else { + while let Some(&c) = chars.peek() { + if c == ',' { + break; + } + value.push(c); + chars.next(); + } + } + + pairs.push((key, value.trim().to_string())); + } + + Ok(pairs) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_object_payload() { + let json = r#"{"events":[{"timestamp":1,"description":"a"}]}"#; + let events = parse_events_payload(json, false).unwrap(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].description, "a"); + } + + #[test] + fn parses_bare_array_payload() { + let json = r#"[{"timestamp":1,"description":"a"},{"timestamp":2,"description":"b"}]"#; + let events = parse_events_payload(json, false).unwrap(); + assert_eq!(events.len(), 2); + } + + #[test] + fn parses_jsonl_payload() { + let json = "{\"timestamp\":1,\"description\":\"a\"}\n{\"timestamp\":2,\"description\":\"b\"}"; + let events = parse_events_payload(json, true).unwrap(); + assert_eq!(events.len(), 2); + } + + #[test] + fn detects_jsonl_without_extension() { + let json = "{\"timestamp\":1,\"description\":\"a\"}\n{\"timestamp\":2,\"description\":\"b\"}"; + // Heuristic kicks in because two top-level `{...}` lines appear. + let events = parse_events_payload(json, false).unwrap(); + assert_eq!(events.len(), 2); + } + + #[test] + fn parses_rfc3339_timestamp() { + let json = + r#"[{"timestamp":"2026-05-12T15:23:00Z","description":"restart"}]"#; + let events = parse_events_payload(json, false).unwrap(); + // 2026-05-12T15:23:00Z in nanos + let expected = DateTime::parse_from_rfc3339("2026-05-12T15:23:00Z") + .unwrap() + .timestamp_nanos_opt() + .unwrap() as u64; + assert_eq!(events[0].timestamp, expected); + } + + #[test] + fn parses_humantime_duration() { + let json = r#"[{"timestamp":1,"description":"d","duration":"30s"}]"#; + let events = parse_events_payload(json, false).unwrap(); + assert_eq!(events[0].duration_ns, Some(30_000_000_000)); + } + + #[test] + fn parses_duration_ns_int() { + let json = r#"[{"timestamp":1,"description":"d","duration_ns":42}]"#; + let events = parse_events_payload(json, false).unwrap(); + assert_eq!(events[0].duration_ns, Some(42)); + } + + #[test] + fn rejects_missing_required_fields() { + let err = parse_events_payload(r#"[{"timestamp":1}]"#, false).unwrap_err(); + assert!(err.contains("description"), "got: {err}"); + } + + #[test] + fn parses_inline_event_minimal() { + let e = parse_inline_event("time=1700000000000000000,description=restart").unwrap(); + assert_eq!(e.timestamp, 1_700_000_000_000_000_000); + assert_eq!(e.description, "restart"); + } + + #[test] + fn parses_inline_event_rfc3339() { + let e = + parse_inline_event(r#"time=2026-05-12T15:23:00Z,kind=restart,description="vllm restart""#) + .unwrap(); + assert_eq!(e.kind.as_deref(), Some("restart")); + assert_eq!(e.description, "vllm restart"); + } + + #[test] + fn parses_inline_event_quoted_description_with_comma() { + let e = parse_inline_event(r#"time=1,description="hello, world""#).unwrap(); + assert_eq!(e.description, "hello, world"); + } + + #[test] + fn parses_inline_event_with_labels() { + let e = parse_inline_event( + "time=1,description=d,label.reason=OOM,label.deployer=ci", + ) + .unwrap(); + assert_eq!(e.labels.get("reason").map(|s| s.as_str()), Some("OOM")); + assert_eq!(e.labels.get("deployer").map(|s| s.as_str()), Some("ci")); + } + + #[test] + fn rejects_unknown_inline_key() { + let err = parse_inline_event("time=1,description=d,whoops=x").unwrap_err(); + assert!(err.contains("whoops"), "got: {err}"); + } + + #[test] + fn rejects_inline_without_description() { + let err = parse_inline_event("time=1").unwrap_err(); + assert!(err.contains("description"), "got: {err}"); + } + + #[test] + fn parse_timestamp_str_accepts_int() { + assert_eq!(parse_timestamp_str("123").unwrap(), 123); + } + + #[test] + fn parse_timestamp_str_accepts_naive_iso() { + // Naive datetimes are treated as UTC. + let got = parse_timestamp_str("2026-05-12T15:23:00").unwrap(); + let expected = DateTime::parse_from_rfc3339("2026-05-12T15:23:00Z") + .unwrap() + .timestamp_nanos_opt() + .unwrap() as u64; + assert_eq!(got, expected); + } + + #[test] + fn parse_timestamp_str_rejects_garbage() { + assert!(parse_timestamp_str("not a date").is_err()); + } + + #[test] + fn parse_timestamp_str_accepts_seconds_omitted_rfc3339() { + let with = parse_timestamp_str("2026-05-12T15:23:00Z").unwrap(); + assert_eq!(parse_timestamp_str("2026-05-12T15:23Z").unwrap(), with); + // Also works with an offset suffix. + let with_offset = parse_timestamp_str("2026-05-12T15:23:00+02:00").unwrap(); + assert_eq!( + parse_timestamp_str("2026-05-12T15:23+02:00").unwrap(), + with_offset + ); + } + + #[test] + fn parse_timestamp_str_naive_seconds_omitted() { + let with = parse_timestamp_str("2026-05-12T15:23:00").unwrap(); + assert_eq!(parse_timestamp_str("2026-05-12T15:23").unwrap(), with); + } + + // ── end-to-end tests against a real parquet file ── + + use arrow::array::UInt64Array; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use parquet::arrow::ArrowWriter; + use parquet::file::metadata::KeyValue; + use parquet::file::properties::WriterProperties; + use std::sync::Arc; + use tempfile::NamedTempFile; + + fn make_minimal_parquet(initial_kv: Vec<(&str, &str)>) -> NamedTempFile { + let ts_field = Field::new("timestamp", DataType::UInt64, false); + let schema = Arc::new(Schema::new(vec![ts_field])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(UInt64Array::from(vec![1u64, 2, 3]))], + ) + .unwrap(); + let kv: Vec = initial_kv + .into_iter() + .map(|(k, v)| KeyValue { + key: k.to_string(), + value: Some(v.to_string()), + }) + .collect(); + let props = WriterProperties::builder() + .set_key_value_metadata(Some(kv)) + .build(); + let tmp = NamedTempFile::new().unwrap(); + let mut writer = + ArrowWriter::try_new(std::fs::File::create(tmp.path()).unwrap(), schema, Some(props)) + .unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + tmp + } + + fn write_events_file(events_json: &str) -> NamedTempFile { + let tmp = NamedTempFile::new().unwrap(); + std::fs::write(tmp.path(), events_json).unwrap(); + tmp + } + + #[test] + fn run_adds_events_from_file() { + let parquet = make_minimal_parquet(vec![("source", "rezolus")]); + let events_file = + write_events_file(r#"{"events":[{"timestamp":1,"description":"restart"}]}"#); + + let changed = run( + parquet.path(), + &[events_file.path()], + &[], + false, + ) + .unwrap(); + assert!(changed); + + let stored = read_events(parquet.path()).unwrap().unwrap(); + assert_eq!(stored.events.len(), 1); + assert_eq!(stored.events[0].description, "restart"); + } + + #[test] + fn run_appends_to_existing_events() { + let parquet = make_minimal_parquet(vec![("source", "rezolus")]); + let first = write_events_file(r#"[{"timestamp":1,"description":"a"}]"#); + run(parquet.path(), &[first.path()], &[], false).unwrap(); + + let second = write_events_file(r#"[{"timestamp":2,"description":"b"}]"#); + run(parquet.path(), &[second.path()], &[], false).unwrap(); + + let stored = read_events(parquet.path()).unwrap().unwrap(); + assert_eq!(stored.events.len(), 2); + assert_eq!(stored.events[0].description, "a"); + assert_eq!(stored.events[1].description, "b"); + } + + #[test] + fn run_clear_then_add_replaces_events() { + let parquet = make_minimal_parquet(vec![("source", "rezolus")]); + let first = write_events_file(r#"[{"timestamp":1,"description":"old"}]"#); + run(parquet.path(), &[first.path()], &[], false).unwrap(); + + let second = write_events_file(r#"[{"timestamp":2,"description":"new"}]"#); + run(parquet.path(), &[second.path()], &[], true).unwrap(); + + let stored = read_events(parquet.path()).unwrap().unwrap(); + assert_eq!(stored.events.len(), 1); + assert_eq!(stored.events[0].description, "new"); + } + + #[test] + fn run_clear_only_wipes_events() { + let parquet = make_minimal_parquet(vec![("source", "rezolus")]); + let first = write_events_file(r#"[{"timestamp":1,"description":"old"}]"#); + run(parquet.path(), &[first.path()], &[], false).unwrap(); + + run(parquet.path(), &[], &[], true).unwrap(); + + // Cleared: no key in metadata, read returns None. + assert!(read_events(parquet.path()).unwrap().is_none()); + } + + #[test] + fn run_inline_events_merge_with_file() { + let parquet = make_minimal_parquet(vec![("source", "rezolus")]); + let file = write_events_file(r#"[{"timestamp":1,"description":"file-event"}]"#); + run( + parquet.path(), + &[file.path()], + &["time=2,description=inline-event".to_string()], + false, + ) + .unwrap(); + + let stored = read_events(parquet.path()).unwrap().unwrap(); + assert_eq!(stored.events.len(), 2); + // Sorted by timestamp + assert_eq!(stored.events[0].description, "file-event"); + assert_eq!(stored.events[1].description, "inline-event"); + } + + #[test] + fn run_returns_false_when_no_event_args() { + let parquet = make_minimal_parquet(vec![("source", "rezolus")]); + let changed = run(parquet.path(), &[], &[], false).unwrap(); + assert!(!changed); + } + + #[test] + fn run_preserves_other_metadata() { + let parquet = make_minimal_parquet(vec![("source", "rezolus"), ("node", "web01")]); + let file = write_events_file(r#"[{"timestamp":1,"description":"d"}]"#); + run(parquet.path(), &[file.path()], &[], false).unwrap(); + + let kv = crate::parquet_tools::read_file_metadata(parquet.path()).unwrap(); + assert!(kv.iter().any(|kv| kv.key == "source" + && kv.value.as_deref() == Some("rezolus"))); + assert!(kv + .iter() + .any(|kv| kv.key == "node" && kv.value.as_deref() == Some("web01"))); + } + + #[test] + fn run_dedupes_by_id_keeping_earlier() { + let parquet = make_minimal_parquet(vec![("source", "rezolus")]); + let first = write_events_file( + r#"[{"timestamp":1,"description":"orig","id":"dup"}]"#, + ); + run(parquet.path(), &[first.path()], &[], false).unwrap(); + + let second = write_events_file( + r#"[{"timestamp":2,"description":"new","id":"dup"}]"#, + ); + run(parquet.path(), &[second.path()], &[], false).unwrap(); + + let stored = read_events(parquet.path()).unwrap().unwrap(); + assert_eq!(stored.events.len(), 1); + // Earlier entry won (matches the "use --clear-events to replace" hint). + assert_eq!(stored.events[0].description, "orig"); + } +} diff --git a/src/parquet_tools/mod.rs b/src/parquet_tools/mod.rs index 1ec3b5cc7..a31f06ede 100644 --- a/src/parquet_tools/mod.rs +++ b/src/parquet_tools/mod.rs @@ -1,5 +1,6 @@ mod annotate; pub(crate) mod combine; +mod events; mod filter; mod metadata; @@ -88,6 +89,31 @@ pub fn command() -> Command { .value_parser(value_parser!(PathBuf)) .action(clap::ArgAction::Set) .conflicts_with("undo"), + ) + .arg( + clap::Arg::new("add-events") + .long("add-events") + .value_name("PATH") + .help("Add one-off events from a JSON/JSONL file (or '-' for stdin). Repeatable.") + .value_parser(value_parser!(PathBuf)) + .action(clap::ArgAction::Append) + .conflicts_with("undo"), + ) + .arg( + clap::Arg::new("event") + .long("event") + .value_name("KV") + .help("Add a single event inline, e.g. 'time=2026-05-12T15:23Z,kind=restart,description=\"...\"'. Repeatable.") + .value_parser(value_parser!(String)) + .action(clap::ArgAction::Append) + .conflicts_with("undo"), + ) + .arg( + clap::Arg::new("clear-events") + .long("clear-events") + .help("Remove existing events before applying --add-events / --event") + .action(clap::ArgAction::SetTrue) + .conflicts_with("undo"), ), ) .subcommand( diff --git a/src/viewer/mod.rs b/src/viewer/mod.rs index 58e081bbf..4e69865ab 100644 --- a/src/viewer/mod.rs +++ b/src/viewer/mod.rs @@ -27,7 +27,7 @@ use notify::Watcher; #[cfg(test)] pub use dashboard::Kpi; -pub use dashboard::{ServiceExtension, TemplateRegistry}; +pub use dashboard::{Event, Events, ServiceExtension, TemplateRegistry}; pub use metriken_query::promql; pub use metriken_query::tsdb; From 5d5d8065e468f0ffcf82b65736963019f28a2c68 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 12 May 2026 04:19:04 +0000 Subject: [PATCH 2/3] style: apply rustfmt --- crates/dashboard/src/events.rs | 12 ++++---- src/parquet_tools/annotate.rs | 10 +++--- src/parquet_tools/combine.rs | 6 ++-- src/parquet_tools/events.rs | 56 +++++++++++++++------------------- 4 files changed, 38 insertions(+), 46 deletions(-) diff --git a/crates/dashboard/src/events.rs b/crates/dashboard/src/events.rs index 62feea517..872637e8c 100644 --- a/crates/dashboard/src/events.rs +++ b/crates/dashboard/src/events.rs @@ -169,16 +169,16 @@ mod tests { labels: BTreeMap::new(), duration_ns: None, }; - let mut events = Events::new(vec![ - make(2, None), - make(1, Some("")), - make(3, Some("")), - ]); + let mut events = Events::new(vec![make(2, None), make(1, Some("")), make(3, Some(""))]); events.normalize(); // Empty id is treated as no-id; nothing gets deduped. assert_eq!(events.events.len(), 3); assert_eq!( - events.events.iter().map(|e| e.timestamp).collect::>(), + events + .events + .iter() + .map(|e| e.timestamp) + .collect::>(), vec![1, 2, 3] ); } diff --git a/src/parquet_tools/annotate.rs b/src/parquet_tools/annotate.rs index 56b77c8e5..736f82001 100644 --- a/src/parquet_tools/annotate.rs +++ b/src/parquet_tools/annotate.rs @@ -54,12 +54,10 @@ pub(super) fn run(args: &ArgMatches, registry: &TemplateRegistry) { } if events_requested { - super::events::run(path, &event_files, &inline_events, clear_events).unwrap_or_else( - |e| { - eprintln!("error: {e}"); - std::process::exit(1); - }, - ); + super::events::run(path, &event_files, &inline_events, clear_events).unwrap_or_else(|e| { + eprintln!("error: {e}"); + std::process::exit(1); + }); } let custom_file = args.get_one::("queries"); diff --git a/src/parquet_tools/combine.rs b/src/parquet_tools/combine.rs index 60dc88870..dc7fd8e70 100644 --- a/src/parquet_tools/combine.rs +++ b/src/parquet_tools/combine.rs @@ -894,9 +894,9 @@ fn merge_metadata(inputs: &[InputFile]) -> Result, Box(raw) { Ok(mut e) => events.events.append(&mut e.events), - Err(e) => eprintln!( - "warning: ignoring malformed events payload in input file: {e}" - ), + Err(e) => { + eprintln!("warning: ignoring malformed events payload in input file: {e}") + } } } } diff --git a/src/parquet_tools/events.rs b/src/parquet_tools/events.rs index 98f5c4c41..943e4b46a 100644 --- a/src/parquet_tools/events.rs +++ b/src/parquet_tools/events.rs @@ -212,8 +212,8 @@ fn parse_jsonl(content: &str) -> Result, String> { if line.is_empty() || line.starts_with('#') { continue; } - let v: serde_json::Value = serde_json::from_str(line) - .map_err(|e| format!("line {}: invalid JSON: {e}", i + 1))?; + let v: serde_json::Value = + serde_json::from_str(line).map_err(|e| format!("line {}: invalid JSON: {e}", i + 1))?; let event = value_to_event(v).map_err(|e| format!("line {}: {e}", i + 1))?; out.push(event); } @@ -515,14 +515,16 @@ mod tests { #[test] fn parses_jsonl_payload() { - let json = "{\"timestamp\":1,\"description\":\"a\"}\n{\"timestamp\":2,\"description\":\"b\"}"; + let json = + "{\"timestamp\":1,\"description\":\"a\"}\n{\"timestamp\":2,\"description\":\"b\"}"; let events = parse_events_payload(json, true).unwrap(); assert_eq!(events.len(), 2); } #[test] fn detects_jsonl_without_extension() { - let json = "{\"timestamp\":1,\"description\":\"a\"}\n{\"timestamp\":2,\"description\":\"b\"}"; + let json = + "{\"timestamp\":1,\"description\":\"a\"}\n{\"timestamp\":2,\"description\":\"b\"}"; // Heuristic kicks in because two top-level `{...}` lines appear. let events = parse_events_payload(json, false).unwrap(); assert_eq!(events.len(), 2); @@ -530,8 +532,7 @@ mod tests { #[test] fn parses_rfc3339_timestamp() { - let json = - r#"[{"timestamp":"2026-05-12T15:23:00Z","description":"restart"}]"#; + let json = r#"[{"timestamp":"2026-05-12T15:23:00Z","description":"restart"}]"#; let events = parse_events_payload(json, false).unwrap(); // 2026-05-12T15:23:00Z in nanos let expected = DateTime::parse_from_rfc3339("2026-05-12T15:23:00Z") @@ -570,9 +571,10 @@ mod tests { #[test] fn parses_inline_event_rfc3339() { - let e = - parse_inline_event(r#"time=2026-05-12T15:23:00Z,kind=restart,description="vllm restart""#) - .unwrap(); + let e = parse_inline_event( + r#"time=2026-05-12T15:23:00Z,kind=restart,description="vllm restart""#, + ) + .unwrap(); assert_eq!(e.kind.as_deref(), Some("restart")); assert_eq!(e.description, "vllm restart"); } @@ -585,10 +587,8 @@ mod tests { #[test] fn parses_inline_event_with_labels() { - let e = parse_inline_event( - "time=1,description=d,label.reason=OOM,label.deployer=ci", - ) - .unwrap(); + let e = + parse_inline_event("time=1,description=d,label.reason=OOM,label.deployer=ci").unwrap(); assert_eq!(e.labels.get("reason").map(|s| s.as_str()), Some("OOM")); assert_eq!(e.labels.get("deployer").map(|s| s.as_str()), Some("ci")); } @@ -674,9 +674,12 @@ mod tests { .set_key_value_metadata(Some(kv)) .build(); let tmp = NamedTempFile::new().unwrap(); - let mut writer = - ArrowWriter::try_new(std::fs::File::create(tmp.path()).unwrap(), schema, Some(props)) - .unwrap(); + let mut writer = ArrowWriter::try_new( + std::fs::File::create(tmp.path()).unwrap(), + schema, + Some(props), + ) + .unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); tmp @@ -694,13 +697,7 @@ mod tests { let events_file = write_events_file(r#"{"events":[{"timestamp":1,"description":"restart"}]}"#); - let changed = run( - parquet.path(), - &[events_file.path()], - &[], - false, - ) - .unwrap(); + let changed = run(parquet.path(), &[events_file.path()], &[], false).unwrap(); assert!(changed); let stored = read_events(parquet.path()).unwrap().unwrap(); @@ -782,8 +779,9 @@ mod tests { run(parquet.path(), &[file.path()], &[], false).unwrap(); let kv = crate::parquet_tools::read_file_metadata(parquet.path()).unwrap(); - assert!(kv.iter().any(|kv| kv.key == "source" - && kv.value.as_deref() == Some("rezolus"))); + assert!(kv + .iter() + .any(|kv| kv.key == "source" && kv.value.as_deref() == Some("rezolus"))); assert!(kv .iter() .any(|kv| kv.key == "node" && kv.value.as_deref() == Some("web01"))); @@ -792,14 +790,10 @@ mod tests { #[test] fn run_dedupes_by_id_keeping_earlier() { let parquet = make_minimal_parquet(vec![("source", "rezolus")]); - let first = write_events_file( - r#"[{"timestamp":1,"description":"orig","id":"dup"}]"#, - ); + let first = write_events_file(r#"[{"timestamp":1,"description":"orig","id":"dup"}]"#); run(parquet.path(), &[first.path()], &[], false).unwrap(); - let second = write_events_file( - r#"[{"timestamp":2,"description":"new","id":"dup"}]"#, - ); + let second = write_events_file(r#"[{"timestamp":2,"description":"new","id":"dup"}]"#); run(parquet.path(), &[second.path()], &[], false).unwrap(); let stored = read_events(parquet.path()).unwrap().unwrap(); From 8ad79938397b930930942787b69e81cc7801edae Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 12 May 2026 04:46:35 +0000 Subject: [PATCH 3/3] docs: document the events footer key and tidy comments Adds an `events` section to docs/parquet_metadata.md covering the payload shape, CLI surface (--add-events / --event / --clear-events), and combine merge behavior. Extends the post-recording mutator table and combine "what changed" table to include events. Drops a handful of inline comments that just restated the adjacent code. --- docs/parquet_metadata.md | 78 +++++++++++++++++++++++++++++++++++- src/parquet_tools/combine.rs | 4 +- src/parquet_tools/events.rs | 20 +-------- 3 files changed, 80 insertions(+), 22 deletions(-) diff --git a/docs/parquet_metadata.md b/docs/parquet_metadata.md index a642e50c9..8e8d1cb47 100644 --- a/docs/parquet_metadata.md +++ b/docs/parquet_metadata.md @@ -235,6 +235,81 @@ target/release/rezolus parquet combine \ Validation rejects `--pinned ` if no rezolus input has a matching `node` label. +### `events` + +JSON document describing one-off events attached to the recording — +restarts, config changes, deploys, incidents, anomalies, free-form +markers. Stored at the top level in both single-source and combined files +because each event is self-describing (it carries its own optional +`source` / `node` / `instance` scope) rather than inheriting from +file-level metadata. Schema lives in +[crates/dashboard/src/events.rs](../crates/dashboard/src/events.rs). + +Payload shape (canonical, sorted by `timestamp`, deduped by `id`): + +```json +{ + "events": [ + { + "timestamp": 1778598000000000000, + "description": "vllm restart for new config", + "kind": "restart", + "details": "swapped tensor_parallel_size 2 → 4", + "source": "vllm", + "node": "gpu01", + "instance": "0", + "labels": { "reason": "OOM" }, + "duration_ns": 30000000000, + "id": "evt-2026-05-12-001" + } + ] +} +``` + +Only `timestamp` (u64 nanoseconds since the Unix epoch) and `description` +are required. `kind` is free-form — conventional values are `restart`, +`config_change`, `deploy`, `incident`, `anomaly`, `marker`, `note`, but +nothing is validated. `duration_ns` lets an event span an interval rather +than a point; `id` is a stable identifier used to dedupe across merges +(later occurrences are dropped on combine). + +**Update with `parquet annotate`:** + +```bash +target/release/rezolus parquet annotate file.parquet --add-events events.json + +cat events.json | target/release/rezolus parquet annotate file.parquet --add-events - + +target/release/rezolus parquet annotate file.parquet \ + --event 'time=2026-05-12T15:23Z,kind=restart,description="vllm restart",node=gpu01' \ + --event 'time=2026-05-12T16:00Z,kind=marker,description="benchmark start",label.run=ci-42' + +target/release/rezolus parquet annotate file.parquet --clear-events + +target/release/rezolus parquet annotate file.parquet --clear-events --add-events new.json +``` + +- `--add-events FILE` accepts JSON (`{"events":[...]}`), a bare JSON array, + a single JSON object, or JSONL. Use `-` to read from stdin. +- `--event KV` is a repeatable inline shorthand + (`key=value,key=value,...`). Quoted values may contain commas and `=`. + Free-form labels go through `label.=`. Aliases: `time`, + `desc`, `duration` (alongside the canonical `timestamp`, `description`, + `duration_ns`). +- `--clear-events` wipes existing events. Combine with `--add-events` / + `--event` to "replace": order within one invocation is **clear → file + events → inline events**. +- Operations are append-by-default. Events are sorted by timestamp on + write; entries whose non-empty `id` collides with an earlier one are + dropped (a warning is printed when this happens during an annotate + invocation). +- Inputs accept RFC3339 timestamps (including the seconds-omitted short + form `2026-05-12T15:23Z`) and humantime durations (`30s`, `1m30s`); + canonical storage is always `u64` nanoseconds. + +`--add-events` / `--event` / `--clear-events` do not trigger the +service-template flow. + ## `per_source_metadata` Top-level key only used in combined files. Value is a nested JSON object: @@ -280,7 +355,7 @@ mutators are: | Tool | What it can change | |------|--------------------| | `rezolus record --node`, `--instance`, `--metadata k=v` | Anything written at recording time. The catch-all `--metadata` can set any top-level key. | -| `rezolus parquet annotate` | Adds/replaces/removes top-level `service_queries`; with `--node NAME` sets/replaces top-level `node`; with `--source NAME` (`--overwrite` to replace) sets/replaces top-level `source`; with `--systeminfo PATH` (or `-` for stdin) sets/replaces top-level `systeminfo`. | +| `rezolus parquet annotate` | Adds/replaces/removes top-level `service_queries`; with `--node NAME` sets/replaces top-level `node`; with `--source NAME` (`--overwrite` to replace) sets/replaces top-level `source`; with `--systeminfo PATH` (or `-` for stdin) sets/replaces top-level `systeminfo`; with `--add-events PATH` / `--event KV` / `--clear-events` appends, inserts, or wipes one-off `events`. | | `rezolus parquet combine --pinned` | Sets `pinned_node` on the output. | | `rezolus parquet combine` | Merges and re-derives `source`, `descriptions`, `per_source_metadata`, etc. from the inputs. | @@ -411,6 +486,7 @@ target/release/rezolus parquet metadata -i combined.parquet --field pinned_node | `service_queries` | **Removed from top level.** Moves to `per_source_metadata...service_queries` so each instance can carry its own KPI definitions. | | `pinned_node` | New — added by `--pinned web01`. Validated against the actual rezolus nodes seen in the inputs. | | `selection` | Preserved from the primary (rezolus) input if present; otherwise dropped. | +| `events` | Concatenated across all inputs, sorted by timestamp, deduped by stable `id`. Stays at the top level — each event is self-describing via its own `source`/`node`/`instance` fields. | | `per_source_metadata` | Deep-merged. Pre-existing entries from already-combined inputs are preserved; new sub-entries are added per node/instance. | Schema changes alongside the metadata: every metric column is renamed to diff --git a/src/parquet_tools/combine.rs b/src/parquet_tools/combine.rs index dc7fd8e70..dc72c8b38 100644 --- a/src/parquet_tools/combine.rs +++ b/src/parquet_tools/combine.rs @@ -880,10 +880,8 @@ fn merge_metadata(inputs: &[InputFile]) -> Result, Box = add_files .iter() .filter_map(|p| read_input_file(p).ok()) @@ -177,8 +176,6 @@ fn parse_events_payload(content: &str, force_jsonl: bool) -> Result, return parse_jsonl(content); } - // Detect JSONL when the first character is `{` but there are multiple - // top-level objects (no enclosing array/object wrapper). if trimmed.starts_with('{') && looks_like_jsonl(content) { return parse_jsonl(content); } @@ -232,7 +229,6 @@ fn looks_like_jsonl(content: &str) -> bool { if !trimmed_first.starts_with('{') { return false; } - // Two consecutive lines that each start with `{` at column 0-ish ⇒ JSONL. lines .next() .map(|l| l.trim_start().starts_with('{')) @@ -253,8 +249,6 @@ fn value_to_event(mut v: serde_json::Value) -> Result { if let Some(d) = map.get_mut("duration_ns") { normalize_duration(d)?; } - // Also accept `duration` as an alias for `duration_ns` so users can - // write `"duration": "30s"` in JSON files. if let Some(d) = map.remove("duration") { if !map.contains_key("duration_ns") { let mut d = d; @@ -308,9 +302,6 @@ pub(super) fn parse_timestamp_str(s: &str) -> Result { } return Ok(ns as u64); } - // Tolerate the seconds-omitted forms users commonly type by hand - // (e.g. "2026-05-12T15:23Z" or "2026-05-12T15:23+02:00") by patching - // ":00" into the time portion and retrying. if let Some(patched) = try_patch_seconds(s) { if let Ok(dt) = DateTime::parse_from_rfc3339(&patched) { let ns = dt @@ -319,7 +310,6 @@ pub(super) fn parse_timestamp_str(s: &str) -> Result { return Ok(ns as u64); } } - // Try naive forms without timezone, treated as UTC. for fmt in &["%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"] { if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, fmt) { let ns = dt @@ -339,11 +329,8 @@ pub(super) fn parse_timestamp_str(s: &str) -> Result { fn try_patch_seconds(s: &str) -> Option { let t_pos = s.find('T')?; let after_t = &s[t_pos + 1..]; - // Locate where the time portion ends: 'Z', '+', or '-' (the last '-' - // since the date itself contains dashes; but `after_t` strips those). let tz_offset_in_after_t = after_t.find(['Z', '+', '-'])?; let time_part = &after_t[..tz_offset_in_after_t]; - // Patch only when the time has exactly one colon (HH:MM, no seconds). if time_part.matches(':').count() != 1 { return None; } @@ -438,7 +425,6 @@ fn split_kv_pairs(spec: &str) -> Result, String> { let mut chars = spec.chars().peekable(); loop { - // Skip leading whitespace and separator commas. while matches!(chars.peek(), Some(c) if c.is_whitespace() || *c == ',') { chars.next(); } @@ -446,7 +432,6 @@ fn split_kv_pairs(spec: &str) -> Result, String> { break; } - // key let mut key = String::new(); while let Some(&c) = chars.peek() { if c == '=' || c == ',' { @@ -463,7 +448,6 @@ fn split_kv_pairs(spec: &str) -> Result, String> { return Err(format!("event key {key:?} is missing '='")); } - // value: optionally quoted let mut value = String::new(); if chars.peek() == Some(&'"') { chars.next();