Skip to content

Commit a12d89e

Browse files
authored
Limit the number of spans in a trace (#128)
* Limit the number of spans in a trace
* Update helm chart
* Add counters for dropped spans and truncated traces
* Move constants to RawSpanGrouperConstants class
* Update debug log message
* Update TopologyTestDriver test
1 parent 5723d26 commit a12d89e

5 files changed

Lines changed: 83 additions & 0 deletions

File tree

raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ data:
5656
5757
span.groupby.session.window.interval = {{ .Values.rawSpansGrouperConfig.span.groupby.internal }}
5858
59+
{{- if hasKey .Values.rawSpansGrouperConfig "maxSpanCount" }}
60+
max.span.count = {
61+
{{- range $k, $v := .Values.rawSpansGrouperConfig.maxSpanCount }}
62+
{{ $k }} = {{ int $v }}
63+
{{- end }}
64+
}
65+
{{- end }}
66+
5967
{{- if hasKey .Values.rawSpansGrouperConfig "metrics" }}
6068
metrics {
6169
reporter {

raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpanGrouperConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,7 @@ public class RawSpanGrouperConstants {
1111
public static final String SPANS_PER_TRACE_METRIC = "spans_per_trace";
1212
public static final String TRACE_CREATION_TIME = "trace.creation.time";
1313
public static final String DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY = "dataflow.metriccollection.sampling.percent";
14+
public static final String INFLIGHT_TRACE_MAX_SPAN_COUNT = "max.span.count";
15+
public static final String DROPPED_SPANS_COUNTER = "hypertrace.dropped.spans";
16+
public static final String TRUNCATED_TRACES_COUNTER = "hypertrace.truncated.traces";
1417
}

raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGroupingTransformer.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,24 @@
11
package org.hypertrace.core.rawspansgrouper;
22

33
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY;
4+
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DROPPED_SPANS_COUNTER;
5+
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INFLIGHT_TRACE_MAX_SPAN_COUNT;
46
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INFLIGHT_TRACE_STORE;
57
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER;
68
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG;
79
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY;
810
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_EMIT_TRIGGER_STORE;
11+
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRUNCATED_TRACES_COUNTER;
912

1013
import com.typesafe.config.Config;
1114
import java.time.Duration;
1215
import java.time.Instant;
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
import java.util.concurrent.ConcurrentHashMap;
19+
import java.util.concurrent.ConcurrentMap;
20+
21+
import io.micrometer.core.instrument.Counter;
1322
import org.apache.kafka.streams.KeyValue;
1423
import org.apache.kafka.streams.kstream.Transformer;
1524
import org.apache.kafka.streams.processor.Cancellable;
@@ -23,6 +32,7 @@
2332
import org.hypertrace.core.datamodel.RawSpans;
2433
import org.hypertrace.core.datamodel.StructuredTrace;
2534
import org.hypertrace.core.datamodel.shared.HexUtils;
35+
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
2636
import org.hypertrace.core.spannormalizer.TraceIdentity;
2737
import org.slf4j.Logger;
2838
import org.slf4j.LoggerFactory;
@@ -45,6 +55,13 @@ public class RawSpansGroupingTransformer implements
4555
private long groupingWindowTimeoutMs;
4656
private To outputTopic;
4757
private double dataflowSamplingPercent = -1;
58+
private Map<String, Long> maxSpanCountMap = new HashMap<>();
59+
60+
// per-tenant counters (keyed by tenant id) for the number of spans dropped
61+
private static final ConcurrentMap<String, Counter> droppedSpansCounter = new ConcurrentHashMap<>();
62+
63+
// per-tenant counters (keyed by tenant id) for the number of truncated traces
64+
private static final ConcurrentMap<String, Counter> truncatedTracesCounter = new ConcurrentHashMap<>();
4865

4966
@Override
5067
public void init(ProcessorContext context) {
@@ -63,6 +80,13 @@ public void init(ProcessorContext context) {
6380
this.dataflowSamplingPercent = jobConfig.getDouble(DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY);
6481
}
6582

83+
if (jobConfig.hasPath(INFLIGHT_TRACE_MAX_SPAN_COUNT)) {
84+
Config subConfig = jobConfig.getConfig(INFLIGHT_TRACE_MAX_SPAN_COUNT);
85+
subConfig.entrySet().stream().forEach((entry) -> {
86+
maxSpanCountMap.put(entry.getKey(), subConfig.getLong(entry.getKey()));
87+
});
88+
}
89+
6690
this.outputTopic = To.child(OUTPUT_TOPIC_PRODUCER);
6791
restorePunctuators();
6892
}
@@ -71,9 +95,29 @@ public void init(ProcessorContext context) {
7195
public KeyValue<String, StructuredTrace> transform(TraceIdentity key, RawSpan value) {
7296
ValueAndTimestamp<RawSpans> rawSpans = inflightTraceStore.get(key);
7397
RawSpans agg = rawSpans != null ? rawSpans.value() : RawSpans.newBuilder().build();
98+
if (maxSpanCountMap.containsKey(key.getTenantId()) && agg.getRawSpans().size() >= maxSpanCountMap.get(key.getTenantId())) {
99+
if (logger.isDebugEnabled()) {
100+
logger.debug("Dropping span [{}] from tenant_id={}, trace_id={} after grouping {} spans",
101+
value,
102+
key.getTenantId(),
103+
HexUtils.getHex(key.getTraceId()),
104+
agg.getRawSpans().size());
105+
}
106+
// increment the counter for dropped spans
107+
droppedSpansCounter.computeIfAbsent(key.getTenantId(),
108+
k -> PlatformMetricsRegistry.registerCounter(DROPPED_SPANS_COUNTER, Map.of("tenantId", k))).increment();
109+
return null;
110+
}
111+
74112
// add the new span
75113
agg.getRawSpans().add(value);
76114

115+
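// increment the truncated-traces counter once, at the moment the trace size reaches the max.span.count limit;
// any further spans for this trace are dropped by the check above, so the size never passes the limit again.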
116+
if (maxSpanCountMap.containsKey(key.getTenantId()) && agg.getRawSpans().size() == maxSpanCountMap.get(key.getTenantId())) {
117+
truncatedTracesCounter.computeIfAbsent(key.getTenantId(),
118+
k -> PlatformMetricsRegistry.registerCounter(TRUNCATED_TRACES_COUNTER, Map.of("tenantId", k))).increment();
119+
}
120+
77121
long currentTimeMs = System.currentTimeMillis();
78122

79123
if (logger.isDebugEnabled()) {

raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,18 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
8080
.setCustomerId("tenant1").setEvent(createEvent("event-4", "tenant1")).build();
8181
RawSpan span5 = RawSpan.newBuilder().setTraceId(ByteBuffer.wrap("trace-2".getBytes()))
8282
.setCustomerId("tenant1").setEvent(createEvent("event-5", "tenant1")).build();
83+
RawSpan span6 = RawSpan.newBuilder().setTraceId(ByteBuffer.wrap("trace-3".getBytes()))
84+
.setCustomerId("tenant1").setEvent(createEvent("event-6", "tenant1")).build();
85+
RawSpan span7 = RawSpan.newBuilder().setTraceId(ByteBuffer.wrap("trace-3".getBytes()))
86+
.setCustomerId("tenant1").setEvent(createEvent("event-7", "tenant1")).build();
87+
RawSpan span8 = RawSpan.newBuilder().setTraceId(ByteBuffer.wrap("trace-3".getBytes()))
88+
.setCustomerId("tenant1").setEvent(createEvent("event-8", "tenant1")).build();
89+
RawSpan span9 = RawSpan.newBuilder().setTraceId(ByteBuffer.wrap("trace-3".getBytes()))
90+
.setCustomerId("tenant1").setEvent(createEvent("event-9", "tenant1")).build();
91+
RawSpan span10 = RawSpan.newBuilder().setTraceId(ByteBuffer.wrap("trace-3".getBytes()))
92+
.setCustomerId("tenant1").setEvent(createEvent("event-10", "tenant1")).build();
93+
RawSpan span11 = RawSpan.newBuilder().setTraceId(ByteBuffer.wrap("trace-3".getBytes()))
94+
.setCustomerId("tenant1").setEvent(createEvent("event-11", "tenant1")).build();
8395

8496
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span1);
8597
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span4);
@@ -120,6 +132,18 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir
120132
trace = (StructuredTrace) outputTopic.readValue();
121133
assertEquals(1, trace.getEventList().size());
122134
assertEquals(ByteBuffer.wrap("event-5".getBytes()), trace.getEventList().get(0).getEventId());
135+
136+
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span6);
137+
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span7);
138+
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span8);
139+
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span9);
140+
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span10);
141+
inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span11);
142+
td.advanceWallClockTime(Duration.ofSeconds(35));
143+
144+
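// six spans were sent for trace-3, but the test config sets max.span.count to 5 for tenant1,
// so the emitted trace should be truncated to 5 spans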
145+
trace = (StructuredTrace) outputTopic.readValue();
146+
assertEquals(5, trace.getEventList().size());
123147
}
124148

125149
private Event createEvent(String eventId, String tenantId) {

raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ kafka.streams.config = {
2323
value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
2424
}
2525

26+
max.span.count = {
27+
tenant1 = 5
28+
}
29+
2630
span.groupby.session.window.interval = 5
2731

2832
span.groupby.session.window.graceperiod.ms = 100

0 commit comments

Comments
 (0)