Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/health/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ name = "forge-hw-health"
path = "src/main.rs"

[dependencies]
arc-swap = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
chrono = { workspace = true }
Expand Down
46 changes: 24 additions & 22 deletions crates/health/benches/collector_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use std::sync::Arc;
use carbide_health::endpoint::{BmcAddr, EndpointMetadata, MachineData};
use carbide_health::metrics::MetricsManager;
use carbide_health::sink::{
CollectorEvent, CompositeDataSink, DataSink, EventContext, FirmwareInfo, LogRecord,
MetricSample, PrometheusSink,
CompositeSyncEventNode, EventContext, FirmwareInfo, HealthEvent, LogRecord, MetricSample,
PrometheusSink, SyncEventNode,
};
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use mac_address::MacAddress;
Expand All @@ -34,14 +34,15 @@ const MACHINE_ID: &str = "fm100htjtiaehv1n5vh67tbmqq4eabcjdng40f7jupsadbedhruh6r

struct CountingSink;

impl DataSink for CountingSink {
fn sink_type(&self) -> &'static str {
impl SyncEventNode for CountingSink {
fn node_type(&self) -> &'static str {
"counting_sink"
}

fn handle_event(&self, context: &EventContext, event: &CollectorEvent) {
fn handle_event(&self, context: &EventContext, event: &HealthEvent) -> Vec<HealthEvent> {
black_box(context);
black_box(event);
Vec::new()
}
}

Expand All @@ -65,14 +66,14 @@ fn event_context() -> EventContext {
}
}

fn build_sensor_metric_event(idx: usize, unique_keys: usize) -> CollectorEvent {
fn build_sensor_metric_event(idx: usize, unique_keys: usize) -> HealthEvent {
let unique_keys = unique_keys.max(1);
let sensor_idx = idx % unique_keys;
let sensor_key = format!("sensor-{sensor_idx}");
let machine_idx = idx % 16;
let rack_idx = idx % 4;

CollectorEvent::Metric(
HealthEvent::MeasurementObserved(
MetricSample {
key: sensor_key.clone(),
name: "hw_sensor".to_string(),
Expand All @@ -92,8 +93,8 @@ fn build_sensor_metric_event(idx: usize, unique_keys: usize) -> CollectorEvent {
)
}

fn build_nmxt_metric_event(idx: usize) -> CollectorEvent {
CollectorEvent::Metric(
fn build_nmxt_metric_event(idx: usize) -> HealthEvent {
HealthEvent::MeasurementObserved(
MetricSample {
key: format!("effective_ber:{}", idx % 64),
name: "switch_nmxt".to_string(),
Expand All @@ -112,8 +113,8 @@ fn build_nmxt_metric_event(idx: usize) -> CollectorEvent {
)
}

fn build_log_event(idx: usize) -> CollectorEvent {
CollectorEvent::Log(
fn build_log_event(idx: usize) -> HealthEvent {
HealthEvent::LogObserved(
LogRecord {
body: format!("BMC event line {idx}"),
severity: "INFO".to_string(),
Expand All @@ -128,9 +129,10 @@ fn build_log_event(idx: usize) -> CollectorEvent {
)
}

fn build_firmware_event(idx: usize) -> CollectorEvent {
fn build_firmware_event(idx: usize) -> HealthEvent {
let component = format!("component-{idx}");
CollectorEvent::Firmware(FirmwareInfo {
HealthEvent::FirmwareObserved(FirmwareInfo {
id: format!("firmware-{idx}"),
component: component.clone(),
version: format!("1.0.{}", idx % 100),
attributes: vec![
Expand All @@ -140,8 +142,8 @@ fn build_firmware_event(idx: usize) -> CollectorEvent {
})
}

fn bench_collector_event_build(c: &mut Criterion) {
let mut group = c.benchmark_group("collector_event_build");
fn bench_health_event_build(c: &mut Criterion) {
let mut group = c.benchmark_group("health_event_build");
let sample_count = 10_000usize;
group.throughput(Throughput::Elements(sample_count as u64));

Expand Down Expand Up @@ -181,20 +183,20 @@ fn bench_collector_event_build(c: &mut Criterion) {
}

fn emit_metric_batch_building(
sink: &dyn DataSink,
sink: &dyn SyncEventNode,
context: &EventContext,
batch_size: usize,
unique_keys: usize,
) {
let start = CollectorEvent::MetricCollectionStart;
let start = HealthEvent::ScrapeBatchStarted;
sink.handle_event(context, &start);

for idx in 0..batch_size {
let event = build_sensor_metric_event(idx, unique_keys);
sink.handle_event(context, &event);
}

let end = CollectorEvent::MetricCollectionEnd;
let end = HealthEvent::ScrapeBatchFinished;
sink.handle_event(context, &end);
}

Expand Down Expand Up @@ -224,21 +226,21 @@ fn bench_collector_build_and_emit_prometheus(c: &mut Criterion) {
}

struct CompositeBuildEmitState {
sink: CompositeDataSink,
sink: CompositeSyncEventNode,
context: EventContext,
}

impl CompositeBuildEmitState {
fn new(sink_count: usize) -> Self {
let mut sinks: Vec<Arc<dyn DataSink>> = Vec::with_capacity(sink_count);
let mut sinks: Vec<Arc<dyn SyncEventNode>> = Vec::with_capacity(sink_count);
for _ in 0..sink_count {
sinks.push(Arc::new(CountingSink));
}

let metrics_manager = Arc::new(
MetricsManager::new("bench_collector").expect("metrics manager should initialize"),
);
let sink = CompositeDataSink::new(sinks, metrics_manager);
let sink = CompositeSyncEventNode::new(sinks, metrics_manager);

Self {
sink,
Expand Down Expand Up @@ -268,7 +270,7 @@ fn bench_collector_build_and_emit_composite(c: &mut Criterion) {

criterion_group!(
benches,
bench_collector_event_build,
bench_health_event_build,
bench_collector_build_and_emit_prometheus,
bench_collector_build_and_emit_composite
);
Expand Down
93 changes: 48 additions & 45 deletions crates/health/benches/processor_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use std::sync::Arc;
use carbide_health::endpoint::{BmcAddr, EndpointMetadata, MachineData};
use carbide_health::metrics::MetricsManager;
use carbide_health::processor::{
EventProcessingPipeline, EventProcessor, HealthReportProcessor, LeakEventProcessor,
RackLeakProcessor,
EventGraph, HealthReportProcessor, LeakSyncEventNode, RackLeakProcessor,
};
use carbide_health::sink::{
CollectorEvent, CompositeDataSink, DataSink, EventContext, MetricSample, SensorThresholdContext,
CompositeSyncEventNode, EventContext, HealthEvent, MetricSample, SensorThresholdContext,
SyncEventNode,
};
use carbide_uuid::rack::RackId;
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
Expand All @@ -38,45 +38,38 @@ const MACHINE_ID: &str = "fm100htjtiaehv1n5vh67tbmqq4eabcjdng40f7jupsadbedhruh6r

struct CountingSink;

impl DataSink for CountingSink {
fn sink_type(&self) -> &'static str {
impl SyncEventNode for CountingSink {
fn node_type(&self) -> &'static str {
"counting_sink"
}

fn handle_event(&self, context: &EventContext, event: &CollectorEvent) {
fn handle_event(&self, context: &EventContext, event: &HealthEvent) -> Vec<HealthEvent> {
std::hint::black_box(context);
std::hint::black_box(event);
Vec::new()
}
}

struct NoopProcessor;

impl EventProcessor for NoopProcessor {
fn processor_type(&self) -> &'static str {
impl SyncEventNode for NoopProcessor {
fn node_type(&self) -> &'static str {
"noop_processor"
}

fn process_event(
&self,
_context: &EventContext,
_event: &CollectorEvent,
) -> Vec<CollectorEvent> {
fn handle_event(&self, _context: &EventContext, _event: &HealthEvent) -> Vec<HealthEvent> {
Vec::new()
}
}

struct ReemitProcessor;

impl EventProcessor for ReemitProcessor {
fn processor_type(&self) -> &'static str {
impl SyncEventNode for ReemitProcessor {
fn node_type(&self) -> &'static str {
"reemit_processor"
}

fn process_event(
&self,
_context: &EventContext,
event: &CollectorEvent,
) -> Vec<CollectorEvent> {
fn handle_event(&self, _context: &EventContext, event: &HealthEvent) -> Vec<HealthEvent> {
vec![event.clone()]
}
}
Expand All @@ -101,19 +94,33 @@ fn event_context() -> EventContext {
}
}

fn make_composite_sink(count: usize, metrics_manager: Arc<MetricsManager>) -> Arc<dyn DataSink> {
let mut sinks: Vec<Arc<dyn DataSink>> = Vec::with_capacity(count);
fn make_composite_sink(
count: usize,
metrics_manager: Arc<MetricsManager>,
) -> Arc<dyn SyncEventNode> {
let mut sinks: Vec<Arc<dyn SyncEventNode>> = Vec::with_capacity(count);
for _ in 0..count {
sinks.push(Arc::new(CountingSink));
}
Arc::new(CompositeDataSink::new(sinks, metrics_manager))
Arc::new(CompositeSyncEventNode::new(sinks, metrics_manager))
}

/// Assembles an [`EventGraph`] for the benchmarks.
///
/// The node list handed to `EventGraph::new` is `sink` followed by every
/// entry of `processors`, in their original order.
fn make_event_graph(
    sink: Arc<dyn SyncEventNode>,
    processors: Vec<Arc<dyn SyncEventNode>>,
    metrics_manager: Arc<MetricsManager>,
) -> EventGraph {
    // Sink goes first; the processors are appended after it unchanged.
    let graph_nodes: Vec<Arc<dyn SyncEventNode>> =
        std::iter::once(sink).chain(processors).collect();
    EventGraph::new(graph_nodes, metrics_manager)
}

fn metric_events(
batch_size: usize,
unique_keys: usize,
with_health_context: bool,
) -> Vec<CollectorEvent> {
) -> Vec<HealthEvent> {
let unique_keys = unique_keys.max(1);

(0..batch_size)
Expand Down Expand Up @@ -150,18 +157,18 @@ fn metric_events(
bmc_health: BmcHealth::Warning,
});
}
CollectorEvent::Metric(metric.into())
HealthEvent::MeasurementObserved(metric.into())
})
.collect()
}

fn emit_metric_batch(sink: &dyn DataSink, context: &EventContext, events: &[CollectorEvent]) {
let start = CollectorEvent::MetricCollectionStart;
fn emit_metric_batch(sink: &dyn SyncEventNode, context: &EventContext, events: &[HealthEvent]) {
let start = HealthEvent::ScrapeBatchStarted;
sink.handle_event(context, &start);
for event in events {
sink.handle_event(context, event);
}
let end = CollectorEvent::MetricCollectionEnd;
let end = HealthEvent::ScrapeBatchFinished;
sink.handle_event(context, &end);
}

Expand All @@ -173,18 +180,14 @@ fn bench_pipeline_baseline(c: &mut Criterion) {
let metrics_manager: Arc<MetricsManager> =
Arc::new(MetricsManager::new("bench").expect("metrics manager should initialize"));
let sink = make_composite_sink(2, metrics_manager.clone());
let mut processors: Vec<Arc<dyn EventProcessor>> = Vec::with_capacity(processor_count);
let mut processors: Vec<Arc<dyn SyncEventNode>> = Vec::with_capacity(processor_count);
for _ in 0..processor_count {
processors.push(Arc::new(NoopProcessor));
}
let sink: Arc<dyn DataSink> = if processors.is_empty() {
let sink: Arc<dyn SyncEventNode> = if processors.is_empty() {
sink
} else {
Arc::new(EventProcessingPipeline::new(
processors,
sink,
metrics_manager.clone(),
))
Arc::new(make_event_graph(sink, processors, metrics_manager.clone()))
};
let context = event_context();
let events = metric_events(batch_size, 64, false);
Expand All @@ -208,13 +211,13 @@ fn bench_pipeline_health_processors(c: &mut Criterion) {
let metrics_manager: Arc<MetricsManager> =
Arc::new(MetricsManager::new("bench").expect("metrics manager should initialize"));

let processors: Vec<Arc<dyn EventProcessor>> = vec![
let processors: Vec<Arc<dyn SyncEventNode>> = vec![
Arc::new(HealthReportProcessor::default()),
Arc::new(LeakEventProcessor::new(1)),
Arc::new(LeakSyncEventNode::new(1)),
];
let pipeline = EventProcessingPipeline::new(
processors,
let pipeline = make_event_graph(
make_composite_sink(2, metrics_manager.clone()),
processors,
metrics_manager,
);
let context = event_context();
Expand All @@ -240,9 +243,9 @@ fn bench_pipeline_loop_guard(c: &mut Criterion) {
let metrics_manager: Arc<MetricsManager> =
Arc::new(MetricsManager::new("bench").expect("metrics manager should initialize"));

let pipeline = EventProcessingPipeline::new(
vec![Arc::new(ReemitProcessor)],
let pipeline = make_event_graph(
make_composite_sink(2, metrics_manager.clone()),
vec![Arc::new(ReemitProcessor)],
metrics_manager,
);
let context = event_context();
Expand Down Expand Up @@ -286,14 +289,14 @@ fn bench_pipeline_rack_leak(c: &mut Criterion) {
let metrics_manager: Arc<MetricsManager> =
Arc::new(MetricsManager::new("bench").expect("metrics manager should initialize"));

let processors: Vec<Arc<dyn EventProcessor>> = vec![
let processors: Vec<Arc<dyn SyncEventNode>> = vec![
Arc::new(HealthReportProcessor::default()),
Arc::new(LeakEventProcessor::new(1)),
Arc::new(LeakSyncEventNode::new(1)),
Arc::new(RackLeakProcessor::new(2)),
];
let pipeline = EventProcessingPipeline::new(
processors,
let pipeline = make_event_graph(
make_composite_sink(2, metrics_manager.clone()),
processors,
metrics_manager,
);

Expand Down
Loading
Loading