Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.BenchmarkParams;

/*
* Run this along with a profiler to measure the CPU usage of BatchSpanProcessor's exporter thread.
Expand All @@ -47,7 +48,8 @@ public static class BenchmarkState {
private long droppedSpans;

@Setup(Level.Iteration)
public final void setup() {
public final void setup(BenchmarkParams params) {
numThreads = params.getThreads();
metricReader = InMemoryMetricReader.create();
MeterProvider meterProvider =
SdkMeterProvider.builder().registerMetricReader(metricReader).build();
Expand Down Expand Up @@ -110,7 +112,6 @@ private static void doWork(BenchmarkState benchmarkState) {
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_01Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 1;
doWork(benchmarkState);
}

Expand All @@ -123,7 +124,6 @@ public void export_01Thread(
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_02Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 2;
doWork(benchmarkState);
}

Expand All @@ -136,7 +136,6 @@ public void export_02Thread(
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_05Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 5;
doWork(benchmarkState);
}

Expand All @@ -149,7 +148,6 @@ public void export_05Thread(
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_10Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 10;
doWork(benchmarkState);
}

Expand All @@ -162,7 +160,6 @@ public void export_10Thread(
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_20Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 20;
doWork(benchmarkState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.BenchmarkParams;

public class BatchSpanProcessorDroppedSpansBenchmark {

Expand All @@ -36,9 +37,12 @@ public static class BenchmarkState {
private long exportedSpans;
private long droppedSpans;
private int numThreads;
private int numIterations;

@Setup(Level.Iteration)
public final void setup() {
public final void setup(BenchmarkParams params) {
numThreads = params.getThreads();
numIterations = params.getMeasurement().getCount();
metricReader = InMemoryMetricReader.create();
MeterProvider meterProvider =
SdkMeterProvider.builder().registerMetricReader(metricReader).build();
Expand All @@ -52,7 +56,7 @@ public final void setup() {
public final void recordMetrics() {
BatchSpanProcessorMetrics metrics =
new BatchSpanProcessorMetrics(metricReader.collectAllMetrics(), numThreads);
dropRatio = metrics.dropRatio();
dropRatio = metrics.dropRatio(numIterations);
exportedSpans = metrics.exportedSpans();
droppedSpans = metrics.droppedSpans();
}
Expand All @@ -64,7 +68,7 @@ public final void tearDown() {
}

@State(Scope.Thread)
@AuxCounters(AuxCounters.Type.OPERATIONS)
@AuxCounters(AuxCounters.Type.EVENTS)
public static class ThreadState {
BenchmarkState benchmarkState;

Expand Down Expand Up @@ -95,7 +99,6 @@ public long droppedSpans() {
@BenchmarkMode(Mode.Throughput)
public void export(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 5;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@

package io.opentelemetry.sdk.trace.export;

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.api.common.AttributeKey.booleanKey;

import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;
import java.util.OptionalLong;
import org.openjdk.jmh.annotations.AuxCounters;

public class BatchSpanProcessorMetrics {
private final Collection<MetricData> allMetrics;
Expand All @@ -21,13 +22,26 @@ public BatchSpanProcessorMetrics(Collection<MetricData> allMetrics, int numThrea
this.numThreads = numThreads;
}

public double dropRatio() {
/**
* Returns the share of dropped spans as a value in {@code [0, 1]}.
*
* <p>Only meaningful for benchmarks whose {@link AuxCounters} use {@link
* AuxCounters.Type#EVENTS}: JMH emits a {@code ScalarResult} with {@code AggregationPolicy.SUM},
* which sums the aux counters across the {@code numThreads} thread-local {@code @State} instances
* and the {@code numIterations} measurement iterations. The raw {@code dropped / (exported +
* dropped)} is therefore inflated by {@code numThreads * numIterations}, and both factors are
* divided back out here so the value reported by JMH equals the per-iteration drop ratio.
*
* <p>For {@link AuxCounters.Type#OPERATIONS} JMH emits a {@code ThroughputResult} (rate per unit
* time) aggregated as a mean across iterations, so this method does not apply — the drop rate is
* read directly from {@link #droppedSpans()} and the ratio, if needed, is computed from the two
* rates.
*/
public double dropRatio(int numIterations) {
long exported = getMetric(false);
long dropped = getMetric(true);
long total = exported + dropped;
// Due to peculiarities of JMH reporting we have to divide this by the number of the
// concurrent threads running the actual benchmark.
return total == 0 ? 0 : (double) dropped / total / numThreads;
return total == 0 ? 0.0 : (double) dropped / total / numThreads / numIterations;
}

public long exportedSpans() {
Expand All @@ -39,14 +53,17 @@ public long droppedSpans() {
}

private long getMetric(boolean dropped) {
String labelValue = String.valueOf(dropped);
OptionalLong value =
allMetrics.stream()
.filter(metricData -> metricData.getName().equals("processedSpans"))
.filter(metricData -> !metricData.isEmpty())
.map(metricData -> metricData.getLongSumData().getPoints())
.flatMap(Collection::stream)
.filter(point -> labelValue.equals(point.getAttributes().get(stringKey("dropped"))))
.filter(
point -> {
Boolean attrDropped = point.getAttributes().get(booleanKey("dropped"));
return attrDropped != null && attrDropped == dropped;
})
.mapToLong(LongPointData::getValue)
.findFirst();
return value.isPresent() ? value.getAsLong() : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.BenchmarkParams;

@State(Scope.Benchmark)
public class BatchSpanProcessorMultiThreadBenchmark {
Expand All @@ -37,15 +38,19 @@ public static class BenchmarkState {
private BatchSpanProcessor processor;
private Tracer tracer;
private int numThreads = 1;
private int numIterations = 1;

@Param({"0"})
private int delayMs;

private double dropRatio;
private long exportedSpans;
private long droppedSpans;

@Setup(Level.Iteration)
public final void setup() {
public final void setup(BenchmarkParams params) {
numThreads = params.getThreads();
numIterations = params.getMeasurement().getCount();
collector = InMemoryMetricReader.create();
MeterProvider meterProvider =
SdkMeterProvider.builder().registerMetricReader(collector).build();
Expand All @@ -59,14 +64,15 @@ public final void setup() {
public final void recordMetrics() {
BatchSpanProcessorMetrics metrics =
new BatchSpanProcessorMetrics(collector.collectAllMetrics(), numThreads);
dropRatio = metrics.dropRatio(numIterations);
exportedSpans = metrics.exportedSpans();
droppedSpans = metrics.droppedSpans();
processor.shutdown().join(10, TimeUnit.SECONDS);
}
}

@State(Scope.Thread)
@AuxCounters(AuxCounters.Type.OPERATIONS)
@AuxCounters(AuxCounters.Type.EVENTS)
public static class ThreadState {
BenchmarkState benchmarkState;

Expand All @@ -75,6 +81,10 @@ public final void recordMetrics(BenchmarkState benchmarkState) {
this.benchmarkState = benchmarkState;
}

public double dropRatio() {
return benchmarkState.dropRatio;
}

public long exportedSpans() {
return benchmarkState.exportedSpans;
}
Expand All @@ -93,7 +103,6 @@ public long droppedSpans() {
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_01Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 1;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
}
Expand All @@ -107,7 +116,6 @@ public void export_01Thread(
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_02Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 2;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
}
Expand All @@ -121,7 +129,6 @@ public void export_02Thread(
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_05Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 5;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
}
Expand All @@ -135,7 +142,6 @@ public void export_05Thread(
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_10Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 10;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
}
Expand All @@ -149,7 +155,6 @@ public void export_10Thread(
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_20Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 20;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
}
Expand Down
Loading