Skip to content

Commit 96cd005

Browse files
author
Tanmay Krishna
authored
feat: adding two new backend entity type and its resolver - kafka, and sqs (#131)
* Add backend resolver for Kafka * Add amazon sqs backend resolver * Add unit test for kafka backend resolver * Add messaging.system tag check for rabbitmq * Add method to get backendURI for open tracing * Add test case for sqs backend resolver * review changes
1 parent 30eb66a commit 96cd005

11 files changed

Lines changed: 440 additions & 1 deletion

File tree

Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package org.hypertrace.traceenricher.enrichment.enrichers;
22

33
public enum BackendType {
4-
UNKNOWN, HTTP, HTTPS, GRPC, REDIS, MONGO, JDBC, RABBIT_MQ
4+
UNKNOWN, HTTP, HTTPS, GRPC, REDIS, MONGO, JDBC, RABBIT_MQ, KAFKA, SQS
55
}

hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/enrichers/resolver/backend/BackendEntityResolver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ public BackendEntityResolver() {
2020
new MongoBackendResolver(),
2121
new JdbcBackendResolver(),
2222
new RabbitMqBackendResolver(),
23+
new KafkaBackendResolver(),
24+
new SqsBackendResolver(),
2325
new ClientSpanEndpointResolver()
2426
);
2527
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package org.hypertrace.traceenricher.enrichment.enrichers.resolver.backend;
2+
3+
import java.util.Optional;
4+
import org.apache.commons.lang3.StringUtils;
5+
import org.hypertrace.core.datamodel.Event;
6+
import org.hypertrace.core.datamodel.shared.StructuredTraceGraph;
7+
import org.hypertrace.entity.data.service.v1.Entity;
8+
import org.hypertrace.semantic.convention.utils.messaging.MessagingSemanticConventionUtils;
9+
import org.hypertrace.traceenricher.enrichment.enrichers.BackendType;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
14+
public class KafkaBackendResolver extends AbstractBackendResolver {
15+
16+
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBackendResolver.class);
17+
18+
@Override
19+
public Optional<Entity> resolveEntity(Event event, StructuredTraceGraph structuredTraceGraph) {
20+
if (!MessagingSemanticConventionUtils.isKafkaBackend(event)) {
21+
return Optional.empty();
22+
}
23+
24+
Optional<String> backendURI = MessagingSemanticConventionUtils.getKafkaBackendURI(event);
25+
26+
if (backendURI.isEmpty() || StringUtils.isEmpty(backendURI.get())) {
27+
LOGGER.warn("Unable to infer a kafka backend from event: {}", event);
28+
return Optional.empty();
29+
}
30+
/*
31+
todo: should we clean protocol constants for rabbit_mq, mongo?
32+
this is not added into enriched spans constants proto as there want be http.url
33+
with kafka://bootstrap.9092
34+
* */
35+
Entity.Builder entityBuilder = getBackendEntityBuilder(
36+
BackendType.KAFKA, backendURI.get(), event);
37+
return Optional.of(entityBuilder.build());
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package org.hypertrace.traceenricher.enrichment.enrichers.resolver.backend;
2+
3+
import java.util.Optional;
4+
import org.apache.commons.lang3.StringUtils;
5+
import org.hypertrace.core.datamodel.Event;
6+
import org.hypertrace.core.datamodel.shared.StructuredTraceGraph;
7+
import org.hypertrace.entity.data.service.v1.Entity;
8+
import org.hypertrace.semantic.convention.utils.messaging.MessagingSemanticConventionUtils;
9+
import org.hypertrace.traceenricher.enrichment.enrichers.BackendType;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
14+
public class SqsBackendResolver extends AbstractBackendResolver {
15+
16+
private static final Logger LOGGER = LoggerFactory.getLogger(SqsBackendResolver.class);
17+
18+
@Override
19+
public Optional<Entity> resolveEntity(Event event, StructuredTraceGraph structuredTraceGraph) {
20+
if (!MessagingSemanticConventionUtils.isSqsBackend(event)) {
21+
return Optional.empty();
22+
}
23+
24+
Optional<String> backendURI = MessagingSemanticConventionUtils.getSqsBackendURI(event);
25+
26+
if (backendURI.isEmpty() || StringUtils.isEmpty(backendURI.get())) {
27+
LOGGER.warn("Unable to infer a SQS backend from event: {}", event);
28+
return Optional.empty();
29+
}
30+
/*
31+
todo: should we clean protocol constants for rabbit_mq, mongo?
32+
this is not added into enriched spans constants proto as there want be http.url
33+
with sqs://xyz:2323.
34+
* */
35+
Entity.Builder entityBuilder = getBackendEntityBuilder(
36+
BackendType.SQS, backendURI.get(), event);
37+
return Optional.of(entityBuilder.build());
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package org.hypertrace.traceenricher.enrichment.enrichers.resolver.backend;
2+
3+
import static org.mockito.Mockito.mock;
4+
5+
import java.nio.ByteBuffer;
6+
import java.util.Arrays;
7+
import java.util.Map;
8+
import org.hypertrace.core.datamodel.AttributeValue;
9+
import org.hypertrace.core.datamodel.Attributes;
10+
import org.hypertrace.core.datamodel.Event;
11+
import org.hypertrace.core.datamodel.EventRef;
12+
import org.hypertrace.core.datamodel.EventRefType;
13+
import org.hypertrace.core.datamodel.MetricValue;
14+
import org.hypertrace.core.datamodel.Metrics;
15+
import org.hypertrace.core.datamodel.shared.StructuredTraceGraph;
16+
import org.hypertrace.entity.data.service.v1.Entity;
17+
import org.junit.jupiter.api.Assertions;
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.Test;
20+
21+
22+
/**
23+
* Unit Test for {@link KafkaBackendResolver}
24+
*/
25+
public class KafkaBackendResolverTest {
26+
27+
private KafkaBackendResolver kafkaBackendResolver;
28+
private StructuredTraceGraph structuredTraceGraph;
29+
30+
@BeforeEach
31+
public void setup() {
32+
kafkaBackendResolver = new KafkaBackendResolver();
33+
structuredTraceGraph = mock(StructuredTraceGraph.class);
34+
}
35+
36+
@Test
37+
public void TestOtelBackendEventResolution() {
38+
String broker = "kafka-test.hypertrace.com:9092";
39+
Entity entity = kafkaBackendResolver.resolveEntity(getOtelKafkaBackendEvent(broker), structuredTraceGraph).get();
40+
Assertions.assertEquals(broker, entity.getEntityName());
41+
}
42+
43+
@Test
44+
public void TestOTBackendEventResolution() {
45+
String brokerHost = "kafka-test.hypertrace.com";
46+
String brokerPort = "9092";
47+
Entity entity = kafkaBackendResolver.resolveEntity(getOTKafkaBackendEvent(brokerHost, brokerPort), structuredTraceGraph).get();
48+
Assertions.assertEquals(String.format("%s:%s", brokerHost, brokerPort), entity.getEntityName());
49+
}
50+
51+
private Event getOtelKafkaBackendEvent(String broker) {
52+
Event event = Event.newBuilder().setCustomerId("customer1")
53+
.setEventId(ByteBuffer.wrap("bdf03dfabf5c70f9".getBytes()))
54+
.setEntityIdList(Arrays.asList("4bfca8f7-4974-36a4-9385-dd76bf5c8824")).setEnrichedAttributes(
55+
Attributes.newBuilder().setAttributeMap(
56+
Map.of("SPAN_TYPE", AttributeValue.newBuilder().setValue("EXIT").build())).build())
57+
.setAttributes(Attributes.newBuilder().setAttributeMap(Map
58+
.of("messaging.system", AttributeValue.newBuilder().setValue("kafka").build(),
59+
"messaging.url", AttributeValue.newBuilder().setValue(broker).build(),
60+
"span.kind", AttributeValue.newBuilder().setValue("client").build(),
61+
"FLAGS", AttributeValue.newBuilder().setValue("0").build())).build())
62+
.setEventName("kafka.connection").setStartTimeMillis(1566869077746L)
63+
.setEndTimeMillis(1566869077750L).setMetrics(Metrics.newBuilder()
64+
.setMetricMap(Map.of("Duration", MetricValue.newBuilder().setValue(4.0).build())).build())
65+
.setEventRefList(Arrays.asList(
66+
EventRef.newBuilder().setTraceId(ByteBuffer.wrap("random_trace_id".getBytes()))
67+
.setEventId(ByteBuffer.wrap("random_event_id".getBytes()))
68+
.setRefType(EventRefType.CHILD_OF).build())).build();
69+
70+
71+
return event;
72+
}
73+
74+
private Event getOTKafkaBackendEvent(String host, String port) {
75+
Event event = Event.newBuilder().setCustomerId("customer1")
76+
.setEventId(ByteBuffer.wrap("bdf03dfabf5c70f9".getBytes()))
77+
.setEntityIdList(Arrays.asList("4bfca8f7-4974-36a4-9385-dd76bf5c8824")).setEnrichedAttributes(
78+
Attributes.newBuilder().setAttributeMap(
79+
Map.of("SPAN_TYPE", AttributeValue.newBuilder().setValue("EXIT").build())).build())
80+
.setAttributes(Attributes.newBuilder().setAttributeMap(Map
81+
.of("peer.service", AttributeValue.newBuilder().setValue("kafka").build(),
82+
"peer.hostname", AttributeValue.newBuilder().setValue(host).build(),
83+
"peer.port", AttributeValue.newBuilder().setValue(port).build(),
84+
"span.kind", AttributeValue.newBuilder().setValue("client").build(),
85+
"FLAGS", AttributeValue.newBuilder().setValue("0").build())).build())
86+
.setEventName("kafka.connection").setStartTimeMillis(1566869077746L)
87+
.setEndTimeMillis(1566869077750L).setMetrics(Metrics.newBuilder()
88+
.setMetricMap(Map.of("Duration", MetricValue.newBuilder().setValue(4.0).build())).build())
89+
.setEventRefList(Arrays.asList(
90+
EventRef.newBuilder().setTraceId(ByteBuffer.wrap("random_trace_id".getBytes()))
91+
.setEventId(ByteBuffer.wrap("random_event_id".getBytes()))
92+
.setRefType(EventRefType.CHILD_OF).build())).build();
93+
94+
95+
return event;
96+
}
97+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package org.hypertrace.traceenricher.enrichment.enrichers.resolver.backend;
2+
3+
import java.net.MalformedURLException;
4+
import java.net.URL;
5+
import java.nio.ByteBuffer;
6+
import java.util.Arrays;
7+
import java.util.Map;
8+
import org.hypertrace.core.datamodel.AttributeValue;
9+
import org.hypertrace.core.datamodel.Attributes;
10+
import org.hypertrace.core.datamodel.Event;
11+
import org.hypertrace.core.datamodel.EventRef;
12+
import org.hypertrace.core.datamodel.EventRefType;
13+
import org.hypertrace.core.datamodel.MetricValue;
14+
import org.hypertrace.core.datamodel.Metrics;
15+
import org.hypertrace.core.datamodel.shared.StructuredTraceGraph;
16+
import org.hypertrace.entity.data.service.v1.Entity;
17+
import org.junit.jupiter.api.Assertions;
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.Test;
20+
21+
import static org.mockito.Mockito.mock;
22+
23+
24+
public class SqsBackendResolverTest {
25+
26+
private SqsBackendResolver sqsBackendResolver;
27+
private StructuredTraceGraph structuredTraceGraph;
28+
29+
@BeforeEach
30+
public void setup() {
31+
sqsBackendResolver = new SqsBackendResolver();
32+
structuredTraceGraph = mock(StructuredTraceGraph.class);
33+
}
34+
35+
@Test
36+
public void TestOtelSqsBackendResolution() {
37+
String sqsConnectionString = "https://queue.amazonaws.com/80398EXAMPLE/MyQueue";
38+
URL sqsURL;
39+
try {
40+
sqsURL = new URL(sqsConnectionString);
41+
String sqsHost = sqsURL.getHost();
42+
Entity entity = sqsBackendResolver.resolveEntity(getOtelSqsBackendEvent(sqsConnectionString), structuredTraceGraph).get();
43+
Assertions.assertEquals(sqsHost, entity.getEntityName());
44+
} catch(MalformedURLException e) {
45+
Assertions.fail("Unable to create URL for given connection string");
46+
}
47+
}
48+
49+
@Test
50+
public void TestOTBackendEventResolution() {
51+
String sqsHost = "sqs.ap-south-1.amazonaws.com";
52+
Entity entity = sqsBackendResolver.resolveEntity(getOTSqsBackendEvent(sqsHost), structuredTraceGraph).get();
53+
Assertions.assertEquals(sqsHost, entity.getEntityName());
54+
}
55+
56+
private Event getOtelSqsBackendEvent(String connectionString) {
57+
Event event = Event.newBuilder().setCustomerId("customer1")
58+
.setEventId(ByteBuffer.wrap("bdf03dfabf5c70f9".getBytes()))
59+
.setEntityIdList(Arrays.asList("4bfca8f7-4974-36a4-9385-dd76bf5c8824")).setEnrichedAttributes(
60+
Attributes.newBuilder().setAttributeMap(
61+
Map.of("SPAN_TYPE", AttributeValue.newBuilder().setValue("EXIT").build())).build())
62+
.setAttributes(Attributes.newBuilder().setAttributeMap(Map
63+
.of("messaging.system", AttributeValue.newBuilder().setValue("sqs").build(),
64+
"messaging.url", AttributeValue.newBuilder().setValue(connectionString).build(),
65+
"span.kind", AttributeValue.newBuilder().setValue("client").build(),
66+
"FLAGS", AttributeValue.newBuilder().setValue("0").build())).build())
67+
.setEventName("RecieveMessage").setStartTimeMillis(1566869077746L)
68+
.setEndTimeMillis(1566869077750L).setMetrics(Metrics.newBuilder()
69+
.setMetricMap(Map.of("Duration", MetricValue.newBuilder().setValue(4.0).build())).build())
70+
.setEventRefList(Arrays.asList(
71+
EventRef.newBuilder().setTraceId(ByteBuffer.wrap("random_trace_id".getBytes()))
72+
.setEventId(ByteBuffer.wrap("random_event_id".getBytes()))
73+
.setRefType(EventRefType.CHILD_OF).build())).build();
74+
75+
76+
return event;
77+
}
78+
79+
private Event getOTSqsBackendEvent(String host) {
80+
Event event = Event.newBuilder().setCustomerId("customer1")
81+
.setEventId(ByteBuffer.wrap("bdf03dfabf5c70f9".getBytes()))
82+
.setEntityIdList(Arrays.asList("4bfca8f7-4974-36a4-9385-dd76bf5c8824")).setEnrichedAttributes(
83+
Attributes.newBuilder().setAttributeMap(
84+
Map.of("SPAN_TYPE", AttributeValue.newBuilder().setValue("EXIT").build())).build())
85+
.setAttributes(Attributes.newBuilder().setAttributeMap(Map
86+
.of("peer.service", AttributeValue.newBuilder().setValue("sqs").build(),
87+
"peer.hostname", AttributeValue.newBuilder().setValue(host).build(),
88+
"span.kind", AttributeValue.newBuilder().setValue("client").build(),
89+
"FLAGS", AttributeValue.newBuilder().setValue("0").build())).build())
90+
.setEventName("RecieveMessage").setStartTimeMillis(1566869077746L)
91+
.setEndTimeMillis(1566869077750L).setMetrics(Metrics.newBuilder()
92+
.setMetricMap(Map.of("Duration", MetricValue.newBuilder().setValue(4.0).build())).build())
93+
.setEventRefList(Arrays.asList(
94+
EventRef.newBuilder().setTraceId(ByteBuffer.wrap("random_trace_id".getBytes()))
95+
.setEventId(ByteBuffer.wrap("random_event_id".getBytes()))
96+
.setRefType(EventRefType.CHILD_OF).build())).build();
97+
98+
99+
return event;
100+
}
101+
}

semantic-convention-utils/src/main/java/org/hypertrace/semantic/convention/utils/db/DbSemanticConventionUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,10 @@ public static Optional<String> getBackendURIForOtelFormat(Event event) {
208208
return SpanSemanticConventionUtils.getURIForOtelFormat(event);
209209
}
210210

211+
public static Optional<String> getBackendURIForOpenTracingFormat(Event event) {
212+
return SpanSemanticConventionUtils.getURIforOpenTracingFormat(event);
213+
}
214+
211215
/**
212216
* @param attributeValueMap map of attribute key value
213217
* @return backend uri based on otel format

0 commit comments

Comments
 (0)