From 5d8a27a9bce221fb6ad8e99ad44dd78541ada468 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 24 Jun 2026 11:13:34 +0100 Subject: [PATCH 1/3] feat(metrics): extract V3 columnar codec into standalone datadog-metrics-v3 crate Extract the pure-protocol V3 codec (interner, types, writer, serializer) from lib/saluki-components into a zero-dependency lib/datadog-metrics-v3 crate. The serializer is reimplemented using raw protobuf wire-format encoding with no external deps. saluki-components now re-exports from this crate. --- Cargo.lock | 5 + Cargo.toml | 2 + lib/datadog-metrics-v3/Cargo.toml | 9 + lib/datadog-metrics-v3/src/interner.rs | 94 +++ lib/datadog-metrics-v3/src/lib.rs | 8 + lib/datadog-metrics-v3/src/serializer.rs | 324 ++++++++++ lib/datadog-metrics-v3/src/types.rs | 155 +++++ lib/datadog-metrics-v3/src/writer.rs | 589 ++++++++++++++++++ lib/saluki-components/Cargo.toml | 1 + .../src/encoders/datadog/metrics/mod.rs | 6 + .../encoders/datadog/metrics/v3/encoder.rs | 219 +++++++ .../encoders/datadog/metrics/v3/interner.rs | 94 +++ .../src/encoders/datadog/metrics/v3/mod.rs | 15 + .../encoders/datadog/metrics/v3/serializer.rs | 315 ++++++++++ .../src/encoders/datadog/metrics/v3/types.rs | 155 +++++ .../src/encoders/datadog/metrics/v3/writer.rs | 589 ++++++++++++++++++ 16 files changed, 2580 insertions(+) create mode 100644 lib/datadog-metrics-v3/Cargo.toml create mode 100644 lib/datadog-metrics-v3/src/interner.rs create mode 100644 lib/datadog-metrics-v3/src/lib.rs create mode 100644 lib/datadog-metrics-v3/src/serializer.rs create mode 100644 lib/datadog-metrics-v3/src/types.rs create mode 100644 lib/datadog-metrics-v3/src/writer.rs create mode 100644 lib/saluki-components/src/encoders/datadog/metrics/v3/encoder.rs create mode 100644 lib/saluki-components/src/encoders/datadog/metrics/v3/interner.rs create mode 100644 lib/saluki-components/src/encoders/datadog/metrics/v3/mod.rs create mode 100644 lib/saluki-components/src/encoders/datadog/metrics/v3/serializer.rs create mode 100644 lib/saluki-components/src/encoders/datadog/metrics/v3/types.rs create mode 100644 lib/saluki-components/src/encoders/datadog/metrics/v3/writer.rs diff --git a/Cargo.lock b/Cargo.lock index 33f952ab75a..5f58d238b95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1173,6 +1173,10 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "datadog-metrics-v3" +version = "0.1.0" + [[package]] name = "datadog-protos" version = "0.1.0" @@ -4263,6 +4267,7 @@ dependencies = [ "bytesize", "chrono", "datadog-agent-config-testing", + "datadog-metrics-v3", "datadog-protos", "ddsketch", "derive-where", diff --git a/Cargo.toml b/Cargo.toml index 6d5bde6f1b1..dc9130d287e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "lib/datadog-agent/config", "lib/datadog-agent/config-overlay-model", "lib/datadog-agent/config-testing", + "lib/datadog-metrics-v3", "lib/ddsketch", "lib/ottl", "lib/process-memory", @@ -43,6 +44,7 @@ license = "Apache-2.0" repository = "https://github.com/DataDog/saluki" [workspace.dependencies] +datadog-metrics-v3 = { path = "lib/datadog-metrics-v3" } datadog-agent-commons = { path = "lib/datadog-agent/commons" } datadog-agent-config = { path = "lib/datadog-agent/config" } datadog-agent-config-overlay-model = { path = "lib/datadog-agent/config-overlay-model" } diff --git a/lib/datadog-metrics-v3/Cargo.toml b/lib/datadog-metrics-v3/Cargo.toml new file mode 100644 index 00000000000..a6404d8b5d8 --- /dev/null +++ b/lib/datadog-metrics-v3/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "datadog-metrics-v3" +version = "0.1.0" +edition = "2021" +description = "V3 columnar protobuf codec for Datadog metrics — zero external dependencies" +license = "Apache-2.0" + +[dependencies] +# intentionally empty — only std required diff --git a/lib/datadog-metrics-v3/src/interner.rs b/lib/datadog-metrics-v3/src/interner.rs new file mode 100644 index 00000000000..5e8968cd093 --- /dev/null +++ b/lib/datadog-metrics-v3/src/interner.rs @@ -0,0 +1,94 @@ +//! Generic interning for dictionary deduplication. + +use std::{collections::HashMap, hash::Hash}; + +/// Generic interning structure for dictionary deduplication. +/// +/// Assigns unique 1-based IDs to values, returning the same ID for duplicate values. +/// ID 0 is reserved for "empty/none" in the V3 format. +#[derive(Debug)] +pub struct Interner { + index: HashMap, + last_id: i64, +} + +impl Default for Interner { + fn default() -> Self { + Self::new() + } +} + +impl Interner { + /// Creates a new empty interner. + pub fn new() -> Self { + Self { + index: HashMap::new(), + last_id: 0, + } + } + + /// Gets the ID for a key, inserting it if not present. + /// + /// Returns `(id, is_new)` where `is_new` is true if the key was newly inserted. + /// IDs are 1-based (0 is reserved for empty/none values). + pub fn get_or_insert(&mut self, key: K) -> (i64, bool) { + if let Some(&id) = self.index.get(&key) { + (id, false) + } else { + self.last_id += 1; + self.index.insert(key, self.last_id); + (self.last_id, true) + } + } + + /// Returns the number of interned values. + #[allow(dead_code)] + pub fn len(&self) -> usize { + self.index.len() + } + + /// Returns true if no values have been interned. + #[allow(dead_code)] + pub fn is_empty(&self) -> bool { + self.index.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_interner_basic() { + let mut interner: Interner = Interner::new(); + + // First insertion returns ID 1 and is_new=true + let (id1, is_new1) = interner.get_or_insert("hello".to_string()); + assert_eq!(id1, 1); + assert!(is_new1); + + // Second insertion of same value returns same ID and is_new=false + let (id2, is_new2) = interner.get_or_insert("hello".to_string()); + assert_eq!(id2, 1); + assert!(!is_new2); + + // New value gets next ID + let (id3, is_new3) = interner.get_or_insert("world".to_string()); + assert_eq!(id3, 2); + assert!(is_new3); + + assert_eq!(interner.len(), 2); + } + + #[test] + fn test_interner_tuples() { + let mut interner: Interner<(i32, i32, i32)> = Interner::new(); + + let (id1, _) = interner.get_or_insert((1, 2, 3)); + let (id2, _) = interner.get_or_insert((1, 2, 3)); + let (id3, _) = interner.get_or_insert((4, 5, 6)); + + assert_eq!(id1, id2); + assert_ne!(id1, id3); + } +} diff --git a/lib/datadog-metrics-v3/src/lib.rs b/lib/datadog-metrics-v3/src/lib.rs new file mode 100644 index 00000000000..39be757671b --- /dev/null +++ b/lib/datadog-metrics-v3/src/lib.rs @@ -0,0 +1,8 @@ +mod interner; +mod serializer; +mod types; +mod writer; + +pub use serializer::serialize_v3_payload; +pub use types::{V3MetricType, V3ValueType}; +pub use writer::{V3EncodedData, V3MetricBuilder, V3Writer}; diff --git a/lib/datadog-metrics-v3/src/serializer.rs b/lib/datadog-metrics-v3/src/serializer.rs new file mode 100644 index 00000000000..3a0350aa134 --- /dev/null +++ b/lib/datadog-metrics-v3/src/serializer.rs @@ -0,0 +1,324 @@ +//! V3 payload protobuf serialization using raw wire-format encoding. +//! +//! No external dependencies — implements the protobuf wire format subset needed +//! by the V3 MetricData message directly. + +use super::writer::V3EncodedData; + +// ── Protobuf wire types ────────────────────────────────────────────────────── + +const WIRE_VARINT: u32 = 0; +const WIRE_LEN: u32 = 2; + +// ── Field numbers (from payload_v3.proto) ──────────────────────────────────── + +mod field { + pub const DICT_NAME_STR: u32 = 1; + pub const DICT_TAGS_STR: u32 = 2; + pub const DICT_TAGSETS: u32 = 3; + pub const DICT_RESOURCE_STR: u32 = 4; + pub const DICT_RESOURCE_LEN: u32 = 5; + pub const DICT_RESOURCE_TYPE: u32 = 6; + pub const DICT_RESOURCE_NAME: u32 = 7; + pub const DICT_SOURCE_TYPE_NAME: u32 = 8; + pub const DICT_ORIGIN_INFO: u32 = 9; + pub const TYPES: u32 = 10; + pub const NAMES: u32 = 11; + pub const TAGS: u32 = 12; + pub const RESOURCES: u32 = 13; + pub const INTERVALS: u32 = 14; + pub const NUM_POINTS: u32 = 15; + pub const TIMESTAMPS: u32 = 16; + pub const VALS_SINT64: u32 = 17; + pub const VALS_FLOAT32: u32 = 18; + pub const VALS_FLOAT64: u32 = 19; + pub const SKETCH_NUM_BINS: u32 = 20; + pub const SKETCH_BIN_KEYS: u32 = 21; + pub const SKETCH_BIN_CNTS: u32 = 22; + pub const SOURCE_TYPE_NAME: u32 = 23; + pub const ORIGIN_INFO: u32 = 24; +} + +// ── Varint primitives ──────────────────────────────────────────────────────── + +fn write_varint(buf: &mut Vec, mut value: u64) { + loop { + let byte = (value & 0x7f) as u8; + value >>= 7; + if value == 0 { + buf.push(byte); + return; + } + buf.push(byte | 0x80); + } +} + +fn varint_len(mut value: u64) -> usize { + if value == 0 { + return 1; + } + let mut n = 0; + while value > 0 { + value >>= 7; + n += 1; + } + n +} + +#[inline] +fn zigzag64(v: i64) -> u64 { + ((v << 1) ^ (v >> 63)) as u64 +} + +#[inline] +fn zigzag32(v: i32) -> u32 { + ((v << 1) ^ (v >> 31)) as u32 +} + +fn write_tag(buf: &mut Vec, field: u32, wire: u32) { + write_varint(buf, ((field as u64) << 3) | (wire as u64)); +} + +// ── Field writers ──────────────────────────────────────────────────────────── + +/// Writes a `bytes` field (wire type 2). No-op if data is empty. +fn write_bytes_field(buf: &mut Vec, field: u32, data: &[u8]) { + if data.is_empty() { + return; + } + write_tag(buf, field, WIRE_LEN); + write_varint(buf, data.len() as u64); + buf.extend_from_slice(data); +} + +/// Writes a packed repeated `sint64` field (zigzag-encoded varints). +fn write_packed_sint64(buf: &mut Vec, field: u32, values: &[i64]) { + if values.is_empty() { + return; + } + let size: usize = values.iter().map(|&v| varint_len(zigzag64(v))).sum(); + write_tag(buf, field, WIRE_LEN); + write_varint(buf, size as u64); + for &v in values { + write_varint(buf, zigzag64(v)); + } +} + +/// Writes a packed repeated `int64` field (standard unsigned varint, no zigzag). +fn write_packed_int64(buf: &mut Vec, field: u32, values: &[i64]) { + if values.is_empty() { + return; + } + let size: usize = values.iter().map(|&v| varint_len(v as u64)).sum(); + write_tag(buf, field, WIRE_LEN); + write_varint(buf, size as u64); + for &v in values { + write_varint(buf, v as u64); + } +} + +/// Writes a packed repeated `uint64` field. +fn write_packed_uint64(buf: &mut Vec, field: u32, values: &[u64]) { + if values.is_empty() { + return; + } + let size: usize = values.iter().map(|&v| varint_len(v)).sum(); + write_tag(buf, field, WIRE_LEN); + write_varint(buf, size as u64); + for &v in values { + write_varint(buf, v); + } +} + +/// Writes a packed repeated `int32` field (sign-extended to unsigned varint). +fn write_packed_int32(buf: &mut Vec, field: u32, values: &[i32]) { + if values.is_empty() { + return; + } + // int32 wire format: negative values sign-extend to 10 bytes; cast to u64. + let size: usize = values.iter().map(|&v| varint_len(v as i64 as u64)).sum(); + write_tag(buf, field, WIRE_LEN); + write_varint(buf, size as u64); + for &v in values { + write_varint(buf, v as i64 as u64); + } +} + +/// Writes a packed repeated `sint32` field (zigzag-encoded). +fn write_packed_sint32(buf: &mut Vec, field: u32, values: &[i32]) { + if values.is_empty() { + return; + } + let size: usize = values.iter().map(|&v| varint_len(zigzag32(v) as u64)).sum(); + write_tag(buf, field, WIRE_LEN); + write_varint(buf, size as u64); + for &v in values { + write_varint(buf, zigzag32(v) as u64); + } +} + +/// Writes a packed repeated `uint32` field. +fn write_packed_uint32(buf: &mut Vec, field: u32, values: &[u32]) { + if values.is_empty() { + return; + } + let size: usize = values.iter().map(|&v| varint_len(v as u64)).sum(); + write_tag(buf, field, WIRE_LEN); + write_varint(buf, size as u64); + for &v in values { + write_varint(buf, v as u64); + } +} + +/// Writes a packed repeated `float` field — 4 bytes little-endian each. +fn write_packed_float(buf: &mut Vec, field: u32, values: &[f32]) { + if values.is_empty() { + return; + } + write_tag(buf, field, WIRE_LEN); + write_varint(buf, (values.len() * 4) as u64); + for &v in values { + buf.extend_from_slice(&v.to_le_bytes()); + } +} + +/// Writes a packed repeated `double` field — 8 bytes little-endian each. +fn write_packed_double(buf: &mut Vec, field: u32, values: &[f64]) { + if values.is_empty() { + return; + } + write_tag(buf, field, WIRE_LEN); + write_varint(buf, (values.len() * 8) as u64); + for &v in values { + buf.extend_from_slice(&v.to_le_bytes()); + } +} + +// ── Public API ──────────────────────────────────────────────────────────────── + +/// Serializes [`V3EncodedData`] to protobuf wire format. +/// +/// The output conforms to the `MetricData` message in `payload_v3.proto`. +/// Fields are written in ascending field-number order as the spec requires. +pub fn serialize_v3_payload(data: &V3EncodedData, output: &mut Vec) { + // Dictionary fields + write_bytes_field(output, field::DICT_NAME_STR, &data.dict_name_bytes); + write_bytes_field(output, field::DICT_TAGS_STR, &data.dict_tags_bytes); + write_packed_sint64(output, field::DICT_TAGSETS, &data.dict_tagsets); + write_bytes_field(output, field::DICT_RESOURCE_STR, &data.dict_resource_str_bytes); + write_packed_int64(output, field::DICT_RESOURCE_LEN, &data.dict_resource_len); + write_packed_sint64(output, field::DICT_RESOURCE_TYPE, &data.dict_resource_type); + write_packed_sint64(output, field::DICT_RESOURCE_NAME, &data.dict_resource_name); + write_bytes_field(output, field::DICT_SOURCE_TYPE_NAME, &data.dict_source_type_bytes); + write_packed_int32(output, field::DICT_ORIGIN_INFO, &data.dict_origin_info); + // Per-metric columns + write_packed_uint64(output, field::TYPES, &data.types); + write_packed_sint64(output, field::NAMES, &data.names); + write_packed_sint64(output, field::TAGS, &data.tags); + write_packed_sint64(output, field::RESOURCES, &data.resources); + write_packed_uint64(output, field::INTERVALS, &data.intervals); + write_packed_uint64(output, field::NUM_POINTS, &data.num_points); + // Point data + write_packed_sint64(output, field::TIMESTAMPS, &data.timestamps); + write_packed_sint64(output, field::VALS_SINT64, &data.vals_sint64); + write_packed_float(output, field::VALS_FLOAT32, &data.vals_float32); + write_packed_double(output, field::VALS_FLOAT64, &data.vals_float64); + // Sketch data + write_packed_uint64(output, field::SKETCH_NUM_BINS, &data.sketch_num_bins); + write_packed_sint32(output, field::SKETCH_BIN_KEYS, &data.sketch_bin_keys); + write_packed_uint32(output, field::SKETCH_BIN_CNTS, &data.sketch_bin_cnts); + // Additional per-metric columns (higher field numbers, written after point data) + write_packed_sint64(output, field::SOURCE_TYPE_NAME, &data.source_type_names); + write_packed_sint64(output, field::ORIGIN_INFO, &data.origin_infos); +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{V3MetricType, V3Writer}; + + #[test] + fn test_serialize_empty_is_empty() { + let data = V3EncodedData::default(); + let mut output = Vec::new(); + serialize_v3_payload(&data, &mut output); + assert!(output.is_empty()); + } + + #[test] + fn test_serialize_basic_gauge() { + let mut writer = V3Writer::new(); + let mut m = writer.write(V3MetricType::Gauge, "test.metric"); + m.add_point(1000, 42.0); + m.close(); + let data = writer.close(); + let mut output = Vec::new(); + serialize_v3_payload(&data, &mut output); + assert!(!output.is_empty()); + // Field 10 (TYPES) tag = (10 << 3) | 2 = 82 = 0x52 + assert!(output.contains(&0x52)); + } + + #[test] + fn test_varint_encoding() { + let mut buf = Vec::new(); + write_varint(&mut buf, 0); + assert_eq!(buf, [0x00]); + buf.clear(); + write_varint(&mut buf, 127); + assert_eq!(buf, [0x7f]); + buf.clear(); + write_varint(&mut buf, 128); + assert_eq!(buf, [0x80, 0x01]); + buf.clear(); + write_varint(&mut buf, 300); + assert_eq!(buf, [0xac, 0x02]); + } + + #[test] + fn test_zigzag64() { + assert_eq!(zigzag64(0), 0); + assert_eq!(zigzag64(-1), 1); + assert_eq!(zigzag64(1), 2); + assert_eq!(zigzag64(-2), 3); + assert_eq!(zigzag64(2147483647), 4294967294); + assert_eq!(zigzag64(-2147483648), 4294967295); + } + + #[test] + fn test_zigzag32() { + assert_eq!(zigzag32(0), 0); + assert_eq!(zigzag32(-1), 1); + assert_eq!(zigzag32(1), 2); + assert_eq!(zigzag32(-2), 3); + } + + #[test] + fn test_packed_sint64_roundtrip_length() { + let values: Vec = vec![0, 1, -1, 100, -100, i64::MAX / 2]; + let mut buf = Vec::new(); + write_packed_sint64(&mut buf, 1, &values); + // Tag (1 << 3 | 2 = 0x0a) + length varint + encoded values + assert!(buf.len() > 2); + assert_eq!(buf[0], 0x0a); // field 1, wire type 2 + } + + #[test] + fn test_float_little_endian() { + let values = vec![1.0f32]; + let mut buf = Vec::new(); + write_packed_float(&mut buf, 1, &values); + // 1.0f32 = 0x3f800000; LE bytes = [0x00, 0x00, 0x80, 0x3f] + let payload = &buf[2..]; // skip tag + length + assert_eq!(payload, &[0x00, 0x00, 0x80, 0x3f]); + } + + #[test] + fn test_double_little_endian() { + let values = vec![1.0f64]; + let mut buf = Vec::new(); + write_packed_double(&mut buf, 1, &values); + let payload = &buf[2..]; // skip tag + length + assert_eq!(payload, &1.0f64.to_le_bytes()[..]); + } +} diff --git a/lib/datadog-metrics-v3/src/types.rs b/lib/datadog-metrics-v3/src/types.rs new file mode 100644 index 00000000000..3ffbcb97799 --- /dev/null +++ b/lib/datadog-metrics-v3/src/types.rs @@ -0,0 +1,155 @@ +//! V3 payload type definitions and protocol buffer field numbers. + +/// Protocol buffer field numbers for MetricData message. +/// +/// These correspond to the field numbers in `payload_v3.proto`. +pub mod field_numbers { + // Dictionary fields + pub const DICT_NAME_STR: u32 = 1; + pub const DICT_TAGS_STR: u32 = 2; + pub const DICT_TAGSETS: u32 = 3; + pub const DICT_RESOURCE_STR: u32 = 4; + pub const DICT_RESOURCE_LEN: u32 = 5; + pub const DICT_RESOURCE_TYPE: u32 = 6; + pub const DICT_RESOURCE_NAME: u32 = 7; + pub const DICT_SOURCE_TYPE_NAME: u32 = 8; + pub const DICT_ORIGIN_INFO: u32 = 9; + + // Per-metric columns + pub const TYPES: u32 = 10; + pub const NAMES: u32 = 11; + pub const TAGS: u32 = 12; + pub const RESOURCES: u32 = 13; + pub const INTERVALS: u32 = 14; + pub const NUM_POINTS: u32 = 15; + + // Point data + pub const TIMESTAMPS: u32 = 16; + pub const VALS_SINT64: u32 = 17; + pub const VALS_FLOAT32: u32 = 18; + pub const VALS_FLOAT64: u32 = 19; + + // Sketch data + pub const SKETCH_NUM_BINS: u32 = 20; + pub const SKETCH_BIN_KEYS: u32 = 21; + pub const SKETCH_BIN_CNTS: u32 = 22; + + // Additional per-metric columns + pub const SOURCE_TYPE_NAME: u32 = 23; + pub const ORIGIN_INFO: u32 = 24; +} + +/// V3 metric type values. +/// +/// These match the `metricType` enum in `payload_v3.proto`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum V3MetricType { + Count = 1, + Rate = 2, + Gauge = 3, + Sketch = 4, +} + +impl V3MetricType { + /// Returns the numeric value for encoding in the types column. + pub fn as_u64(self) -> u64 { + self as u64 + } +} + +/// V3 value type values. +/// +/// These are encoded in bits 4-7 of the types column and indicate which +/// value array contains the metric's points. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum V3ValueType { + /// Value is zero, not stored explicitly. + Zero = 0x00, + /// Value is stored in vals_sint64. + Sint64 = 0x10, + /// Value is stored in vals_float32. + Float32 = 0x20, + /// Value is stored in vals_float64. + Float64 = 0x30, +} + +impl V3ValueType { + /// Returns the numeric value for encoding in the types column. + pub fn as_u64(self) -> u64 { + self as u64 + } + + /// Determines the best value type for a given f64 value. + /// + /// Prefers smaller representations when lossless: + /// - Zero for 0.0 + /// - Sint64 for integers that fit in 49 bits + /// - Float32 for values representable as f32 + /// - Float64 otherwise + pub fn for_value(v: f64) -> Self { + if v == 0.0 { + return Self::Zero; + } + + // Varint range that fits in 7 bytes or less (49 bits) + const VARINT_WIDTH: i32 = 7 * 7 - 1; + const MAX_INT: i64 = 1 << VARINT_WIDTH; + const MIN_INT: i64 = -MAX_INT; + + let i = v as i64; + if (MIN_INT..MAX_INT).contains(&i) && (i as f64) == v { + return Self::Sint64; + } + + if (v as f32 as f64) == v { + return Self::Float32; + } + + Self::Float64 + } + + /// Returns the maximum (largest encoding) of two value types. + pub fn max(self, other: Self) -> Self { + if (other as u8) > (self as u8) { + other + } else { + self + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_value_type_for_value() { + assert_eq!(V3ValueType::for_value(0.0), V3ValueType::Zero); + assert_eq!(V3ValueType::for_value(100.0), V3ValueType::Sint64); + assert_eq!(V3ValueType::for_value(-100.0), V3ValueType::Sint64); + assert_eq!(V3ValueType::for_value(1.5), V3ValueType::Float32); + assert_eq!(V3ValueType::for_value(2.75), V3ValueType::Float32); + + // Large integers that don't fit in 49 bits AND can't be exactly represented in f32 + // Powers of 2 like (1 << 50) can be exactly represented in f32, so we add 1 + // to make it an odd number that requires more precision than f32 provides + let large = ((1i64 << 50) + 1) as f64; + assert_eq!(V3ValueType::for_value(large), V3ValueType::Float64); + + // Values that require f64 precision - use PI which has more precision than f32 can hold + // and isn't an integer, so it won't be stored as Sint64 + let pi = std::f64::consts::PI; + // PI requires full f64 precision to store exactly + assert_eq!(V3ValueType::for_value(pi), V3ValueType::Float64); + } + + #[test] + fn test_value_type_max() { + assert_eq!(V3ValueType::Zero.max(V3ValueType::Sint64), V3ValueType::Sint64); + assert_eq!(V3ValueType::Sint64.max(V3ValueType::Float32), V3ValueType::Float32); + assert_eq!(V3ValueType::Float32.max(V3ValueType::Float64), V3ValueType::Float64); + assert_eq!(V3ValueType::Float64.max(V3ValueType::Zero), V3ValueType::Float64); + } +} diff --git a/lib/datadog-metrics-v3/src/writer.rs b/lib/datadog-metrics-v3/src/writer.rs new file mode 100644 index 00000000000..315707590d3 --- /dev/null +++ b/lib/datadog-metrics-v3/src/writer.rs @@ -0,0 +1,589 @@ +//! V3 columnar metrics writer. +//! +//! The [`V3Writer`] accumulates metrics in columnar format with dictionary deduplication, +//! then produces [`V3EncodedData`] ready for protobuf serialization. + +use super::interner::Interner; +use super::types::{V3MetricType, V3ValueType}; + +/// Appends a varint-length-prefixed string to the destination buffer. +fn append_len_str(dst: &mut Vec, s: &str) { + let mut len = s.len() as u64; + loop { + let mut byte = (len & 0x7F) as u8; + len >>= 7; + if len != 0 { + byte |= 0x80; + } + dst.push(byte); + if len == 0 { + break; + } + } + dst.extend_from_slice(s.as_bytes()); +} + +/// Delta-encodes a slice in place, working backwards. +/// +/// After encoding, `s[i]` contains the difference `s[i] - s[i-1]`. +pub fn delta_encode(s: &mut [i64]) { + if s.len() < 2 { + return; + } + for i in (1..s.len()).rev() { + s[i] -= s[i - 1]; + } +} + +/// Delta-encodes i32 values in place, working backwards. +pub fn delta_encode_i32(s: &mut [i32]) { + if s.len() < 2 { + return; + } + for i in (1..s.len()).rev() { + s[i] -= s[i - 1]; + } +} + +/// Encoded V3 payload data ready for protobuf serialization. +#[derive(Debug, Default)] +pub struct V3EncodedData { + // Dictionary encoded bytes (varint-length-prefixed strings) + pub dict_name_bytes: Vec, + pub dict_tags_bytes: Vec, + pub dict_tagsets: Vec, + pub dict_resource_str_bytes: Vec, + pub dict_resource_len: Vec, + pub dict_resource_type: Vec, + pub dict_resource_name: Vec, + pub dict_source_type_bytes: Vec, + pub dict_origin_info: Vec, + + // Per-metric columns (one entry per metric) + pub types: Vec, + pub names: Vec, + pub tags: Vec, + pub resources: Vec, + pub intervals: Vec, + pub num_points: Vec, + pub source_type_names: Vec, + pub origin_infos: Vec, + + // Point data (varies per metric based on num_points) + pub timestamps: Vec, + pub vals_sint64: Vec, + pub vals_float32: Vec, + pub vals_float64: Vec, + + // Sketch data + pub sketch_num_bins: Vec, + pub sketch_bin_keys: Vec, + pub sketch_bin_cnts: Vec, +} + +/// V3 columnar metrics writer. +/// +/// Accumulates metrics in columnar format with dictionary deduplication. +/// Call [`V3Writer::write`] for each metric, then [`V3Writer::close`] to finalize +/// and get the encoded data. +#[derive(Debug, Default)] +pub struct V3Writer { + // Interners for dictionary deduplication + name_interner: Interner, + tag_interner: Interner, + tagset_interner: Interner>, + resource_str_interner: Interner, + resource_interner: Interner>, + source_type_interner: Interner, + origin_interner: Interner<(i32, i32, i32)>, + + // Dictionary encoded bytes + dict_name_bytes: Vec, + dict_tags_bytes: Vec, + dict_tagsets: Vec, + dict_resource_str_bytes: Vec, + dict_resource_len: Vec, + dict_resource_type: Vec, + dict_resource_name: Vec, + dict_source_type_bytes: Vec, + dict_origin_info: Vec, + + // Per-metric columns + types: Vec, + names: Vec, + tags: Vec, + resources: Vec, + intervals: Vec, + num_points: Vec, + source_type_names: Vec, + origin_infos: Vec, + + // Point data + timestamps: Vec, + vals_sint64: Vec, + vals_float32: Vec, + vals_float64: Vec, + + // Sketch data + sketch_num_bins: Vec, + sketch_bin_keys: Vec, + sketch_bin_cnts: Vec, +} + +impl V3Writer { + /// Creates a new V3 writer. + pub fn new() -> Self { + Self::default() + } + + /// Begins writing a new metric. + /// + /// Returns a [`V3MetricBuilder`] that must be used to set the metric's + /// properties and add points, then closed with [`V3MetricBuilder::close`]. + pub fn write(&mut self, metric_type: V3MetricType, name: &str) -> V3MetricBuilder<'_> { + let name_id = self.intern_name(name); + let metric_idx = self.types.len(); + let point_start_idx = self.vals_float64.len(); + + // Initialize the per-metric columns with default values + self.types.push(metric_type.as_u64()); + self.names.push(name_id); + self.tags.push(0); + self.resources.push(0); + self.intervals.push(0); + self.num_points.push(0); + self.source_type_names.push(0); + self.origin_infos.push(0); + + V3MetricBuilder { + writer: self, + point_start_idx, + metric_idx, + } + } + + /// Finalizes the writer and returns the encoded data. + /// + /// This performs delta encoding on all index arrays. + pub fn close(mut self) -> V3EncodedData { + // Delta-encode all the index arrays + delta_encode(&mut self.names); + delta_encode(&mut self.tags); + delta_encode(&mut self.resources); + delta_encode(&mut self.source_type_names); + delta_encode(&mut self.origin_infos); + delta_encode(&mut self.timestamps); + + V3EncodedData { + dict_name_bytes: self.dict_name_bytes, + dict_tags_bytes: self.dict_tags_bytes, + dict_tagsets: self.dict_tagsets, + dict_resource_str_bytes: self.dict_resource_str_bytes, + dict_resource_len: self.dict_resource_len, + dict_resource_type: self.dict_resource_type, + dict_resource_name: self.dict_resource_name, + dict_source_type_bytes: self.dict_source_type_bytes, + dict_origin_info: self.dict_origin_info, + types: self.types, + names: self.names, + tags: self.tags, + resources: self.resources, + intervals: self.intervals, + num_points: self.num_points, + source_type_names: self.source_type_names, + origin_infos: self.origin_infos, + timestamps: self.timestamps, + vals_sint64: self.vals_sint64, + vals_float32: self.vals_float32, + vals_float64: self.vals_float64, + sketch_num_bins: self.sketch_num_bins, + sketch_bin_keys: self.sketch_bin_keys, + sketch_bin_cnts: self.sketch_bin_cnts, + } + } + + // Internal helper methods + + fn intern_name(&mut self, name: &str) -> i64 { + if name.is_empty() { + return 0; + } + let (id, is_new) = self.name_interner.get_or_insert(name.to_string()); + if is_new { + append_len_str(&mut self.dict_name_bytes, name); + } + id + } + + fn intern_tag(&mut self, tag: &str) -> i64 { + if tag.is_empty() { + return 0; + } + let (id, is_new) = self.tag_interner.get_or_insert(tag.to_string()); + if is_new { + append_len_str(&mut self.dict_tags_bytes, tag); + } + id + } + + fn intern_tagset(&mut self, tag_ids: Vec) -> i64 { + if tag_ids.is_empty() { + return 0; + } + let (id, is_new) = self.tagset_interner.get_or_insert(tag_ids.clone()); + if is_new { + self.encode_tagset(&tag_ids); + } + id + } + + fn encode_tagset(&mut self, tag_ids: &[i64]) { + // Push the length + self.dict_tagsets.push(tag_ids.len() as i64); + + let start = self.dict_tagsets.len(); + + // Add all tag IDs + self.dict_tagsets.extend_from_slice(tag_ids); + + // Sort and delta-encode the tagset portion + self.dict_tagsets[start..].sort_unstable(); + delta_encode(&mut self.dict_tagsets[start..]); + } + + fn intern_resource_str(&mut self, s: &str) -> i64 { + if s.is_empty() { + return 0; + } + let (id, is_new) = self.resource_str_interner.get_or_insert(s.to_string()); + if is_new { + append_len_str(&mut self.dict_resource_str_bytes, s); + } + id + } + + fn intern_resources(&mut self, resources: &[(String, String)]) -> i64 { + if resources.is_empty() { + return 0; + } + + // Convert to (type_id, name_id) pairs + let id_pairs: Vec<(i64, i64)> = resources + .iter() + .map(|(t, n)| (self.intern_resource_str(t), self.intern_resource_str(n))) + .collect(); + + let (id, is_new) = self.resource_interner.get_or_insert(id_pairs.clone()); + if is_new { + self.encode_resources(&id_pairs); + } + id + } + + fn encode_resources(&mut self, id_pairs: &[(i64, i64)]) { + self.dict_resource_len.push(id_pairs.len() as i64); + + let type_start = self.dict_resource_type.len(); + let name_start = self.dict_resource_name.len(); + + for (type_id, name_id) in id_pairs { + self.dict_resource_type.push(*type_id); + self.dict_resource_name.push(*name_id); + } + + delta_encode(&mut self.dict_resource_type[type_start..]); + delta_encode(&mut self.dict_resource_name[name_start..]); + } + + fn intern_source_type(&mut self, s: &str) -> i64 { + if s.is_empty() { + return 0; + } + let (id, is_new) = self.source_type_interner.get_or_insert(s.to_string()); + if is_new { + append_len_str(&mut self.dict_source_type_bytes, s); + } + id + } + + fn intern_origin(&mut self, product: i32, category: i32, service: i32) -> i64 { + if product == 0 && category == 0 && service == 0 { + return 0; + } + let (id, is_new) = self.origin_interner.get_or_insert((product, category, service)); + if is_new { + self.dict_origin_info.push(product); + self.dict_origin_info.push(category); + self.dict_origin_info.push(service); + } + id + } +} + +/// Builder for a single metric within a V3 payload. +/// +/// Use the setter methods to configure the metric, add points with [`add_point`](Self::add_point), +/// then call [`close`](Self::close) to finalize. +pub struct V3MetricBuilder<'a> { + writer: &'a mut V3Writer, + point_start_idx: usize, + metric_idx: usize, +} + +impl<'a> V3MetricBuilder<'a> { + /// Sets the tags for this metric. + /// + /// Tags should be in "key:value" format. + pub fn set_tags(&mut self, tags: I) + where + I: Iterator, + S: AsRef, + { + let tag_ids: Vec = tags.map(|t| self.writer.intern_tag(t.as_ref())).collect(); + let tagset_id = self.writer.intern_tagset(tag_ids); + self.writer.tags[self.metric_idx] = tagset_id; + } + + /// Sets the resources for this metric. + /// + /// Resources are (type, name) pairs, e.g., ("host", "server1"). + pub fn set_resources(&mut self, resources: I) + where + I: Iterator, + { + let resources: Vec<(String, String)> = resources.collect(); + if resources.is_empty() { + self.writer.resources[self.metric_idx] = 0; + return; + } + let res_id = self.writer.intern_resources(&resources); + self.writer.resources[self.metric_idx] = res_id; + } + + /// Sets the interval for this metric (used for rate metrics). + pub fn set_interval(&mut self, interval: u64) { + self.writer.intervals[self.metric_idx] = interval; + } + + /// Sets the source type name for this metric. + pub fn set_source_type(&mut self, source_type: &str) { + if source_type.is_empty() { + self.writer.source_type_names[self.metric_idx] = 0; + return; + } + let id = self.writer.intern_source_type(source_type); + self.writer.source_type_names[self.metric_idx] = id; + } + + /// Sets the origin metadata for this metric. + pub fn set_origin(&mut self, product: u32, category: u32, service: u32) { + let id = self + .writer + .intern_origin(product as i32, category as i32, service as i32); + self.writer.origin_infos[self.metric_idx] = id; + } + + /// Adds a data point to this metric. + pub fn add_point(&mut self, timestamp: i64, value: f64) { + self.writer.timestamps.push(timestamp); + self.writer.vals_float64.push(value); + self.writer.num_points[self.metric_idx] += 1; + } + + /// Adds sketch data for a distribution metric. + /// + /// For sketches, the summary values (count, sum, min, max) are stored as points, + /// and the bin keys/counts are stored separately. + pub fn add_sketch( + &mut self, timestamp: i64, count: i64, sum: f64, min: f64, max: f64, bin_keys: &[i32], bin_counts: &[u32], + ) { + self.writer.timestamps.push(timestamp); + + // Count goes in sint64, sum/min/max go in float64 + self.writer.vals_sint64.push(count); + self.writer.vals_float64.push(sum); + self.writer.vals_float64.push(min); + self.writer.vals_float64.push(max); + + // Store bin data + self.writer.sketch_num_bins.push(bin_keys.len() as u64); + + let key_start = self.writer.sketch_bin_keys.len(); + self.writer.sketch_bin_keys.extend_from_slice(bin_keys); + self.writer.sketch_bin_cnts.extend_from_slice(bin_counts); + + // Delta-encode this sketch's bin keys + delta_encode_i32(&mut self.writer.sketch_bin_keys[key_start..]); + + self.writer.num_points[self.metric_idx] += 1; + } + + /// Finalizes this metric. + /// + /// This compacts the point values to use the smallest representation + /// that can hold all values without loss. + pub fn close(mut self) { + self.compact_values(); + } + + fn compact_values(&mut self) { + let count = self.writer.num_points[self.metric_idx] as usize; + if count == 0 { + return; + } + + let start = self.point_start_idx; + let end = self.writer.vals_float64.len(); + + // Determine the maximum value type needed + let mut val_ty = V3ValueType::Zero; + for i in start..end { + let val = self.writer.vals_float64[i]; + let pnt_val_ty = V3ValueType::for_value(val); + val_ty = val_ty.max(pnt_val_ty); + } + + // Update the type field + self.writer.types[self.metric_idx] |= val_ty.as_u64(); + + // Convert values to the appropriate storage + match val_ty { + V3ValueType::Zero => { + // Values are all zero, don't store anything + self.writer.vals_float64.truncate(start); + } + V3ValueType::Sint64 => { + for i in start..end { + self.writer.vals_sint64.push(self.writer.vals_float64[i] as i64); + } + self.writer.vals_float64.truncate(start); + } + V3ValueType::Float32 => { + for i in start..end { + self.writer.vals_float32.push(self.writer.vals_float64[i] as f32); + } + self.writer.vals_float64.truncate(start); + } + V3ValueType::Float64 => { + // Already stored in vals_float64, keep them + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_delta_encode() { + let mut data = vec![100, 110, 130, 145]; + delta_encode(&mut data); + assert_eq!(data, vec![100, 10, 20, 15]); + } + + #[test] + fn test_delta_encode_empty() { + let mut data: Vec = vec![]; + delta_encode(&mut data); + assert!(data.is_empty()); + } + + #[test] + fn test_delta_encode_single() { + let mut data = vec![42]; + delta_encode(&mut data); + assert_eq!(data, vec![42]); + } + + #[test] + fn test_append_len_str() { + let mut buf = Vec::new(); + append_len_str(&mut buf, "hello"); + // Length 5 = 0x05, then "hello" + assert_eq!(buf, vec![5, b'h', b'e', b'l', b'l', b'o']); + } + + #[test] + fn test_writer_basic() { + let mut writer = V3Writer::new(); + + { + let mut metric = writer.write(V3MetricType::Gauge, "test.metric"); + metric.set_tags(["env:prod", "service:web"].iter().copied()); + metric.add_point(1000, 42.0); + metric.add_point(1010, 43.5); + metric.close(); + } + + let data = writer.close(); + + assert_eq!(data.types.len(), 1); + assert_eq!(data.names.len(), 1); + assert_eq!(data.timestamps.len(), 2); + } + + #[test] + fn test_writer_multiple_metrics() { + let mut writer = V3Writer::new(); + + { + let mut m1 = writer.write(V3MetricType::Count, "metric1"); + m1.add_point(1000, 10.0); + m1.close(); + } + + { + let mut m2 = writer.write(V3MetricType::Rate, "metric2"); + m2.set_interval(60); + m2.add_point(2000, 20.0); + m2.close(); + } + + let data = writer.close(); + + assert_eq!(data.types.len(), 2); + assert_eq!(data.names.len(), 2); + assert_eq!(data.intervals[0], 0); + // Second metric's interval won't be 60 directly since names is delta-encoded, + // but we can verify the structure is correct + } + + #[test] + fn test_value_compaction_zero() { + let mut writer = V3Writer::new(); + + { + let mut metric = writer.write(V3MetricType::Gauge, "zero.metric"); + metric.add_point(1000, 0.0); + metric.add_point(2000, 0.0); + metric.close(); + } + + let data = writer.close(); + + // Values should be compacted - zero values don't need storage + assert!(data.vals_float64.is_empty()); + assert!(data.vals_sint64.is_empty()); + assert!(data.vals_float32.is_empty()); + } + + #[test] + fn test_value_compaction_int() { + let mut writer = V3Writer::new(); + + { + let mut metric = writer.write(V3MetricType::Count, "int.metric"); + metric.add_point(1000, 100.0); + metric.add_point(2000, 200.0); + metric.close(); + } + + let data = writer.close(); + + // Integer values should be stored in sint64 + assert!(data.vals_float64.is_empty()); + assert_eq!(data.vals_sint64, vec![100, 200]); + assert!(data.vals_float32.is_empty()); + } +} diff --git a/lib/saluki-components/Cargo.toml b/lib/saluki-components/Cargo.toml index 88fbce6da36..ce4cd298bb0 100644 --- a/lib/saluki-components/Cargo.toml +++ b/lib/saluki-components/Cargo.toml @@ -23,6 +23,7 @@ antithesis = [ ] [dependencies] +datadog-metrics-v3 = { workspace = true } antithesis_sdk = { workspace = true, optional = true } arc-swap = { workspace = true } async-trait = { workspace = true } diff --git a/lib/saluki-components/src/encoders/datadog/metrics/mod.rs b/lib/saluki-components/src/encoders/datadog/metrics/mod.rs index d0710b634cb..08592fc8f03 100644 --- a/lib/saluki-components/src/encoders/datadog/metrics/mod.rs +++ b/lib/saluki-components/src/encoders/datadog/metrics/mod.rs @@ -42,6 +42,12 @@ use crate::common::datadog::{ const SERIES_V2_COMPRESSED_SIZE_LIMIT: usize = 512_000; // 500 KiB const SERIES_V2_UNCOMPRESSED_SIZE_LIMIT: usize = 5_242_880; // 5 MiB +// V3 series uses the same intake limits as V2. +#[allow(dead_code)] +pub(crate) const SERIES_V3_COMPRESSED_SIZE_LIMIT: usize = SERIES_V2_COMPRESSED_SIZE_LIMIT; +#[allow(dead_code)] +pub(crate) const SERIES_V3_UNCOMPRESSED_SIZE_LIMIT: usize = SERIES_V2_UNCOMPRESSED_SIZE_LIMIT; + // V1 series JSON endpoint limits match the Datadog Agent's generic serializer defaults. const SERIES_V1_COMPRESSED_SIZE_LIMIT: usize = DEFAULT_SERIALIZER_COMPRESSED_SIZE_LIMIT; const SERIES_V1_UNCOMPRESSED_SIZE_LIMIT: usize = DEFAULT_SERIALIZER_UNCOMPRESSED_SIZE_LIMIT; diff --git a/lib/saluki-components/src/encoders/datadog/metrics/v3/encoder.rs b/lib/saluki-components/src/encoders/datadog/metrics/v3/encoder.rs new file mode 100644 index 00000000000..aff27fe0c01 --- /dev/null +++ b/lib/saluki-components/src/encoders/datadog/metrics/v3/encoder.rs @@ -0,0 +1,219 @@ +use ddsketch_agent::DDSketch; +use http::{uri::PathAndQuery, HeaderValue, Method, Uri}; +use saluki_context::tags::SharedTagSet; +use saluki_core::data_model::event::metric::{Metric, MetricOrigin, MetricValues}; + +use crate::{ + common::datadog::{ + request_builder::EndpointEncoder, DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, + }, + encoders::datadog::metrics::{ + MetricsEndpoint, CONTENT_TYPE_PROTOBUF, SERIES_V3_COMPRESSED_SIZE_LIMIT, SERIES_V3_UNCOMPRESSED_SIZE_LIMIT, + }, +}; + +use super::{V3MetricType, V3Writer}; + +#[derive(Debug)] +pub struct MetricsV3EndpointEncoder { + endpoint: MetricsEndpoint, + additional_tags: SharedTagSet, + writer: V3Writer, +} + +impl MetricsV3EndpointEncoder { + /// Creates a new `MetricsEndpointEncoder` for the given endpoint. + pub fn from_endpoint(endpoint: MetricsEndpoint) -> Self { + Self { + endpoint, + additional_tags: SharedTagSet::default(), + writer: Default::default(), + } + } + + /// Sets the additional tags to be included with every metric encoded by this encoder. + /// + /// These tags are added in a deduplicated fashion, the same as instrumented tags and origin tags. This is an + /// optimized codepath for tag inclusion in high-volume scenarios, where creating new additional contexts + /// through the traditional means (e.g., `ContextResolver`) would be too expensive. + pub fn with_additional_tags(mut self, additional_tags: SharedTagSet) -> Self { + self.additional_tags = additional_tags; + self + } +} + +impl EndpointEncoder for MetricsV3EndpointEncoder { + type Input = Metric; + type EncodeError = protobuf::Error; + + fn encoder_name() -> &'static str { + "metricsv3" + } + + fn compressed_size_limit(&self) -> usize { + match self.endpoint { + MetricsEndpoint::Series => SERIES_V3_COMPRESSED_SIZE_LIMIT, + MetricsEndpoint::Sketches => DEFAULT_INTAKE_COMPRESSED_SIZE_LIMIT, + } + } + + fn uncompressed_size_limit(&self) -> usize { + match self.endpoint { + MetricsEndpoint::Series => SERIES_V3_UNCOMPRESSED_SIZE_LIMIT, + MetricsEndpoint::Sketches => DEFAULT_INTAKE_UNCOMPRESSED_SIZE_LIMIT, + } + } + + fn is_valid_input(&self, input: &Self::Input) -> bool { + let input_endpoint = MetricsEndpoint::from_metric(input); + input_endpoint == self.endpoint + } + + fn encode(&mut self, input: &Self::Input, buffer: &mut Vec) -> Result<(), Self::EncodeError> { + write_metric_to_v3(&mut self.writer, input, &self.additional_tags); + Ok(()) + } + + fn endpoint_uri(&self) -> Uri { + match self.endpoint { + MetricsEndpoint::Series => PathAndQuery::from_static("/api/v3/series").into(), + MetricsEndpoint::Sketches => PathAndQuery::from_static("/api/beta/sketches").into(), + } + } + + fn endpoint_method(&self) -> Method { + // Both endpoints use POST. + Method::POST + } + + fn content_type(&self) -> HeaderValue { + // Both endpoints encode via Protocol Buffers. + CONTENT_TYPE_PROTOBUF.clone() + } +} + +/// Writes a single metric to the V3 writer. +fn write_metric_to_v3(writer: &mut V3Writer, metric: &Metric, additional_tags: &SharedTagSet) { + let metric_type = match metric.values() { + MetricValues::Counter(..) => V3MetricType::Count, + MetricValues::Rate(..) => V3MetricType::Rate, + MetricValues::Gauge(..) | MetricValues::Set(..) => V3MetricType::Gauge, + MetricValues::Histogram(..) | MetricValues::Distribution(..) => V3MetricType::Sketch, + }; + + let mut builder = writer.write(metric_type, metric.context().name()); + + // Tags - chain instrumented + additional + origin tags + let all_tags: Vec = metric + .context() + .tags() + .into_iter() + .chain(additional_tags.into_iter()) + .chain(metric.context().origin_tags()) + .filter(|t| !t.name().starts_with("dd.internal.resource")) + .map(|t| t.as_str().to_string()) + .collect(); + builder.set_tags(all_tags.iter().map(|s| s.as_str())); + + // Resources - extract host and any dd.internal.resource tags + let mut resources: Vec<(String, String)> = Vec::new(); + if let Some(host) = metric.metadata().hostname() { + resources.push(("host".to_string(), host.to_string())); + } + // Extract dd.internal.resource tags as resources + for tag in metric.context().tags().into_iter().chain(additional_tags.into_iter()) { + if tag.name() == "dd.internal.resource" { + if let Some(value) = tag.value() { + if let Some((rtype, rname)) = value.split_once(':') { + resources.push((rtype.to_string(), rname.to_string())); + } + } + } + } + builder.set_resources(resources.into_iter()); + + // Origin metadata + if let Some(origin) = metric.metadata().origin() { + match origin { + MetricOrigin::SourceType(source_type) => { + builder.set_source_type(source_type.as_ref()); + } + MetricOrigin::OriginMetadata { + product, + subproduct, + product_detail, + } => { + builder.set_origin(*product, *subproduct, *product_detail); + } + } + } + + // Points based on metric type + match metric.values() { + MetricValues::Counter(points) | MetricValues::Gauge(points) => { + for (ts, val) in points { + let timestamp = ts.map(|t| t.get() as i64).unwrap_or(0); + builder.add_point(timestamp, val); + } + } + MetricValues::Rate(points, interval) => { + builder.set_interval(interval.as_secs()); + for (ts, val) in points { + let timestamp = ts.map(|t| t.get() as i64).unwrap_or(0); + // Scale by interval as done in V2 + let scaled = val / interval.as_secs_f64(); + builder.add_point(timestamp, scaled); + } + } + MetricValues::Set(points) => { + // Set values are already converted to count in the iterator + for (ts, count) in points { + let timestamp = ts.map(|t| t.get() as i64).unwrap_or(0); + builder.add_point(timestamp, count); + } + } + MetricValues::Distribution(sketches) => { + for (ts, sketch) in sketches { + let timestamp = ts.map(|t| t.get() as i64).unwrap_or(0); + if !sketch.is_empty() { + let bin_keys: Vec = sketch.bins().iter().map(|b| b.key()).collect(); + let bin_counts: Vec = sketch.bins().iter().map(|b| b.count()).collect(); + builder.add_sketch( + timestamp, + sketch.count() as i64, + sketch.sum().unwrap_or(0.0), + sketch.min().unwrap_or(0.0), + sketch.max().unwrap_or(0.0), + &bin_keys, + &bin_counts, + ); + } + } + } + MetricValues::Histogram(histograms) => { + for (ts, histogram) in histograms { + let timestamp = ts.map(|t| t.get() as i64).unwrap_or(0); + // Convert histogram to DDSketch + let mut sketch = DDSketch::default(); + for sample in histogram.samples() { + sketch.insert_n(sample.value.into_inner(), sample.weight as u32); + } + if !sketch.is_empty() { + let bin_keys: Vec = sketch.bins().iter().map(|b| b.key()).collect(); + let bin_counts: Vec = sketch.bins().iter().map(|b| b.count()).collect(); + builder.add_sketch( + timestamp, + sketch.count() as i64, + sketch.sum().unwrap_or(0.0), + sketch.min().unwrap_or(0.0), + sketch.max().unwrap_or(0.0), + &bin_keys, + &bin_counts, + ); + } + } + } + } + + builder.close(); +} diff --git a/lib/saluki-components/src/encoders/datadog/metrics/v3/interner.rs b/lib/saluki-components/src/encoders/datadog/metrics/v3/interner.rs new file mode 100644 index 00000000000..5e8968cd093 --- /dev/null +++ b/lib/saluki-components/src/encoders/datadog/metrics/v3/interner.rs @@ -0,0 +1,94 @@ +//! Generic interning for dictionary deduplication. + +use std::{collections::HashMap, hash::Hash}; + +/// Generic interning structure for dictionary deduplication. +/// +/// Assigns unique 1-based IDs to values, returning the same ID for duplicate values. +/// ID 0 is reserved for "empty/none" in the V3 format. +#[derive(Debug)] +pub struct Interner { + index: HashMap, + last_id: i64, +} + +impl Default for Interner { + fn default() -> Self { + Self::new() + } +} + +impl Interner { + /// Creates a new empty interner. + pub fn new() -> Self { + Self { + index: HashMap::new(), + last_id: 0, + } + } + + /// Gets the ID for a key, inserting it if not present. + /// + /// Returns `(id, is_new)` where `is_new` is true if the key was newly inserted. + /// IDs are 1-based (0 is reserved for empty/none values). + pub fn get_or_insert(&mut self, key: K) -> (i64, bool) { + if let Some(&id) = self.index.get(&key) { + (id, false) + } else { + self.last_id += 1; + self.index.insert(key, self.last_id); + (self.last_id, true) + } + } + + /// Returns the number of interned values. + #[allow(dead_code)] + pub fn len(&self) -> usize { + self.index.len() + } + + /// Returns true if no values have been interned. + #[allow(dead_code)] + pub fn is_empty(&self) -> bool { + self.index.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_interner_basic() { + let mut interner: Interner = Interner::new(); + + // First insertion returns ID 1 and is_new=true + let (id1, is_new1) = interner.get_or_insert("hello".to_string()); + assert_eq!(id1, 1); + assert!(is_new1); + + // Second insertion of same value returns same ID and is_new=false + let (id2, is_new2) = interner.get_or_insert("hello".to_string()); + assert_eq!(id2, 1); + assert!(!is_new2); + + // New value gets next ID + let (id3, is_new3) = interner.get_or_insert("world".to_string()); + assert_eq!(id3, 2); + assert!(is_new3); + + assert_eq!(interner.len(), 2); + } + + #[test] + fn test_interner_tuples() { + let mut interner: Interner<(i32, i32, i32)> = Interner::new(); + + let (id1, _) = interner.get_or_insert((1, 2, 3)); + let (id2, _) = interner.get_or_insert((1, 2, 3)); + let (id3, _) = interner.get_or_insert((4, 5, 6)); + + assert_eq!(id1, id2); + assert_ne!(id1, id3); + } +} diff --git a/lib/saluki-components/src/encoders/datadog/metrics/v3/mod.rs b/lib/saluki-components/src/encoders/datadog/metrics/v3/mod.rs new file mode 100644 index 00000000000..d2f13987a7c --- /dev/null +++ b/lib/saluki-components/src/encoders/datadog/metrics/v3/mod.rs @@ -0,0 +1,15 @@ +//! V3 columnar metrics payload encoder. +//! +//! Unlike the V2 row-based protobuf format, V3 uses a columnar layout with +//! dictionary-based string deduplication and delta encoding for compact payloads. +//! +//! Series endpoint: /api/v3/series +//! Sketches endpoint: /api/beta/sketches +//! +//! Protocol implementation lives in the `datadog-metrics-v3` crate. + +pub mod encoder; + +pub use datadog_metrics_v3::{ + serialize_v3_payload, V3EncodedData, V3MetricBuilder, V3MetricType, V3ValueType, V3Writer, +}; diff --git a/lib/saluki-components/src/encoders/datadog/metrics/v3/serializer.rs b/lib/saluki-components/src/encoders/datadog/metrics/v3/serializer.rs new file mode 100644 index 00000000000..030d89caca3 --- /dev/null +++ b/lib/saluki-components/src/encoders/datadog/metrics/v3/serializer.rs @@ -0,0 +1,315 @@ +//! V3 payload protobuf serialization. +//! +//! Serializes [`V3EncodedData`] to protobuf wire format using `CodedOutputStream`. + +use protobuf::{rt::WireType, CodedOutputStream}; + +use super::types::field_numbers; +use super::writer::V3EncodedData; + +/// Serializes V3 encoded data to protobuf wire format. +/// +/// The output is a MetricData message as defined in `payload_v3.proto`. +pub fn serialize_v3_payload(data: &V3EncodedData, output: &mut Vec) -> Result<(), protobuf::Error> { + let mut os = CodedOutputStream::vec(output); + + // Dictionary fields (bytes - varint-length-prefixed strings concatenated) + if !data.dict_name_bytes.is_empty() { + os.write_bytes(field_numbers::DICT_NAME_STR, &data.dict_name_bytes)?; + } + if !data.dict_tags_bytes.is_empty() { + os.write_bytes(field_numbers::DICT_TAGS_STR, &data.dict_tags_bytes)?; + } + + // Packed repeated fields for dictionaries + write_packed_sint64(&mut os, field_numbers::DICT_TAGSETS, &data.dict_tagsets)?; + + if !data.dict_resource_str_bytes.is_empty() { + os.write_bytes(field_numbers::DICT_RESOURCE_STR, &data.dict_resource_str_bytes)?; + } + + write_packed_int64(&mut os, field_numbers::DICT_RESOURCE_LEN, &data.dict_resource_len)?; + write_packed_sint64(&mut os, field_numbers::DICT_RESOURCE_TYPE, &data.dict_resource_type)?; + write_packed_sint64(&mut os, field_numbers::DICT_RESOURCE_NAME, &data.dict_resource_name)?; + + if !data.dict_source_type_bytes.is_empty() { + os.write_bytes(field_numbers::DICT_SOURCE_TYPE_NAME, &data.dict_source_type_bytes)?; + } + + write_packed_int32(&mut os, field_numbers::DICT_ORIGIN_INFO, &data.dict_origin_info)?; + + // Per-metric columns + write_packed_uint64(&mut os, field_numbers::TYPES, &data.types)?; + write_packed_sint64(&mut os, field_numbers::NAMES, &data.names)?; + write_packed_sint64(&mut os, field_numbers::TAGS, &data.tags)?; + write_packed_sint64(&mut os, field_numbers::RESOURCES, &data.resources)?; + write_packed_uint64(&mut os, field_numbers::INTERVALS, &data.intervals)?; + write_packed_uint64(&mut os, field_numbers::NUM_POINTS, &data.num_points)?; + write_packed_sint64(&mut os, field_numbers::SOURCE_TYPE_NAME, &data.source_type_names)?; + write_packed_sint64(&mut os, field_numbers::ORIGIN_INFO, &data.origin_infos)?; + + // Point data + write_packed_sint64(&mut os, field_numbers::TIMESTAMPS, &data.timestamps)?; + write_packed_sint64(&mut os, field_numbers::VALS_SINT64, &data.vals_sint64)?; + write_packed_float(&mut os, field_numbers::VALS_FLOAT32, &data.vals_float32)?; + write_packed_double(&mut os, field_numbers::VALS_FLOAT64, &data.vals_float64)?; + + // Sketch data + write_packed_uint64(&mut os, field_numbers::SKETCH_NUM_BINS, &data.sketch_num_bins)?; + write_packed_sint32(&mut os, field_numbers::SKETCH_BIN_KEYS, &data.sketch_bin_keys)?; + write_packed_uint32(&mut os, field_numbers::SKETCH_BIN_CNTS, &data.sketch_bin_cnts)?; + + os.flush()?; + Ok(()) +} + +/// Writes a packed repeated sint64 field. +fn write_packed_sint64(os: &mut CodedOutputStream, field: u32, values: &[i64]) -> Result<(), protobuf::Error> { + if values.is_empty() { + return Ok(()); + } + + // Calculate the encoded size + let mut size: usize = 0; + for &v in values { + size += encoded_len_sint64(v); + } + + // Write the field header and length + os.write_tag(field, WireType::LengthDelimited)?; + os.write_raw_varint32(size as u32)?; + + // Write the values + for &v in values { + os.write_sint64_no_tag(v)?; + } + + Ok(()) +} + +/// Writes a packed repeated int64 field. +fn write_packed_int64(os: &mut CodedOutputStream, field: u32, values: &[i64]) -> Result<(), protobuf::Error> { + if values.is_empty() { + return Ok(()); + } + + let mut size: usize = 0; + for &v in values { + size += encoded_len_varint64(v as u64); + } + + os.write_tag(field, WireType::LengthDelimited)?; + os.write_raw_varint32(size as u32)?; + + for &v in values { + os.write_int64_no_tag(v)?; + } + + Ok(()) +} + +/// Writes a packed repeated uint64 field. +fn write_packed_uint64(os: &mut CodedOutputStream, field: u32, values: &[u64]) -> Result<(), protobuf::Error> { + if values.is_empty() { + return Ok(()); + } + + let mut size: usize = 0; + for &v in values { + size += encoded_len_varint64(v); + } + + os.write_tag(field, WireType::LengthDelimited)?; + os.write_raw_varint32(size as u32)?; + + for &v in values { + os.write_uint64_no_tag(v)?; + } + + Ok(()) +} + +/// Writes a packed repeated int32 field. +fn write_packed_int32(os: &mut CodedOutputStream, field: u32, values: &[i32]) -> Result<(), protobuf::Error> { + if values.is_empty() { + return Ok(()); + } + + let mut size: usize = 0; + for &v in values { + size += encoded_len_varint32(v as u32); + } + + os.write_tag(field, WireType::LengthDelimited)?; + os.write_raw_varint32(size as u32)?; + + for &v in values { + os.write_int32_no_tag(v)?; + } + + Ok(()) +} + +/// Writes a packed repeated sint32 field. +fn write_packed_sint32(os: &mut CodedOutputStream, field: u32, values: &[i32]) -> Result<(), protobuf::Error> { + if values.is_empty() { + return Ok(()); + } + + let mut size: usize = 0; + for &v in values { + size += encoded_len_sint32(v); + } + + os.write_tag(field, WireType::LengthDelimited)?; + os.write_raw_varint32(size as u32)?; + + for &v in values { + os.write_sint32_no_tag(v)?; + } + + Ok(()) +} + +/// Writes a packed repeated uint32 field. +fn write_packed_uint32(os: &mut CodedOutputStream, field: u32, values: &[u32]) -> Result<(), protobuf::Error> { + if values.is_empty() { + return Ok(()); + } + + let mut size: usize = 0; + for &v in values { + size += encoded_len_varint32(v); + } + + os.write_tag(field, WireType::LengthDelimited)?; + os.write_raw_varint32(size as u32)?; + + for &v in values { + os.write_uint32_no_tag(v)?; + } + + Ok(()) +} + +/// Writes a packed repeated float field. +fn write_packed_float(os: &mut CodedOutputStream, field: u32, values: &[f32]) -> Result<(), protobuf::Error> { + if values.is_empty() { + return Ok(()); + } + + // Each float is 4 bytes + let size = values.len() * 4; + + os.write_tag(field, WireType::LengthDelimited)?; + os.write_raw_varint32(size as u32)?; + + for &v in values { + os.write_float_no_tag(v)?; + } + + Ok(()) +} + +/// Writes a packed repeated double field. +fn write_packed_double(os: &mut CodedOutputStream, field: u32, values: &[f64]) -> Result<(), protobuf::Error> { + if values.is_empty() { + return Ok(()); + } + + // Each double is 8 bytes + let size = values.len() * 8; + + os.write_tag(field, WireType::LengthDelimited)?; + os.write_raw_varint32(size as u32)?; + + for &v in values { + os.write_double_no_tag(v)?; + } + + Ok(()) +} + +// Varint encoding length helpers + +fn encoded_len_varint64(value: u64) -> usize { + if value == 0 { + return 1; + } + let bits = 64 - value.leading_zeros() as usize; + (bits + 6) / 7 +} + +fn encoded_len_varint32(value: u32) -> usize { + if value == 0 { + return 1; + } + let bits = 32 - value.leading_zeros() as usize; + (bits + 6) / 7 +} + +fn encoded_len_sint64(value: i64) -> usize { + // Zigzag encoding: (value << 1) ^ (value >> 63) + let encoded = ((value << 1) ^ (value >> 63)) as u64; + encoded_len_varint64(encoded) +} + +fn encoded_len_sint32(value: i32) -> usize { + // Zigzag encoding: (value << 1) ^ (value >> 31) + let encoded = ((value << 1) ^ (value >> 31)) as u32; + encoded_len_varint32(encoded) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::encoders::datadog::metrics::v3::{V3MetricType, V3Writer}; + + #[test] + fn test_serialize_empty() { + let data = V3EncodedData::default(); + let mut output = Vec::new(); + serialize_v3_payload(&data, &mut output).unwrap(); + assert!(output.is_empty()); + } + + #[test] + fn test_serialize_basic_metric() { + let mut writer = V3Writer::new(); + + { + let mut metric = writer.write(V3MetricType::Gauge, "test.metric"); + metric.add_point(1000, 42.0); + metric.close(); + } + + let data = writer.close(); + let mut output = Vec::new(); + serialize_v3_payload(&data, &mut output).unwrap(); + + // Should produce non-empty output + assert!(!output.is_empty()); + } + + #[test] + fn test_encoded_len_varint() { + assert_eq!(encoded_len_varint64(0), 1); + assert_eq!(encoded_len_varint64(1), 1); + assert_eq!(encoded_len_varint64(127), 1); + assert_eq!(encoded_len_varint64(128), 2); + assert_eq!(encoded_len_varint64(16383), 2); + assert_eq!(encoded_len_varint64(16384), 3); + } + + #[test] + fn test_encoded_len_sint() { + // Zigzag: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, ... + assert_eq!(encoded_len_sint64(0), 1); + assert_eq!(encoded_len_sint64(-1), 1); + assert_eq!(encoded_len_sint64(1), 1); + assert_eq!(encoded_len_sint64(-64), 1); + assert_eq!(encoded_len_sint64(63), 1); + assert_eq!(encoded_len_sint64(64), 2); + assert_eq!(encoded_len_sint64(-65), 2); + } +} diff --git a/lib/saluki-components/src/encoders/datadog/metrics/v3/types.rs b/lib/saluki-components/src/encoders/datadog/metrics/v3/types.rs new file mode 100644 index 00000000000..3ffbcb97799 --- /dev/null +++ b/lib/saluki-components/src/encoders/datadog/metrics/v3/types.rs @@ -0,0 +1,155 @@ +//! V3 payload type definitions and protocol buffer field numbers. + +/// Protocol buffer field numbers for MetricData message. +/// +/// These correspond to the field numbers in `payload_v3.proto`. +pub mod field_numbers { + // Dictionary fields + pub const DICT_NAME_STR: u32 = 1; + pub const DICT_TAGS_STR: u32 = 2; + pub const DICT_TAGSETS: u32 = 3; + pub const DICT_RESOURCE_STR: u32 = 4; + pub const DICT_RESOURCE_LEN: u32 = 5; + pub const DICT_RESOURCE_TYPE: u32 = 6; + pub const DICT_RESOURCE_NAME: u32 = 7; + pub const DICT_SOURCE_TYPE_NAME: u32 = 8; + pub const DICT_ORIGIN_INFO: u32 = 9; + + // Per-metric columns + pub const TYPES: u32 = 10; + pub const NAMES: u32 = 11; + pub const TAGS: u32 = 12; + pub const RESOURCES: u32 = 13; + pub const INTERVALS: u32 = 14; + pub const NUM_POINTS: u32 = 15; + + // Point data + pub const TIMESTAMPS: u32 = 16; + pub const VALS_SINT64: u32 = 17; + pub const VALS_FLOAT32: u32 = 18; + pub const VALS_FLOAT64: u32 = 19; + + // Sketch data + pub const SKETCH_NUM_BINS: u32 = 20; + pub const SKETCH_BIN_KEYS: u32 = 21; + pub const SKETCH_BIN_CNTS: u32 = 22; + + // Additional per-metric columns + pub const SOURCE_TYPE_NAME: u32 = 23; + pub const ORIGIN_INFO: u32 = 24; +} + +/// V3 metric type values. +/// +/// These match the `metricType` enum in `payload_v3.proto`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum V3MetricType { + Count = 1, + Rate = 2, + Gauge = 3, + Sketch = 4, +} + +impl V3MetricType { + /// Returns the numeric value for encoding in the types column. + pub fn as_u64(self) -> u64 { + self as u64 + } +} + +/// V3 value type values. +/// +/// These are encoded in bits 4-7 of the types column and indicate which +/// value array contains the metric's points. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum V3ValueType { + /// Value is zero, not stored explicitly. + Zero = 0x00, + /// Value is stored in vals_sint64. + Sint64 = 0x10, + /// Value is stored in vals_float32. + Float32 = 0x20, + /// Value is stored in vals_float64. + Float64 = 0x30, +} + +impl V3ValueType { + /// Returns the numeric value for encoding in the types column. + pub fn as_u64(self) -> u64 { + self as u64 + } + + /// Determines the best value type for a given f64 value. + /// + /// Prefers smaller representations when lossless: + /// - Zero for 0.0 + /// - Sint64 for integers that fit in 49 bits + /// - Float32 for values representable as f32 + /// - Float64 otherwise + pub fn for_value(v: f64) -> Self { + if v == 0.0 { + return Self::Zero; + } + + // Varint range that fits in 7 bytes or less (49 bits) + const VARINT_WIDTH: i32 = 7 * 7 - 1; + const MAX_INT: i64 = 1 << VARINT_WIDTH; + const MIN_INT: i64 = -MAX_INT; + + let i = v as i64; + if (MIN_INT..MAX_INT).contains(&i) && (i as f64) == v { + return Self::Sint64; + } + + if (v as f32 as f64) == v { + return Self::Float32; + } + + Self::Float64 + } + + /// Returns the maximum (largest encoding) of two value types. + pub fn max(self, other: Self) -> Self { + if (other as u8) > (self as u8) { + other + } else { + self + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_value_type_for_value() { + assert_eq!(V3ValueType::for_value(0.0), V3ValueType::Zero); + assert_eq!(V3ValueType::for_value(100.0), V3ValueType::Sint64); + assert_eq!(V3ValueType::for_value(-100.0), V3ValueType::Sint64); + assert_eq!(V3ValueType::for_value(1.5), V3ValueType::Float32); + assert_eq!(V3ValueType::for_value(2.75), V3ValueType::Float32); + + // Large integers that don't fit in 49 bits AND can't be exactly represented in f32 + // Powers of 2 like (1 << 50) can be exactly represented in f32, so we add 1 + // to make it an odd number that requires more precision than f32 provides + let large = ((1i64 << 50) + 1) as f64; + assert_eq!(V3ValueType::for_value(large), V3ValueType::Float64); + + // Values that require f64 precision - use PI which has more precision than f32 can hold + // and isn't an integer, so it won't be stored as Sint64 + let pi = std::f64::consts::PI; + // PI requires full f64 precision to store exactly + assert_eq!(V3ValueType::for_value(pi), V3ValueType::Float64); + } + + #[test] + fn test_value_type_max() { + assert_eq!(V3ValueType::Zero.max(V3ValueType::Sint64), V3ValueType::Sint64); + assert_eq!(V3ValueType::Sint64.max(V3ValueType::Float32), V3ValueType::Float32); + assert_eq!(V3ValueType::Float32.max(V3ValueType::Float64), V3ValueType::Float64); + assert_eq!(V3ValueType::Float64.max(V3ValueType::Zero), V3ValueType::Float64); + } +} diff --git a/lib/saluki-components/src/encoders/datadog/metrics/v3/writer.rs b/lib/saluki-components/src/encoders/datadog/metrics/v3/writer.rs new file mode 100644 index 00000000000..315707590d3 --- /dev/null +++ b/lib/saluki-components/src/encoders/datadog/metrics/v3/writer.rs @@ -0,0 +1,589 @@ +//! V3 columnar metrics writer. +//! +//! The [`V3Writer`] accumulates metrics in columnar format with dictionary deduplication, +//! then produces [`V3EncodedData`] ready for protobuf serialization. + +use super::interner::Interner; +use super::types::{V3MetricType, V3ValueType}; + +/// Appends a varint-length-prefixed string to the destination buffer. +fn append_len_str(dst: &mut Vec, s: &str) { + let mut len = s.len() as u64; + loop { + let mut byte = (len & 0x7F) as u8; + len >>= 7; + if len != 0 { + byte |= 0x80; + } + dst.push(byte); + if len == 0 { + break; + } + } + dst.extend_from_slice(s.as_bytes()); +} + +/// Delta-encodes a slice in place, working backwards. +/// +/// After encoding, `s[i]` contains the difference `s[i] - s[i-1]`. +pub fn delta_encode(s: &mut [i64]) { + if s.len() < 2 { + return; + } + for i in (1..s.len()).rev() { + s[i] -= s[i - 1]; + } +} + +/// Delta-encodes i32 values in place, working backwards. +pub fn delta_encode_i32(s: &mut [i32]) { + if s.len() < 2 { + return; + } + for i in (1..s.len()).rev() { + s[i] -= s[i - 1]; + } +} + +/// Encoded V3 payload data ready for protobuf serialization. +#[derive(Debug, Default)] +pub struct V3EncodedData { + // Dictionary encoded bytes (varint-length-prefixed strings) + pub dict_name_bytes: Vec, + pub dict_tags_bytes: Vec, + pub dict_tagsets: Vec, + pub dict_resource_str_bytes: Vec, + pub dict_resource_len: Vec, + pub dict_resource_type: Vec, + pub dict_resource_name: Vec, + pub dict_source_type_bytes: Vec, + pub dict_origin_info: Vec, + + // Per-metric columns (one entry per metric) + pub types: Vec, + pub names: Vec, + pub tags: Vec, + pub resources: Vec, + pub intervals: Vec, + pub num_points: Vec, + pub source_type_names: Vec, + pub origin_infos: Vec, + + // Point data (varies per metric based on num_points) + pub timestamps: Vec, + pub vals_sint64: Vec, + pub vals_float32: Vec, + pub vals_float64: Vec, + + // Sketch data + pub sketch_num_bins: Vec, + pub sketch_bin_keys: Vec, + pub sketch_bin_cnts: Vec, +} + +/// V3 columnar metrics writer. +/// +/// Accumulates metrics in columnar format with dictionary deduplication. +/// Call [`V3Writer::write`] for each metric, then [`V3Writer::close`] to finalize +/// and get the encoded data. +#[derive(Debug, Default)] +pub struct V3Writer { + // Interners for dictionary deduplication + name_interner: Interner, + tag_interner: Interner, + tagset_interner: Interner>, + resource_str_interner: Interner, + resource_interner: Interner>, + source_type_interner: Interner, + origin_interner: Interner<(i32, i32, i32)>, + + // Dictionary encoded bytes + dict_name_bytes: Vec, + dict_tags_bytes: Vec, + dict_tagsets: Vec, + dict_resource_str_bytes: Vec, + dict_resource_len: Vec, + dict_resource_type: Vec, + dict_resource_name: Vec, + dict_source_type_bytes: Vec, + dict_origin_info: Vec, + + // Per-metric columns + types: Vec, + names: Vec, + tags: Vec, + resources: Vec, + intervals: Vec, + num_points: Vec, + source_type_names: Vec, + origin_infos: Vec, + + // Point data + timestamps: Vec, + vals_sint64: Vec, + vals_float32: Vec, + vals_float64: Vec, + + // Sketch data + sketch_num_bins: Vec, + sketch_bin_keys: Vec, + sketch_bin_cnts: Vec, +} + +impl V3Writer { + /// Creates a new V3 writer. + pub fn new() -> Self { + Self::default() + } + + /// Begins writing a new metric. + /// + /// Returns a [`V3MetricBuilder`] that must be used to set the metric's + /// properties and add points, then closed with [`V3MetricBuilder::close`]. + pub fn write(&mut self, metric_type: V3MetricType, name: &str) -> V3MetricBuilder<'_> { + let name_id = self.intern_name(name); + let metric_idx = self.types.len(); + let point_start_idx = self.vals_float64.len(); + + // Initialize the per-metric columns with default values + self.types.push(metric_type.as_u64()); + self.names.push(name_id); + self.tags.push(0); + self.resources.push(0); + self.intervals.push(0); + self.num_points.push(0); + self.source_type_names.push(0); + self.origin_infos.push(0); + + V3MetricBuilder { + writer: self, + point_start_idx, + metric_idx, + } + } + + /// Finalizes the writer and returns the encoded data. + /// + /// This performs delta encoding on all index arrays. + pub fn close(mut self) -> V3EncodedData { + // Delta-encode all the index arrays + delta_encode(&mut self.names); + delta_encode(&mut self.tags); + delta_encode(&mut self.resources); + delta_encode(&mut self.source_type_names); + delta_encode(&mut self.origin_infos); + delta_encode(&mut self.timestamps); + + V3EncodedData { + dict_name_bytes: self.dict_name_bytes, + dict_tags_bytes: self.dict_tags_bytes, + dict_tagsets: self.dict_tagsets, + dict_resource_str_bytes: self.dict_resource_str_bytes, + dict_resource_len: self.dict_resource_len, + dict_resource_type: self.dict_resource_type, + dict_resource_name: self.dict_resource_name, + dict_source_type_bytes: self.dict_source_type_bytes, + dict_origin_info: self.dict_origin_info, + types: self.types, + names: self.names, + tags: self.tags, + resources: self.resources, + intervals: self.intervals, + num_points: self.num_points, + source_type_names: self.source_type_names, + origin_infos: self.origin_infos, + timestamps: self.timestamps, + vals_sint64: self.vals_sint64, + vals_float32: self.vals_float32, + vals_float64: self.vals_float64, + sketch_num_bins: self.sketch_num_bins, + sketch_bin_keys: self.sketch_bin_keys, + sketch_bin_cnts: self.sketch_bin_cnts, + } + } + + // Internal helper methods + + fn intern_name(&mut self, name: &str) -> i64 { + if name.is_empty() { + return 0; + } + let (id, is_new) = self.name_interner.get_or_insert(name.to_string()); + if is_new { + append_len_str(&mut self.dict_name_bytes, name); + } + id + } + + fn intern_tag(&mut self, tag: &str) -> i64 { + if tag.is_empty() { + return 0; + } + let (id, is_new) = self.tag_interner.get_or_insert(tag.to_string()); + if is_new { + append_len_str(&mut self.dict_tags_bytes, tag); + } + id + } + + fn intern_tagset(&mut self, tag_ids: Vec) -> i64 { + if tag_ids.is_empty() { + return 0; + } + let (id, is_new) = self.tagset_interner.get_or_insert(tag_ids.clone()); + if is_new { + self.encode_tagset(&tag_ids); + } + id + } + + fn encode_tagset(&mut self, tag_ids: &[i64]) { + // Push the length + self.dict_tagsets.push(tag_ids.len() as i64); + + let start = self.dict_tagsets.len(); + + // Add all tag IDs + self.dict_tagsets.extend_from_slice(tag_ids); + + // Sort and delta-encode the tagset portion + self.dict_tagsets[start..].sort_unstable(); + delta_encode(&mut self.dict_tagsets[start..]); + } + + fn intern_resource_str(&mut self, s: &str) -> i64 { + if s.is_empty() { + return 0; + } + let (id, is_new) = self.resource_str_interner.get_or_insert(s.to_string()); + if is_new { + append_len_str(&mut self.dict_resource_str_bytes, s); + } + id + } + + fn intern_resources(&mut self, resources: &[(String, String)]) -> i64 { + if resources.is_empty() { + return 0; + } + + // Convert to (type_id, name_id) pairs + let id_pairs: Vec<(i64, i64)> = resources + .iter() + .map(|(t, n)| (self.intern_resource_str(t), self.intern_resource_str(n))) + .collect(); + + let (id, is_new) = self.resource_interner.get_or_insert(id_pairs.clone()); + if is_new { + self.encode_resources(&id_pairs); + } + id + } + + fn encode_resources(&mut self, id_pairs: &[(i64, i64)]) { + self.dict_resource_len.push(id_pairs.len() as i64); + + let type_start = self.dict_resource_type.len(); + let name_start = self.dict_resource_name.len(); + + for (type_id, name_id) in id_pairs { + self.dict_resource_type.push(*type_id); + self.dict_resource_name.push(*name_id); + } + + delta_encode(&mut self.dict_resource_type[type_start..]); + delta_encode(&mut self.dict_resource_name[name_start..]); + } + + fn intern_source_type(&mut self, s: &str) -> i64 { + if s.is_empty() { + return 0; + } + let (id, is_new) = self.source_type_interner.get_or_insert(s.to_string()); + if is_new { + append_len_str(&mut self.dict_source_type_bytes, s); + } + id + } + + fn intern_origin(&mut self, product: i32, category: i32, service: i32) -> i64 { + if product == 0 && category == 0 && service == 0 { + return 0; + } + let (id, is_new) = self.origin_interner.get_or_insert((product, category, service)); + if is_new { + self.dict_origin_info.push(product); + self.dict_origin_info.push(category); + self.dict_origin_info.push(service); + } + id + } +} + +/// Builder for a single metric within a V3 payload. +/// +/// Use the setter methods to configure the metric, add points with [`add_point`](Self::add_point), +/// then call [`close`](Self::close) to finalize. +pub struct V3MetricBuilder<'a> { + writer: &'a mut V3Writer, + point_start_idx: usize, + metric_idx: usize, +} + +impl<'a> V3MetricBuilder<'a> { + /// Sets the tags for this metric. + /// + /// Tags should be in "key:value" format. + pub fn set_tags(&mut self, tags: I) + where + I: Iterator, + S: AsRef, + { + let tag_ids: Vec = tags.map(|t| self.writer.intern_tag(t.as_ref())).collect(); + let tagset_id = self.writer.intern_tagset(tag_ids); + self.writer.tags[self.metric_idx] = tagset_id; + } + + /// Sets the resources for this metric. + /// + /// Resources are (type, name) pairs, e.g., ("host", "server1"). + pub fn set_resources(&mut self, resources: I) + where + I: Iterator, + { + let resources: Vec<(String, String)> = resources.collect(); + if resources.is_empty() { + self.writer.resources[self.metric_idx] = 0; + return; + } + let res_id = self.writer.intern_resources(&resources); + self.writer.resources[self.metric_idx] = res_id; + } + + /// Sets the interval for this metric (used for rate metrics). + pub fn set_interval(&mut self, interval: u64) { + self.writer.intervals[self.metric_idx] = interval; + } + + /// Sets the source type name for this metric. + pub fn set_source_type(&mut self, source_type: &str) { + if source_type.is_empty() { + self.writer.source_type_names[self.metric_idx] = 0; + return; + } + let id = self.writer.intern_source_type(source_type); + self.writer.source_type_names[self.metric_idx] = id; + } + + /// Sets the origin metadata for this metric. + pub fn set_origin(&mut self, product: u32, category: u32, service: u32) { + let id = self + .writer + .intern_origin(product as i32, category as i32, service as i32); + self.writer.origin_infos[self.metric_idx] = id; + } + + /// Adds a data point to this metric. + pub fn add_point(&mut self, timestamp: i64, value: f64) { + self.writer.timestamps.push(timestamp); + self.writer.vals_float64.push(value); + self.writer.num_points[self.metric_idx] += 1; + } + + /// Adds sketch data for a distribution metric. + /// + /// For sketches, the summary values (count, sum, min, max) are stored as points, + /// and the bin keys/counts are stored separately. + pub fn add_sketch( + &mut self, timestamp: i64, count: i64, sum: f64, min: f64, max: f64, bin_keys: &[i32], bin_counts: &[u32], + ) { + self.writer.timestamps.push(timestamp); + + // Count goes in sint64, sum/min/max go in float64 + self.writer.vals_sint64.push(count); + self.writer.vals_float64.push(sum); + self.writer.vals_float64.push(min); + self.writer.vals_float64.push(max); + + // Store bin data + self.writer.sketch_num_bins.push(bin_keys.len() as u64); + + let key_start = self.writer.sketch_bin_keys.len(); + self.writer.sketch_bin_keys.extend_from_slice(bin_keys); + self.writer.sketch_bin_cnts.extend_from_slice(bin_counts); + + // Delta-encode this sketch's bin keys + delta_encode_i32(&mut self.writer.sketch_bin_keys[key_start..]); + + self.writer.num_points[self.metric_idx] += 1; + } + + /// Finalizes this metric. + /// + /// This compacts the point values to use the smallest representation + /// that can hold all values without loss. + pub fn close(mut self) { + self.compact_values(); + } + + fn compact_values(&mut self) { + let count = self.writer.num_points[self.metric_idx] as usize; + if count == 0 { + return; + } + + let start = self.point_start_idx; + let end = self.writer.vals_float64.len(); + + // Determine the maximum value type needed + let mut val_ty = V3ValueType::Zero; + for i in start..end { + let val = self.writer.vals_float64[i]; + let pnt_val_ty = V3ValueType::for_value(val); + val_ty = val_ty.max(pnt_val_ty); + } + + // Update the type field + self.writer.types[self.metric_idx] |= val_ty.as_u64(); + + // Convert values to the appropriate storage + match val_ty { + V3ValueType::Zero => { + // Values are all zero, don't store anything + self.writer.vals_float64.truncate(start); + } + V3ValueType::Sint64 => { + for i in start..end { + self.writer.vals_sint64.push(self.writer.vals_float64[i] as i64); + } + self.writer.vals_float64.truncate(start); + } + V3ValueType::Float32 => { + for i in start..end { + self.writer.vals_float32.push(self.writer.vals_float64[i] as f32); + } + self.writer.vals_float64.truncate(start); + } + V3ValueType::Float64 => { + // Already stored in vals_float64, keep them + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_delta_encode() { + let mut data = vec![100, 110, 130, 145]; + delta_encode(&mut data); + assert_eq!(data, vec![100, 10, 20, 15]); + } + + #[test] + fn test_delta_encode_empty() { + let mut data: Vec = vec![]; + delta_encode(&mut data); + assert!(data.is_empty()); + } + + #[test] + fn test_delta_encode_single() { + let mut data = vec![42]; + delta_encode(&mut data); + assert_eq!(data, vec![42]); + } + + #[test] + fn test_append_len_str() { + let mut buf = Vec::new(); + append_len_str(&mut buf, "hello"); + // Length 5 = 0x05, then "hello" + assert_eq!(buf, vec![5, b'h', b'e', b'l', b'l', b'o']); + } + + #[test] + fn test_writer_basic() { + let mut writer = V3Writer::new(); + + { + let mut metric = writer.write(V3MetricType::Gauge, "test.metric"); + metric.set_tags(["env:prod", "service:web"].iter().copied()); + metric.add_point(1000, 42.0); + metric.add_point(1010, 43.5); + metric.close(); + } + + let data = writer.close(); + + assert_eq!(data.types.len(), 1); + assert_eq!(data.names.len(), 1); + assert_eq!(data.timestamps.len(), 2); + } + + #[test] + fn test_writer_multiple_metrics() { + let mut writer = V3Writer::new(); + + { + let mut m1 = writer.write(V3MetricType::Count, "metric1"); + m1.add_point(1000, 10.0); + m1.close(); + } + + { + let mut m2 = writer.write(V3MetricType::Rate, "metric2"); + m2.set_interval(60); + m2.add_point(2000, 20.0); + m2.close(); + } + + let data = writer.close(); + + assert_eq!(data.types.len(), 2); + assert_eq!(data.names.len(), 2); + assert_eq!(data.intervals[0], 0); + // Second metric's interval won't be 60 directly since names is delta-encoded, + // but we can verify the structure is correct + } + + #[test] + fn test_value_compaction_zero() { + let mut writer = V3Writer::new(); + + { + let mut metric = writer.write(V3MetricType::Gauge, "zero.metric"); + metric.add_point(1000, 0.0); + metric.add_point(2000, 0.0); + metric.close(); + } + + let data = writer.close(); + + // Values should be compacted - zero values don't need storage + assert!(data.vals_float64.is_empty()); + assert!(data.vals_sint64.is_empty()); + assert!(data.vals_float32.is_empty()); + } + + #[test] + fn test_value_compaction_int() { + let mut writer = V3Writer::new(); + + { + let mut metric = writer.write(V3MetricType::Count, "int.metric"); + metric.add_point(1000, 100.0); + metric.add_point(2000, 200.0); + metric.close(); + } + + let data = writer.close(); + + // Integer values should be stored in sint64 + assert!(data.vals_float64.is_empty()); + assert_eq!(data.vals_sint64, vec![100, 200]); + assert!(data.vals_float32.is_empty()); + } +} From 8ef42128a693d7a99d1fa6e885664f8f7131dcf0 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 24 Jun 2026 12:45:27 +0100 Subject: [PATCH 2/3] fix(metrics): fix V3 codec correctness issues found in code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three issues addressed: 1. compact_values() skipped for Sketch metrics. Sketch summary values (sum/min/max) must stay in vals_float64. The old code could move them to vals_float32 or vals_sint64 based on magnitude, corrupting the payload — the decoder reads these from vals_float64 by position, not by type tag. 2. V3MetricBuilder Drop guard. Without a Drop impl, forgetting close() silently left value-type bits unset in the types column. Now panics in debug builds if close() is not called before drop. 3. MetricsV3EndpointEncoder::encode() now writes to buffer. The EndpointEncoder contract requires each encode() call to write to buffer so the request builder can track sizes. Previously the V3 encoder accumulated into self.writer but wrote nothing to buffer, producing empty payloads. Fixed by serializing a single-metric V3 payload per encode() call. Full cross-metric deduplication requires a batch-flush hook on EndpointEncoder (tracked via TODO comment). --- lib/datadog-metrics-v3/src/writer.rs | 43 +++++++++++++++++++ .../encoders/datadog/metrics/v3/encoder.rs | 31 +++++++++++-- 2 files changed, 71 insertions(+), 3 deletions(-) diff --git a/lib/datadog-metrics-v3/src/writer.rs b/lib/datadog-metrics-v3/src/writer.rs index 315707590d3..65647f81ec2 100644 --- a/lib/datadog-metrics-v3/src/writer.rs +++ b/lib/datadog-metrics-v3/src/writer.rs @@ -159,6 +159,7 @@ impl V3Writer { writer: self, point_start_idx, metric_idx, + closed: false, } } @@ -324,10 +325,16 @@ impl V3Writer { /// /// Use the setter methods to configure the metric, add points with [`add_point`](Self::add_point), /// then call [`close`](Self::close) to finalize. +/// +/// # Panics (debug only) +/// +/// Dropping a `V3MetricBuilder` without calling `close()` panics in debug builds, because +/// omitting `close()` leaves the metric's value-type bits unset in the `types` column. pub struct V3MetricBuilder<'a> { writer: &'a mut V3Writer, point_start_idx: usize, metric_idx: usize, + closed: bool, } impl<'a> V3MetricBuilder<'a> { @@ -424,9 +431,19 @@ impl<'a> V3MetricBuilder<'a> { /// that can hold all values without loss. pub fn close(mut self) { self.compact_values(); + self.closed = true; } fn compact_values(&mut self) { + // Sketch summary values (sum/min/max) are stored in vals_float64 and MUST + // remain there — the decoder reads them from that specific column by position. + // Compacting them into vals_float32 or vals_sint64 would silently corrupt the + // payload: the decoder would read garbage values from the wrong column. + let metric_type = self.writer.types[self.metric_idx] & 0x0F; + if metric_type == V3MetricType::Sketch as u64 { + return; + } + let count = self.writer.num_points[self.metric_idx] as usize; if count == 0 { return; @@ -471,6 +488,15 @@ impl<'a> V3MetricBuilder<'a> { } } +impl Drop for V3MetricBuilder<'_> { + fn drop(&mut self) { + debug_assert!( + self.closed, + "V3MetricBuilder dropped without calling close(): value-type bits not set in types column" + ); + } +} + #[cfg(test)] mod tests { use super::*; @@ -586,4 +612,21 @@ mod tests { assert_eq!(data.vals_sint64, vec![100, 200]); assert!(data.vals_float32.is_empty()); } + + #[test] + fn test_sketch_summary_values_stay_in_float64() { + let mut writer = V3Writer::new(); + + { + let mut metric = writer.write(V3MetricType::Sketch, "dist.metric"); + metric.add_sketch(1000, 5, 10.0, 1.0, 10.0, &[1, 2], &[3, 2]); + metric.close(); + } + + let data = writer.close(); + + assert!(!data.vals_float64.is_empty(), "sketch sum/min/max must stay in vals_float64"); + assert!(!data.vals_sint64.is_empty(), "sketch count must be in vals_sint64"); + assert!(data.vals_float32.is_empty(), "sketch values must not be compacted to float32"); + } } diff --git a/lib/saluki-components/src/encoders/datadog/metrics/v3/encoder.rs b/lib/saluki-components/src/encoders/datadog/metrics/v3/encoder.rs index aff27fe0c01..22c9ecb221e 100644 --- a/lib/saluki-components/src/encoders/datadog/metrics/v3/encoder.rs +++ b/lib/saluki-components/src/encoders/datadog/metrics/v3/encoder.rs @@ -1,3 +1,5 @@ +use std::fmt; + use ddsketch_agent::DDSketch; use http::{uri::PathAndQuery, HeaderValue, Method, Uri}; use saluki_context::tags::SharedTagSet; @@ -12,7 +14,19 @@ use crate::{ }, }; -use super::{V3MetricType, V3Writer}; +use super::{serialize_v3_payload, V3MetricType, V3Writer}; + +/// Infallible encode error (V3 serialization is always successful). +#[derive(Debug)] +pub struct V3EncodeError; + +impl fmt::Display for V3EncodeError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("V3 encode error (unreachable)") + } +} + +impl std::error::Error for V3EncodeError {} #[derive(Debug)] pub struct MetricsV3EndpointEncoder { @@ -44,7 +58,7 @@ impl MetricsV3EndpointEncoder { impl EndpointEncoder for MetricsV3EndpointEncoder { type Input = Metric; - type EncodeError = protobuf::Error; + type EncodeError = V3EncodeError; fn encoder_name() -> &'static str { "metricsv3" @@ -70,7 +84,18 @@ impl EndpointEncoder for MetricsV3EndpointEncoder { } fn encode(&mut self, input: &Self::Input, buffer: &mut Vec) -> Result<(), Self::EncodeError> { - write_metric_to_v3(&mut self.writer, input, &self.additional_tags); + // V3 is a columnar format that ideally batches many metrics before serializing, + // but the EndpointEncoder interface requires immediate per-call output. + // We write a single-metric V3 payload per call. While this sacrifices cross-metric + // string deduplication, it satisfies the size-accounting contract of the request + // builder and produces valid V3 wire format. + // + // TODO: Add a batch-oriented flush mechanism to EndpointEncoder to unlock the full + // columnar benefits (cross-metric delta encoding and dictionary deduplication). + let mut single_writer = V3Writer::new(); + write_metric_to_v3(&mut single_writer, input, &self.additional_tags); + let data = single_writer.close(); + serialize_v3_payload(&data, buffer); Ok(()) } From 845b7b62dda90f2a4aa5dba151bf669b2a63bcf9 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 24 Jun 2026 13:27:20 +0100 Subject: [PATCH 3/3] chore(metrics): remove unused WIRE_VARINT constant and redundant field_numbers module field_numbers was a duplicate of the private mod field in serializer.rs and was not referenced anywhere. WIRE_VARINT (protobuf wire type 0) is not used by the current serializer. --- lib/datadog-metrics-v3/src/serializer.rs | 1 - lib/datadog-metrics-v3/src/types.rs | 41 +----------------------- 2 files changed, 1 insertion(+), 41 deletions(-) diff --git a/lib/datadog-metrics-v3/src/serializer.rs b/lib/datadog-metrics-v3/src/serializer.rs index 3a0350aa134..093b07fc5df 100644 --- a/lib/datadog-metrics-v3/src/serializer.rs +++ b/lib/datadog-metrics-v3/src/serializer.rs @@ -7,7 +7,6 @@ use super::writer::V3EncodedData; // ── Protobuf wire types ────────────────────────────────────────────────────── -const WIRE_VARINT: u32 = 0; const WIRE_LEN: u32 = 2; // ── Field numbers (from payload_v3.proto) ──────────────────────────────────── diff --git a/lib/datadog-metrics-v3/src/types.rs b/lib/datadog-metrics-v3/src/types.rs index 3ffbcb97799..64a5f74ac0f 100644 --- a/lib/datadog-metrics-v3/src/types.rs +++ b/lib/datadog-metrics-v3/src/types.rs @@ -1,43 +1,4 @@ -//! V3 payload type definitions and protocol buffer field numbers. - -/// Protocol buffer field numbers for MetricData message. -/// -/// These correspond to the field numbers in `payload_v3.proto`. -pub mod field_numbers { - // Dictionary fields - pub const DICT_NAME_STR: u32 = 1; - pub const DICT_TAGS_STR: u32 = 2; - pub const DICT_TAGSETS: u32 = 3; - pub const DICT_RESOURCE_STR: u32 = 4; - pub const DICT_RESOURCE_LEN: u32 = 5; - pub const DICT_RESOURCE_TYPE: u32 = 6; - pub const DICT_RESOURCE_NAME: u32 = 7; - pub const DICT_SOURCE_TYPE_NAME: u32 = 8; - pub const DICT_ORIGIN_INFO: u32 = 9; - - // Per-metric columns - pub const TYPES: u32 = 10; - pub const NAMES: u32 = 11; - pub const TAGS: u32 = 12; - pub const RESOURCES: u32 = 13; - pub const INTERVALS: u32 = 14; - pub const NUM_POINTS: u32 = 15; - - // Point data - pub const TIMESTAMPS: u32 = 16; - pub const VALS_SINT64: u32 = 17; - pub const VALS_FLOAT32: u32 = 18; - pub const VALS_FLOAT64: u32 = 19; - - // Sketch data - pub const SKETCH_NUM_BINS: u32 = 20; - pub const SKETCH_BIN_KEYS: u32 = 21; - pub const SKETCH_BIN_CNTS: u32 = 22; - - // Additional per-metric columns - pub const SOURCE_TYPE_NAME: u32 = 23; - pub const ORIGIN_INFO: u32 = 24; -} +//! V3 payload type definitions. /// V3 metric type values. ///