1313import java .io .ByteArrayOutputStream ;
1414import java .io .IOException ;
1515import java .util .ArrayList ;
16+ import java .util .Arrays ;
1617import java .util .Collections ;
1718import java .util .HashMap ;
1819import java .util .HashSet ;
1920import java .util .List ;
2021import java .util .Map ;
22+ import java .util .Objects ;
2123import java .util .Optional ;
2224import java .util .Set ;
2325import java .util .concurrent .Callable ;
3335import org .apache .avro .specific .SpecificDatumWriter ;
3436import org .apache .avro .specific .SpecificRecordBase ;
3537import org .apache .commons .lang3 .StringUtils ;
38+ import org .apache .commons .lang3 .tuple .Pair ;
3639import org .hypertrace .core .datamodel .AttributeValue ;
3740import org .hypertrace .core .datamodel .Attributes ;
3841import org .hypertrace .core .datamodel .Event ;
@@ -71,13 +74,27 @@ public class JaegerSpanNormalizer implements SpanNormalizer<Span, RawSpan> {
7174 private static final String DEFAULT_TENANT_ID_CONFIG = "processor.defaultTenantId" ;
7275 // list of tenant ids to exclude
7376 private static final String TENANT_IDS_TO_EXCLUDE_CONFIG = "processor.excludeTenantIds" ;
74- private final List <String > tenantIdsToExclude ;
77+
78+ /**
79+ * Config key using which a list of criterion can be specified to drop the matching spans.
80+ * Any span matching any one of the criterion is dropped. Each criteria is a comma separated
81+ * list of key:value pairs and multiple pairs in one criteria are AND'ed.
82+ *
83+ * For example: ["messaging.destination_kind:queue,messaging.operation:receive,messaging.system:jms"]
84+ * drops all spans which have all 3 attribute:value pairs.
85+ */
86+ private static final String SPAN_DROP_CRITERION_CONFIG = "processor.spanDropCriterion" ;
87+
88+ private static final String COMMA = "," ;
89+ private static final String COLON = ":" ;
7590
7691 private static final String SPAN_NORMALIZATION_TIME_METRIC = "span.normalization.time" ;
7792
7893 private static JaegerSpanNormalizer INSTANCE ;
7994 private final FieldsGenerator fieldsGenerator ;
8095 private final TenantIdProvider tenantIdProvider ;
96+ private final List <String > tenantIdsToExclude ;
97+ private final List <List <Pair <String , String >>> spanDropCriterion ;
8198 private final ConcurrentMap <String , Timer > tenantToSpanNormalizationTimer = new ConcurrentHashMap <>();
8299
83100 public static JaegerSpanNormalizer get (Config config ) {
@@ -139,6 +156,32 @@ public JaegerSpanNormalizer(Config config) {
139156 if (!this .tenantIdsToExclude .isEmpty ()) {
140157 LOG .info ("list of tenant ids to exclude : {}" , this .tenantIdsToExclude );
141158 }
159+
160+ List <String > criterion = config .hasPath (SPAN_DROP_CRITERION_CONFIG )
161+ ? config .getStringList (SPAN_DROP_CRITERION_CONFIG )
162+ : Collections .emptyList ();
163+
164+ // Parse the config to see if there is any criteria to drop spans.
165+ this .spanDropCriterion = criterion .stream ()
166+ // Split each criteria based on comma
167+ .map (s -> s .split (COMMA ))
168+ .map (a -> Arrays .stream (a ).map (this ::convertToPair ).filter (Objects ::nonNull ).collect (Collectors .toList ()))
169+ .collect (Collectors .toList ());
170+
171+ if (!this .spanDropCriterion .isEmpty ()) {
172+ LOG .info ("Span drop criterion: {}" , this .spanDropCriterion );
173+ }
174+ }
175+
176+ @ Nullable
177+ private Pair <String , String > convertToPair (String s ) {
178+ if (s != null && s .contains (COLON )) {
179+ String [] parts = s .split (COLON );
180+ if (parts .length == 2 ) {
181+ return Pair .of (parts [0 ], parts [1 ]);
182+ }
183+ }
184+ return null ;
142185 }
143186
144187 public Timer getSpanNormalizationTimer (String tenantId ) {
@@ -166,6 +209,10 @@ public RawSpan convert(Span jaegerSpan) throws Exception {
166209 return null ;
167210 }
168211
212+ if (!this .spanDropCriterion .isEmpty () && shouldDropSpan (tags )) {
213+ return null ;
214+ }
215+
169216 // Record the time taken for converting the span, along with the tenant id tag.
170217 return tenantToSpanNormalizationTimer
171218 .computeIfAbsent (
@@ -176,6 +223,17 @@ public RawSpan convert(Span jaegerSpan) throws Exception {
176223 .recordCallable (getRawSpanNormalizerCallable (jaegerSpan , tags , tenantId ));
177224 }
178225
226+ /**
227+ * Method to check if the given span attributes match any of the drop criterion. Returns true if the span should be
228+ * dropped, false otherwise.
229+ */
230+ private boolean shouldDropSpan (Map <String , KeyValue > tags ) {
231+ return this .spanDropCriterion .stream ()
232+ .anyMatch (l -> l .stream ()
233+ .allMatch (p -> tags .containsKey (p .getLeft ())
234+ && StringUtils .equals (tags .get (p .getLeft ()).getVStr (), p .getRight ())));
235+ }
236+
179237 @ Nonnull
180238 private Callable <RawSpan > getRawSpanNormalizerCallable (Span jaegerSpan ,
181239 Map <String , KeyValue > spanTags , String tenantId ) {
@@ -220,7 +278,7 @@ private Event buildEvent(
220278
221279 // SPAN REFS
222280 List <JaegerSpanInternalModel .SpanRef > referencesList = jaegerSpan .getReferencesList ();
223- if (referencesList != null && referencesList .size () > 0 ) {
281+ if (referencesList .size () > 0 ) {
224282 eventBuilder .setEventRefList (new ArrayList <>());
225283 // Convert the reflist to a set to remove duplicate references. This has been observed in the
226284 // field.
@@ -272,13 +330,13 @@ private Event buildEvent(
272330
273331 // WARNINGS
274332 ProtocolStringList warningsList = jaegerSpan .getWarningsList ();
275- if (warningsList != null && warningsList .size () > 0 ) {
333+ if (warningsList .size () > 0 ) {
276334 jaegerFieldsBuilder .setWarnings (warningsList );
277335 }
278336 // LOGS - NOTE: This is modeled as a google.protobuf.Any
279337 List <JaegerSpanInternalModel .Log > logsList = jaegerSpan .getLogsList ();
280338 List <String > eventLogsList = new ArrayList <>();
281- if (logsList != null && logsList .size () > 0 ) {
339+ if (logsList .size () > 0 ) {
282340 for (Log log : logsList ) {
283341 try {
284342 String json = JsonFormat .printer ().omittingInsignificantWhitespace ().print (log );
@@ -292,11 +350,11 @@ private Event buildEvent(
292350
293351 // Jaeger service name can come from either first class field in Span or the tag `jaeger.servicename`
294352 String serviceName =
295- ( jaegerSpan . getProcess () != null && ! StringUtils .isEmpty (jaegerSpan .getProcess ().getServiceName () ))
353+ ! StringUtils .isEmpty (jaegerSpan .getProcess ().getServiceName ())
296354 ? jaegerSpan .getProcess ().getServiceName ()
297- : ( attributeFieldMap .containsKey (OLD_JAEGER_SERVICENAME_KEY )
355+ : attributeFieldMap .containsKey (OLD_JAEGER_SERVICENAME_KEY )
298356 ? attributeFieldMap .get (OLD_JAEGER_SERVICENAME_KEY ).getValue ()
299- : StringUtils .EMPTY ) ;
357+ : StringUtils .EMPTY ;
300358
301359 if (!StringUtils .isEmpty (serviceName )) {
302360 eventBuilder .setServiceName (serviceName );
@@ -337,7 +395,7 @@ public static <T extends SpecificRecordBase> String convertToJsonString(T object
337395 writer .write (object , encoder );
338396 encoder .flush ();
339397 output .flush ();
340- return new String ( output .toByteArray () );
398+ return output .toString ( );
341399 }
342400 }
343401}
0 commit comments