Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions crates/dashboard/src/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

/// A one-off event annotation attached to a parquet recording. Events mark
/// key moments (restarts, config changes, deploys, anomalies, ...) on top of
/// existing time-series metrics. They are stored as a JSON blob in the
/// parquet footer under [`KEY_EVENTS`](crate::events) and are self-describing
/// — every event carries its own optional `source` / `node` / `instance`
/// scope rather than inheriting from file-level metadata.
///
/// Field semantics:
/// - `timestamp` is nanoseconds since the Unix epoch. Required.
/// - `description` is a short title rendered inline next to the marker.
/// - `kind` is a free-form tag (`restart`, `config_change`, `deploy`,
/// `incident`, `anomaly`, `marker`, `note`, ...). Conventions only, not
/// validated, so users may invent their own without a release.
/// - `details` is longer optional text (e.g. a paragraph of context).
/// - `source` / `node` / `instance` scope the event to a specific
/// recording stream within a (possibly combined) file. When all three are
/// absent the event is global.
/// - `labels` is an open map for arbitrary user tags.
/// - `duration_ns` lets an event span an interval rather than a point —
/// when absent the event renders as a vertical line, when present as a
/// band.
/// - `id` is an optional stable identifier used to dedupe across merges.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Event {
pub timestamp: u64,
pub description: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub kind: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub details: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub source: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub node: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub instance: Option<String>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub labels: BTreeMap<String, String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub duration_ns: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
}

/// Wrapper for the JSON payload stored in the parquet footer. Lives as an
/// object with a single `events` array so future fields (schema version,
/// global labels) can be added without breaking parsers.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct Events {
#[serde(default)]
pub events: Vec<Event>,
}

impl Events {
pub fn new(events: Vec<Event>) -> Self {
Self { events }
}

/// Sort events by timestamp and drop later duplicates that share the
/// same non-empty `id`. Stable: earlier occurrences of a given id win.
pub fn normalize(&mut self) {
self.events
.sort_by_key(|e| (e.timestamp, e.id.clone().unwrap_or_default()));
let mut seen = std::collections::HashSet::new();
self.events.retain(|e| match &e.id {
Some(id) if !id.is_empty() => seen.insert(id.clone()),
_ => true,
});
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn round_trips_minimal_event() {
let json = r#"{"timestamp":1700000000000000000,"description":"restart"}"#;
let e: Event = serde_json::from_str(json).unwrap();
assert_eq!(e.timestamp, 1_700_000_000_000_000_000);
assert_eq!(e.description, "restart");
assert!(e.kind.is_none());
// Optional fields are skipped on serialize
assert_eq!(serde_json::to_string(&e).unwrap(), json);
}

#[test]
fn round_trips_full_event() {
let e = Event {
timestamp: 1,
description: "d".into(),
kind: Some("restart".into()),
details: Some("long".into()),
source: Some("vllm".into()),
node: Some("gpu01".into()),
instance: Some("0".into()),
labels: BTreeMap::from([("reason".into(), "OOM".into())]),
duration_ns: Some(1000),
id: Some("evt-1".into()),
};
let json = serde_json::to_string(&e).unwrap();
let parsed: Event = serde_json::from_str(&json).unwrap();
assert_eq!(parsed, e);
}

#[test]
fn normalize_sorts_and_dedupes_by_id() {
let mut events = Events::new(vec![
Event {
timestamp: 30,
description: "later".into(),
id: Some("a".into()),
kind: None,
details: None,
source: None,
node: None,
instance: None,
labels: BTreeMap::new(),
duration_ns: None,
},
Event {
timestamp: 10,
description: "first".into(),
id: Some("a".into()),
kind: None,
details: None,
source: None,
node: None,
instance: None,
labels: BTreeMap::new(),
duration_ns: None,
},
Event {
timestamp: 20,
description: "no id, kept".into(),
id: None,
kind: None,
details: None,
source: None,
node: None,
instance: None,
labels: BTreeMap::new(),
duration_ns: None,
},
]);
events.normalize();
let descs: Vec<&str> = events
.events
.iter()
.map(|e| e.description.as_str())
.collect();
assert_eq!(descs, vec!["first", "no id, kept"]);
}

#[test]
fn normalize_keeps_all_when_ids_missing_or_empty() {
let make = |ts: u64, id: Option<&str>| Event {
timestamp: ts,
description: ts.to_string(),
id: id.map(str::to_string),
kind: None,
details: None,
source: None,
node: None,
instance: None,
labels: BTreeMap::new(),
duration_ns: None,
};
let mut events = Events::new(vec![make(2, None), make(1, Some("")), make(3, Some(""))]);
events.normalize();
// Empty id is treated as no-id; nothing gets deduped.
assert_eq!(events.events.len(), 3);
assert_eq!(
events
.events
.iter()
.map(|e| e.timestamp)
.collect::<Vec<_>>(),
vec![1, 2, 3]
);
}
}
2 changes: 2 additions & 0 deletions crates/dashboard/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub mod dashboard;
pub mod events;
mod plot;
mod service_extension;

pub use events::{Event, Events};
pub use metriken_query::Tsdb;
pub use plot::*;
pub use service_extension::{CategoryExtension, Kpi, ServiceExtension, TemplateRegistry};
Expand Down
78 changes: 77 additions & 1 deletion docs/parquet_metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,81 @@ target/release/rezolus parquet combine \
Validation rejects `--pinned <name>` if no rezolus input has a matching `node`
label.

### `events`

JSON document describing one-off events attached to the recording —
restarts, config changes, deploys, incidents, anomalies, free-form
markers. Stored at the top level in both single-source and combined files
because each event is self-describing (it carries its own optional
`source` / `node` / `instance` scope) rather than inheriting from
file-level metadata. Schema lives in
[crates/dashboard/src/events.rs](../crates/dashboard/src/events.rs).

Payload shape (canonical, sorted by `timestamp`, deduped by `id`):

```json
{
"events": [
{
"timestamp": 1778598000000000000,
"description": "vllm restart for new config",
"kind": "restart",
"details": "swapped tensor_parallel_size 2 → 4",
"source": "vllm",
"node": "gpu01",
"instance": "0",
"labels": { "reason": "OOM" },
"duration_ns": 30000000000,
"id": "evt-2026-05-12-001"
}
]
}
```

Only `timestamp` (u64 nanoseconds since the Unix epoch) and `description`
are required. `kind` is free-form — conventional values are `restart`,
`config_change`, `deploy`, `incident`, `anomaly`, `marker`, `note`, but
nothing is validated. `duration_ns` lets an event span an interval rather
than a point; `id` is a stable identifier used to dedupe across merges
(later occurrences are dropped on combine).

**Update with `parquet annotate`:**

```bash
target/release/rezolus parquet annotate file.parquet --add-events events.json

cat events.json | target/release/rezolus parquet annotate file.parquet --add-events -

target/release/rezolus parquet annotate file.parquet \
--event 'time=2026-05-12T15:23Z,kind=restart,description="vllm restart",node=gpu01' \
--event 'time=2026-05-12T16:00Z,kind=marker,description="benchmark start",label.run=ci-42'

target/release/rezolus parquet annotate file.parquet --clear-events

target/release/rezolus parquet annotate file.parquet --clear-events --add-events new.json
```

- `--add-events FILE` accepts JSON (`{"events":[...]}`), a bare JSON array,
a single JSON object, or JSONL. Use `-` to read from stdin.
- `--event KV` is a repeatable inline shorthand
(`key=value,key=value,...`). Quoted values may contain commas and `=`.
Free-form labels go through `label.<name>=<value>`. Aliases: `time`,
`desc`, `duration` (alongside the canonical `timestamp`, `description`,
`duration_ns`).
- `--clear-events` wipes existing events. Combine with `--add-events` /
`--event` to "replace": order within one invocation is **clear → file
events → inline events**.
- Operations are append-by-default. Events are sorted by timestamp on
write; entries whose non-empty `id` collides with an earlier one are
dropped (a warning is printed when this happens during an annotate
invocation).
- Inputs accept RFC3339 timestamps (including the seconds-omitted short
form `2026-05-12T15:23Z`) and humantime durations (`30s`, `1m30s`);
canonical storage is always `u64` nanoseconds.

`--add-events` / `--event` / `--clear-events` do not trigger the
service-template flow.

## `per_source_metadata`

Top-level key only used in combined files. Value is a nested JSON object:
Expand Down Expand Up @@ -280,7 +355,7 @@ mutators are:
| Tool | What it can change |
|------|--------------------|
| `rezolus record --node`, `--instance`, `--metadata k=v` | Anything written at recording time. The catch-all `--metadata` can set any top-level key. |
| `rezolus parquet annotate` | Adds/replaces/removes top-level `service_queries`; with `--node NAME` sets/replaces top-level `node`; with `--source NAME` (`--overwrite` to replace) sets/replaces top-level `source`; with `--systeminfo PATH` (or `-` for stdin) sets/replaces top-level `systeminfo`. |
| `rezolus parquet annotate` | Adds/replaces/removes top-level `service_queries`; with `--node NAME` sets/replaces top-level `node`; with `--source NAME` (`--overwrite` to replace) sets/replaces top-level `source`; with `--systeminfo PATH` (or `-` for stdin) sets/replaces top-level `systeminfo`; with `--add-events PATH` / `--event KV` / `--clear-events` appends, inserts, or wipes one-off `events`. |
| `rezolus parquet combine --pinned` | Sets `pinned_node` on the output. |
| `rezolus parquet combine` | Merges and re-derives `source`, `descriptions`, `per_source_metadata`, etc. from the inputs. |

Expand Down Expand Up @@ -411,6 +486,7 @@ target/release/rezolus parquet metadata -i combined.parquet --field pinned_node
| `service_queries` | **Removed from top level.** Moves to `per_source_metadata.<source>.<id>.service_queries` so each instance can carry its own KPI definitions. |
| `pinned_node` | New — added by `--pinned web01`. Validated against the actual rezolus nodes seen in the inputs. |
| `selection` | Preserved from the primary (rezolus) input if present; otherwise dropped. |
| `events` | Concatenated across all inputs, sorted by timestamp, deduped by stable `id`. Stays at the top level — each event is self-describing via its own `source`/`node`/`instance` fields. |
| `per_source_metadata` | Deep-merged. Pre-existing entries from already-combined inputs are preserved; new sub-entries are added per node/instance. |

Schema changes alongside the metadata: every metric column is renamed to
Expand Down
10 changes: 10 additions & 0 deletions src/parquet_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,13 @@ pub const NESTED_INSTANCE: &str = "instance";
/// The default rezolus node to display when the viewer opens a combined
/// file with multiple nodes. Set by `parquet combine --pinned <node>`.
pub const KEY_PINNED_NODE: &str = "pinned_node";

// ── One-off event annotations ───────────────────────────────────────

/// JSON payload of one-off events attached to the recording (restarts,
/// config changes, anomalies, ...). Value is an object `{"events": [...]}`
/// where each entry conforms to `dashboard::events::Event`. Each event
/// carries its own optional `source` / `node` / `instance` scope rather
/// than inheriting from file-level metadata, so the payload stays
/// self-describing across combine.
pub const KEY_EVENTS: &str = "events";
24 changes: 21 additions & 3 deletions src/parquet_tools/annotate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ pub(super) fn run(args: &ArgMatches, registry: &TemplateRegistry) {
let sysinfo_path = args.get_one::<PathBuf>("systeminfo");
let overwrite = args.get_flag("overwrite");

let event_files: Vec<&Path> = args
.get_many::<PathBuf>("add-events")
.map(|it| it.map(PathBuf::as_path).collect())
.unwrap_or_default();
let inline_events: Vec<String> = args
.get_many::<String>("event")
.map(|it| it.cloned().collect())
.unwrap_or_default();
let clear_events = args.get_flag("clear-events");
let events_requested = clear_events || !event_files.is_empty() || !inline_events.is_empty();

if let Some(n) = node {
set_node_metadata(path, n).unwrap_or_else(|e| {
eprintln!("error: failed to set node metadata: {e}");
Expand All @@ -42,11 +53,18 @@ pub(super) fn run(args: &ArgMatches, registry: &TemplateRegistry) {
println!("Set source={:?} on {:?}", src, path);
}

if events_requested {
super::events::run(path, &event_files, &inline_events, clear_events).unwrap_or_else(|e| {
eprintln!("error: {e}");
std::process::exit(1);
});
}

let custom_file = args.get_one::<PathBuf>("queries");

// If only individual edits (--node/--source/--systeminfo) were requested,
// don't also auto-apply a default service template.
if (node.is_some() || new_source.is_some() || sysinfo_path.is_some())
// If only individual edits (--node/--source/--systeminfo/events) were
// requested, don't also auto-apply a default service template.
if (node.is_some() || new_source.is_some() || sysinfo_path.is_some() || events_requested)
&& custom_file.is_none()
&& !args.get_flag("filter")
{
Expand Down
26 changes: 26 additions & 0 deletions src/parquet_tools/combine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,32 @@ fn merge_metadata(inputs: &[InputFile]) -> Result<Vec<KeyValue>, Box<dyn std::er
});
}

// Events are self-describing (each carries its own source/node/instance),
// so they stay at the top level rather than nesting under per_source_metadata.
let mut events = ::dashboard::Events::default();
for input in inputs {
if let Some(raw) = input
.kv_metadata
.iter()
.find(|kv| kv.key == KEY_EVENTS)
.and_then(|kv| kv.value.as_deref())
{
match serde_json::from_str::<::dashboard::Events>(raw) {
Ok(mut e) => events.events.append(&mut e.events),
Err(e) => {
eprintln!("warning: ignoring malformed events payload in input file: {e}")
}
}
}
}
if !events.events.is_empty() {
events.normalize();
result.push(KeyValue {
key: KEY_EVENTS.to_string(),
value: Some(serde_json::to_string(&events)?),
});
}

Ok(result)
}

Expand Down
Loading
Loading