Skip to content

Commit daf25ed

Browse files
feat: add space enrichment to ingestion pipeline (#113)
* feat: add space enrichment to ingestion pipeline * fix: make backwards compatible * refactor: extract host + port config keys to statics
1 parent 813b68a commit daf25ed

17 files changed

Lines changed: 532 additions & 82 deletions

File tree

hypertrace-trace-enricher/enriched-span-constants/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ protobuf {
1616
}
1717
plugins {
1818
id("grpc_java") {
19-
artifact = "io.grpc:protoc-gen-grpc-java:1.32.1"
19+
artifact = "io.grpc:protoc-gen-grpc-java:1.35.0"
2020
}
2121

2222
if (generateLocalGoGrpcFiles) {

hypertrace-trace-enricher/helm/templates/trace-enricher-config.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,15 @@ data:
6868
port = 9012
6969
}
7070
}
71+
72+
SpaceEnricher {
73+
config.service.config = {
74+
host = config-service
75+
port = 50101
76+
}
77+
attribute.service.config = {
78+
host = attribute-service
79+
port = 9012
80+
}
81+
}
7182
}

hypertrace-trace-enricher/hypertrace-trace-enricher-impl/build.gradle.kts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ dependencies {
1919
implementation("org.hypertrace.core.datamodel:data-model:0.1.12")
2020
implementation("org.hypertrace.entity.service:entity-service-client:0.1.23")
2121
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.18")
22+
implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.3.3")
23+
implementation("org.hypertrace.config.service:spaces-config-service-api:0.1.0")
24+
implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.3.3")
2225

2326
implementation("com.typesafe:config:1.4.1")
2427
implementation("org.apache.httpcomponents:httpclient:4.5.13")
@@ -29,4 +32,6 @@ dependencies {
2932

3033
testImplementation("org.junit.jupiter:junit-jupiter:5.7.0")
3134
testImplementation("org.mockito:mockito-core:3.6.28")
35+
testImplementation("org.mockito:mockito-junit-jupiter:3.6.28")
36+
testImplementation("io.grpc:grpc-core:1.35.0")
3237
}

hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/enrichers/SpaceEnricher.java

Lines changed: 0 additions & 38 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package org.hypertrace.traceenricher.enrichment.enrichers.space;
2+
3+
import static org.hypertrace.traceenricher.enrichedspan.constants.EnrichedSpanConstants.SPACE_IDS_ATTRIBUTE;
4+
5+
import com.google.common.annotations.VisibleForTesting;
6+
import com.typesafe.config.Config;
7+
import io.grpc.Channel;
8+
import io.grpc.ManagedChannelBuilder;
9+
import java.util.Collection;
10+
import java.util.List;
11+
import java.util.stream.Collectors;
12+
import org.hypertrace.core.attribute.service.cachingclient.CachingAttributeClient;
13+
import org.hypertrace.core.datamodel.Event;
14+
import org.hypertrace.core.datamodel.StructuredTrace;
15+
import org.hypertrace.core.datamodel.shared.trace.AttributeValueCreator;
16+
import org.hypertrace.entity.data.service.client.EntityDataServiceClientProvider;
17+
import org.hypertrace.trace.reader.attributes.TraceAttributeReader;
18+
import org.hypertrace.traceenricher.enrichedspan.constants.utils.EnrichedSpanUtils;
19+
import org.hypertrace.traceenricher.enrichment.AbstractTraceEnricher;
20+
21+
public class SpaceEnricher extends AbstractTraceEnricher {
22+
private static final String CONFIG_SERVICE_HOST_KEY = "config.service.config.host";
23+
private static final String CONFIG_SERVICE_PORT_KEY = "config.service.config.port";
24+
private static final String ATTRIBUTE_SERVICE_HOST_KEY = "attribute.service.config.host";
25+
private static final String ATTRIBUTE_SERVICE_PORT_KEY = "attribute.service.config.port";
26+
27+
private SpaceRulesCachingClient ruleClient;
28+
private SpaceRuleEvaluator ruleEvaluator;
29+
30+
@Override
31+
public void init(Config enricherConfig, EntityDataServiceClientProvider provider) {
32+
super.init(enricherConfig, provider);
33+
34+
Channel configChannel =
35+
ManagedChannelBuilder.forAddress(
36+
enricherConfig.getString(CONFIG_SERVICE_HOST_KEY),
37+
enricherConfig.getInt(CONFIG_SERVICE_PORT_KEY))
38+
.usePlaintext()
39+
.build();
40+
41+
Channel attributeChannel =
42+
ManagedChannelBuilder.forAddress(
43+
enricherConfig.getString(ATTRIBUTE_SERVICE_HOST_KEY),
44+
enricherConfig.getInt(ATTRIBUTE_SERVICE_PORT_KEY))
45+
.usePlaintext()
46+
.build();
47+
48+
// TODO - we need a way to share caching clients like the attribute reader across enrichers
49+
this.init(
50+
new SpaceRulesCachingClient(configChannel),
51+
new SpaceRuleEvaluator(
52+
TraceAttributeReader.build(CachingAttributeClient.builder(attributeChannel).build())));
53+
}
54+
55+
/**
56+
* The current design of enrichers with required no arg constructors does not allow mocking
57+
* dependencies, so exposing an init method to support mocking clients for tests
58+
*/
59+
@VisibleForTesting
60+
void init(SpaceRulesCachingClient ruleClient, SpaceRuleEvaluator ruleEvaluator) {
61+
this.ruleClient = ruleClient;
62+
this.ruleEvaluator = ruleEvaluator;
63+
}
64+
65+
@Override
66+
public void enrichEvent(StructuredTrace trace, Event event) {
67+
addEnrichedAttribute(
68+
event,
69+
SPACE_IDS_ATTRIBUTE,
70+
AttributeValueCreator.create(this.calculateSpaces(trace, event)));
71+
}
72+
73+
@Override
74+
public void enrichTrace(StructuredTrace trace) {
75+
List<String> includedSpaceIds =
76+
trace.getEventList().stream()
77+
.map(EnrichedSpanUtils::getSpaceIds)
78+
.flatMap(Collection::stream)
79+
.distinct()
80+
.collect(Collectors.toList());
81+
82+
trace
83+
.getAttributes()
84+
.getAttributeMap()
85+
.put(SPACE_IDS_ATTRIBUTE, AttributeValueCreator.create(includedSpaceIds));
86+
}
87+
88+
private List<String> calculateSpaces(StructuredTrace trace, Event span) {
89+
return this.ruleClient.getRulesForTenant(span.getCustomerId()).stream()
90+
.map(rule -> this.ruleEvaluator.calculateSpacesForRule(trace, span, rule))
91+
.flatMap(Collection::stream)
92+
.distinct()
93+
.collect(Collectors.toList());
94+
}
95+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package org.hypertrace.traceenricher.enrichment.enrichers.space;
2+
3+
import java.util.Collections;
4+
import java.util.List;
5+
import java.util.Optional;
6+
import org.hypertrace.core.attribute.service.v1.LiteralValue;
7+
import org.hypertrace.core.datamodel.Event;
8+
import org.hypertrace.core.datamodel.StructuredTrace;
9+
import org.hypertrace.spaces.config.service.v1.AttributeValueRuleData;
10+
import org.hypertrace.spaces.config.service.v1.SpaceConfigRule;
11+
import org.hypertrace.trace.reader.attributes.TraceAttributeReader;
12+
13+
class SpaceRuleEvaluator {
14+
private final TraceAttributeReader attributeReader;
15+
16+
SpaceRuleEvaluator(TraceAttributeReader attributeReader) {
17+
this.attributeReader = attributeReader;
18+
}
19+
20+
public List<String> calculateSpacesForRule(
21+
StructuredTrace trace, Event span, SpaceConfigRule rule) {
22+
switch (rule.getRuleDataCase()) {
23+
case ATTRIBUTE_VALUE_RULE_DATA:
24+
return this.calculateSpacesForAttribute(trace, span, rule.getAttributeValueRuleData());
25+
case RULEDATA_NOT_SET:
26+
default:
27+
return List.of();
28+
}
29+
}
30+
31+
private List<String> calculateSpacesForAttribute(
32+
StructuredTrace trace, Event span, AttributeValueRuleData attributeValueRuleData) {
33+
34+
return this.attributeReader
35+
.getSpanValue(
36+
trace,
37+
span,
38+
attributeValueRuleData.getAttributeScope(),
39+
attributeValueRuleData.getAttributeKey())
40+
.mapOptional(this::stringifyLiteral)
41+
.filter(string -> !string.isEmpty())
42+
.map(List::of)
43+
.onErrorComplete()
44+
.defaultIfEmpty(Collections.emptyList())
45+
.blockingGet();
46+
}
47+
48+
private Optional<String> stringifyLiteral(LiteralValue literalValue) {
49+
switch (literalValue.getValueCase()) {
50+
case INT_VALUE:
51+
return Optional.of(String.valueOf(literalValue.getIntValue()));
52+
case STRING_VALUE:
53+
return Optional.of(literalValue.getStringValue());
54+
case FLOAT_VALUE:
55+
return Optional.of(String.valueOf(literalValue.getFloatValue()));
56+
case BOOLEAN_VALUE:
57+
return Optional.of(String.valueOf(literalValue.getBooleanValue()));
58+
case VALUE_NOT_SET:
59+
default:
60+
return Optional.empty();
61+
}
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package org.hypertrace.traceenricher.enrichment.enrichers.space;
2+
3+
import com.google.common.cache.CacheBuilder;
4+
import com.google.common.cache.CacheLoader;
5+
import com.google.common.cache.LoadingCache;
6+
import com.google.common.cache.Weigher;
7+
import io.grpc.Channel;
8+
import java.util.Collections;
9+
import java.util.List;
10+
import java.util.concurrent.TimeUnit;
11+
import org.hypertrace.core.grpcutils.client.GrpcClientRequestContextUtil;
12+
import org.hypertrace.core.grpcutils.client.RequestContextClientCallCredsProviderFactory;
13+
import org.hypertrace.spaces.config.service.v1.GetRulesRequest;
14+
import org.hypertrace.spaces.config.service.v1.SpaceConfigRule;
15+
import org.hypertrace.spaces.config.service.v1.SpacesConfigServiceGrpc;
16+
import org.hypertrace.spaces.config.service.v1.SpacesConfigServiceGrpc.SpacesConfigServiceBlockingStub;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
20+
class SpaceRulesCachingClient {
21+
private static final Logger LOG = LoggerFactory.getLogger(SpaceRulesCachingClient.class);
22+
23+
private final SpacesConfigServiceBlockingStub configServiceStub;
24+
25+
public SpaceRulesCachingClient(Channel spacesConfigChannel) {
26+
this.configServiceStub =
27+
SpacesConfigServiceGrpc.newBlockingStub(spacesConfigChannel)
28+
.withCallCredentials(
29+
RequestContextClientCallCredsProviderFactory.getClientCallCredsProvider().get());
30+
}
31+
32+
private final LoadingCache<String, List<SpaceConfigRule>> cache =
33+
CacheBuilder.newBuilder()
34+
.expireAfterWrite(3, TimeUnit.MINUTES)
35+
.maximumWeight(10_000)
36+
.weigher((Weigher<String, List<SpaceConfigRule>>) (key, value) -> value.size())
37+
.build(CacheLoader.from(this::loadRulesForTenant));
38+
39+
public List<SpaceConfigRule> getRulesForTenant(String tenantId) {
40+
try {
41+
return cache.get(tenantId);
42+
} catch (Exception exception) {
43+
LOG.error("Error fetching space config rules", exception);
44+
return Collections.emptyList();
45+
}
46+
}
47+
48+
private List<SpaceConfigRule> loadRulesForTenant(String tenantId) {
49+
return GrpcClientRequestContextUtil.executeInTenantContext(
50+
tenantId,
51+
() -> this.configServiceStub.getRules(GetRulesRequest.getDefaultInstance()).getRulesList());
52+
}
53+
}

hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/test/java/org/hypertrace/traceenricher/enrichment/enrichers/SpaceEnricherTest.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

0 commit comments

Comments
 (0)