Skip to content

Commit f12463f

Browse files
authored
Add partitioner config service (#154)
* 1. Introduce GlobalConfigServiceFactory to create global services that are not tied to a tenant 2. Add TenantIsolationConfigService that managed tenant isolation config. This config will be fetched by all stream processing kafka streams apps * Address review comments * Replace with a single object for persistence * remove unused dependencies and cleanup * remove commented code * add validation logic to check if tenant_ids are disjoint across groups * remove config in application.conf, remove service factory and integration test. * address few more review comments * use ImmutableList.of * rename the service and use new protos * fix the name * address review comments * remove the integration test, GlobalConfigServiceFactory and related changes * address few more review comments
1 parent db14728 commit f12463f

15 files changed

Lines changed: 521 additions & 0 deletions

File tree

buf.work.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ directories:
88
- notification-rule-config-service-api/src/main/proto
99
- spaces-config-service-api/src/main/proto
1010
- span-processing-config-service-api/src/main/proto
11+
- partitioner-config-service-api/src/main/proto

config-service/src/integrationTest/resources/configs/config-service/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ service.name = config-service
33
service.port = 50101
44
service.admin.port = 50102
55

6+
67
generic.config.service {
78
document.store {
89
dataStoreType = mongo
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import com.google.protobuf.gradle.generateProtoTasks
2+
import com.google.protobuf.gradle.id
3+
import com.google.protobuf.gradle.ofSourceSet
4+
import com.google.protobuf.gradle.plugins
5+
import com.google.protobuf.gradle.protobuf
6+
import com.google.protobuf.gradle.protoc
7+
8+
plugins {
9+
`java-library`
10+
id("com.google.protobuf") version "0.8.17"
11+
id("org.hypertrace.publish-plugin")
12+
}
13+
14+
protobuf {
15+
protoc {
16+
artifact = "com.google.protobuf:protoc:${libs.versions.protoc.get()}"
17+
}
18+
plugins {
19+
id("grpc") {
20+
artifact = "io.grpc:protoc-gen-grpc-java:${libs.versions.grpc.get()}"
21+
}
22+
}
23+
generateProtoTasks {
24+
ofSourceSet("main").forEach { task ->
25+
task.plugins {
26+
id("grpc")
27+
}
28+
}
29+
}
30+
}
31+
32+
dependencies {
33+
api(libs.bundles.grpc.api)
34+
}
35+
36+
sourceSets {
37+
main {
38+
java {
39+
srcDirs("build/generated/source/proto/main/java", "build/generated/source/proto/main/grpc")
40+
}
41+
}
42+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
version: v1
2+
breaking:
3+
use:
4+
- PACKAGE
5+
- WIRE_JSON
6+
lint:
7+
use:
8+
- DEFAULT
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
syntax = "proto3";
2+
3+
package org.hypertrace.partitioner.config.service.v1;
4+
5+
option java_multiple_files = true;
6+
7+
8+
service PartitionerConfigService {
9+
10+
rpc GetPartitionerProfile(GetPartitionerProfileRequest) returns (GetPartitionerProfileResponse) {}
11+
12+
rpc GetPartitionerProfiles(GetPartitionerProfilesRequest) returns (GetPartitionerProfilesResponse) {}
13+
14+
rpc PutPartitionerProfiles(PutPartitionerProfilesRequest) returns (PutPartitionerProfilesResponse) {}
15+
16+
rpc DeletePartitionerProfiles(DeletePartitionerProfilesRequest) returns (DeletePartitionerProfilesResponse) {}
17+
18+
}
19+
20+
message GetPartitionerProfileRequest {
21+
string profile_name = 1; // not optional
22+
}
23+
24+
message GetPartitionerProfileResponse {
25+
PartitionerProfile profile = 1;
26+
}
27+
28+
message PutPartitionerProfilesRequest {
29+
repeated PartitionerProfile profiles = 1;
30+
}
31+
32+
message PutPartitionerProfilesResponse {
33+
}
34+
35+
message GetPartitionerProfilesRequest {
36+
}
37+
38+
message GetPartitionerProfilesResponse {
39+
repeated PartitionerProfile profiles = 1;
40+
}
41+
42+
message DeletePartitionerProfilesRequest {
43+
repeated string profile_names = 1;
44+
}
45+
46+
message DeletePartitionerProfilesResponse {
47+
}
48+
49+
message PartitionerProfile {
50+
string name = 1;
51+
string partition_key = 2;
52+
repeated PartitionerGroup groups = 3;
53+
int32 default_group_weight = 4;
54+
}
55+
56+
message PartitionerGroup {
57+
string name = 1;
58+
repeated string member_ids = 2;
59+
int32 weight = 3;
60+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
plugins {
2+
`java-library`
3+
jacoco
4+
id("org.hypertrace.jacoco-report-plugin")
5+
}
6+
7+
dependencies {
8+
api(projects.partitionerConfigServiceApi)
9+
implementation(libs.protobuf.javautil)
10+
11+
implementation(libs.guice)
12+
implementation(libs.slf4j.api)
13+
implementation(libs.typesafe.config)
14+
implementation(libs.hypertrace.documentstore)
15+
16+
annotationProcessor(libs.lombok)
17+
compileOnly(libs.lombok)
18+
19+
testImplementation(libs.junit.jupiter)
20+
testImplementation(libs.mockito.core)
21+
testImplementation(libs.mockito.junit)
22+
testImplementation(testFixtures(projects.configServiceApi))
23+
}
24+
25+
tasks.test {
26+
useJUnitPlatform()
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package org.hypertrace.partitioner.config.service;
2+
3+
import com.google.inject.Guice;
4+
import com.google.inject.Injector;
5+
import com.typesafe.config.Config;
6+
import io.grpc.BindableService;
7+
8+
public class PartitionerConfigServiceFactory {
9+
public static BindableService build(Config config) {
10+
Injector injector = Guice.createInjector(new PartitionerConfigServiceModule(config));
11+
return injector.getInstance(BindableService.class);
12+
}
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package org.hypertrace.partitioner.config.service;
2+
3+
import com.google.inject.Inject;
4+
import io.grpc.Status;
5+
import io.grpc.stub.StreamObserver;
6+
import java.util.List;
7+
import java.util.Optional;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.hypertrace.partitioner.config.service.store.PartitionerProfilesStore;
10+
import org.hypertrace.partitioner.config.service.v1.*;
11+
12+
@Slf4j
13+
public class PartitionerConfigServiceImpl
14+
extends PartitionerConfigServiceGrpc.PartitionerConfigServiceImplBase {
15+
16+
private final PartitionerProfilesStore partitionerProfilesStore;
17+
private final PartitionerConfigServiceRequestValidator validator;
18+
19+
@Inject
20+
public PartitionerConfigServiceImpl(
21+
PartitionerProfilesStore partitionerProfilesStore,
22+
PartitionerConfigServiceRequestValidator validator) {
23+
this.partitionerProfilesStore = partitionerProfilesStore;
24+
this.validator = validator;
25+
}
26+
27+
public void getPartitionerProfile(
28+
GetPartitionerProfileRequest request,
29+
StreamObserver<GetPartitionerProfileResponse> responseObserver) {
30+
try {
31+
validator.validateOrThrow(request);
32+
Optional<PartitionerProfile> profile =
33+
partitionerProfilesStore.getPartitionerProfile(request.getProfileName());
34+
if (profile.isPresent()) {
35+
responseObserver.onNext(
36+
GetPartitionerProfileResponse.newBuilder().setProfile(profile.get()).build());
37+
} else {
38+
throw Status.NOT_FOUND
39+
.withDescription(
40+
String.format("Partitioner profile for {} not found", request.getProfileName()))
41+
.asRuntimeException();
42+
}
43+
responseObserver.onCompleted();
44+
} catch (Exception e) {
45+
log.error("Get profile failed for request:{}", request, e);
46+
responseObserver.onError(e);
47+
}
48+
}
49+
50+
public void putPartitionerProfiles(
51+
PutPartitionerProfilesRequest request,
52+
StreamObserver<PutPartitionerProfilesResponse> responseObserver) {
53+
try {
54+
validator.validateOrThrow(request);
55+
partitionerProfilesStore.putPartitionerProfiles(request.getProfilesList());
56+
responseObserver.onNext(PutPartitionerProfilesResponse.newBuilder().build());
57+
responseObserver.onCompleted();
58+
} catch (Exception e) {
59+
log.error("Put profiles failed for request:{}", request, e);
60+
responseObserver.onError(e);
61+
}
62+
}
63+
64+
public void getPartitionerProfiles(
65+
GetPartitionerProfilesRequest request,
66+
StreamObserver<GetPartitionerProfilesResponse> responseObserver) {
67+
try {
68+
List<PartitionerProfile> profiles = partitionerProfilesStore.getAllPartitionProfiles();
69+
responseObserver.onNext(
70+
GetPartitionerProfilesResponse.newBuilder().addAllProfiles(profiles).build());
71+
responseObserver.onCompleted();
72+
} catch (Exception e) {
73+
log.error("Get profiles failed for request:{}", request, e);
74+
responseObserver.onError(e);
75+
}
76+
}
77+
78+
public void deletePartitionerProfiles(
79+
DeletePartitionerProfilesRequest request,
80+
StreamObserver<DeletePartitionerProfilesResponse> responseObserver) {
81+
try {
82+
validator.validateOrThrow(request);
83+
partitionerProfilesStore.deletePartitionerProfiles(request.getProfileNamesList());
84+
responseObserver.onNext(DeletePartitionerProfilesResponse.newBuilder().build());
85+
responseObserver.onCompleted();
86+
} catch (Exception e) {
87+
log.error("Delete profiles failed for request:{}", request, e);
88+
responseObserver.onError(e);
89+
}
90+
}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.hypertrace.partitioner.config.service;
2+
3+
import com.google.inject.AbstractModule;
4+
import com.typesafe.config.Config;
5+
import io.grpc.BindableService;
6+
import org.hypertrace.core.documentstore.Datastore;
7+
import org.hypertrace.core.documentstore.DatastoreProvider;
8+
import org.hypertrace.partitioner.config.service.store.PartitionerProfilesDocumentStore;
9+
import org.hypertrace.partitioner.config.service.store.PartitionerProfilesStore;
10+
11+
public class PartitionerConfigServiceModule extends AbstractModule {
12+
13+
public static final String GENERIC_CONFIG_SERVICE = "generic.config.service";
14+
public static final String DOC_STORE_CONFIG_KEY = "document.store";
15+
public static final String DATA_STORE_TYPE = "dataStoreType";
16+
17+
private final Config config;
18+
19+
public PartitionerConfigServiceModule(Config config) {
20+
this.config = config;
21+
}
22+
23+
@Override
24+
protected void configure() {
25+
bind(BindableService.class).to(PartitionerConfigServiceImpl.class);
26+
bind(PartitionerProfilesStore.class)
27+
.toInstance(getDocumentStore(config.getConfig(GENERIC_CONFIG_SERVICE)));
28+
}
29+
30+
private PartitionerProfilesDocumentStore getDocumentStore(Config config) {
31+
Config docStoreConfig = config.getConfig(DOC_STORE_CONFIG_KEY);
32+
String dataStoreType = docStoreConfig.getString(DATA_STORE_TYPE);
33+
Config dataStoreConfig = docStoreConfig.getConfig(dataStoreType);
34+
Datastore datastore = DatastoreProvider.getDatastore(dataStoreType, dataStoreConfig);
35+
return new PartitionerProfilesDocumentStore(datastore);
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package org.hypertrace.partitioner.config.service;
2+
3+
import io.grpc.Status;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
import org.hypertrace.partitioner.config.service.v1.*;
7+
8+
public class PartitionerConfigServiceRequestValidator {
9+
10+
public void validateOrThrow(GetPartitionerProfileRequest request) {
11+
if (request.getProfileName().isBlank()) {
12+
throw Status.INVALID_ARGUMENT.withDescription("profile cannot be empty").asRuntimeException();
13+
}
14+
}
15+
16+
public void validateOrThrow(PutPartitionerProfilesRequest request) {
17+
if (request.getProfilesCount() == 0) {
18+
throw Status.INVALID_ARGUMENT.withDescription("profiles can't be empty").asRuntimeException();
19+
}
20+
21+
request
22+
.getProfilesList()
23+
.forEach(
24+
profile -> {
25+
if (profile.getName().isBlank()) {
26+
throw Status.INVALID_ARGUMENT
27+
.withDescription("profile cannot be empty")
28+
.asRuntimeException();
29+
}
30+
if (profile.getGroupsCount() == 0) {
31+
throw Status.INVALID_ARGUMENT
32+
.withDescription("partition_groups cannot be empty")
33+
.asRuntimeException();
34+
}
35+
if (profile.getPartitionKey().isBlank()) {
36+
throw Status.INVALID_ARGUMENT
37+
.withDescription("partition key cannot be empty")
38+
.asRuntimeException();
39+
}
40+
profile
41+
.getGroupsList()
42+
.forEach(
43+
partitionerGroup -> {
44+
if (partitionerGroup.getName().isBlank()) {
45+
throw Status.INVALID_ARGUMENT
46+
.withDescription("partition_group name cannot be empty")
47+
.asRuntimeException();
48+
}
49+
if (partitionerGroup.getMemberIdsCount() == 0) {
50+
throw Status.INVALID_ARGUMENT
51+
.withDescription("partition_group memberIds cannot be empty")
52+
.asRuntimeException();
53+
}
54+
});
55+
56+
Map<String, String> memberIdPartitionerGroupName = new HashMap<>();
57+
profile
58+
.getGroupsList()
59+
.forEach(
60+
partitionerGroup -> {
61+
partitionerGroup
62+
.getMemberIdsList()
63+
.forEach(
64+
memberId -> {
65+
if (memberIdPartitionerGroupName.containsKey(memberId)) {
66+
throw Status.INVALID_ARGUMENT
67+
.withDescription(
68+
String.format(
69+
"memberId {%s} already member of group {%s}",
70+
memberId,
71+
memberIdPartitionerGroupName.get(memberId)))
72+
.asRuntimeException();
73+
} else {
74+
memberIdPartitionerGroupName.put(
75+
memberId, partitionerGroup.getName());
76+
}
77+
});
78+
});
79+
});
80+
}
81+
82+
public void validateOrThrow(DeletePartitionerProfilesRequest request) {
83+
if (request.getProfileNamesCount() == 0) {
84+
throw Status.INVALID_ARGUMENT
85+
.withDescription("profile names can't be empty")
86+
.asRuntimeException();
87+
}
88+
89+
request
90+
.getProfileNamesList()
91+
.forEach(
92+
profile -> {
93+
if (profile.isBlank()) {
94+
throw Status.INVALID_ARGUMENT
95+
.withDescription("profile cannot be empty")
96+
.asRuntimeException();
97+
}
98+
});
99+
}
100+
}

0 commit comments

Comments
 (0)