2121import org .hypertrace .core .grpcutils .client .GrpcChannelConfig ;
2222import org .hypertrace .core .grpcutils .client .GrpcChannelRegistry ;
2323import org .hypertrace .core .grpcutils .client .RequestContextClientCallCredsProviderFactory ;
24+ import org .hypertrace .core .kafka .event .listener .KafkaConsumerUtils ;
2425import org .hypertrace .core .kafka .event .listener .KafkaLiveEventListener ;
2526import org .hypertrace .entity .change .event .v1 .EntityChangeEventKey ;
2627import org .hypertrace .entity .change .event .v1 .EntityChangeEventValue ;
@@ -99,7 +100,7 @@ public DefaultClientRegistry(
99100 EntityServiceClientConfig .from (config ).getCacheConfig (),
100101 cacheLoaderExecutor );
101102 this .entityChangeEventListener =
102- getEntityChangeEventConsumer (config , edsCacheClient ::updateBasedOnChangeEvent );
103+ getEntityChangeEventListener (config , edsCacheClient ::updateBasedOnChangeEvent );
103104 this .entityDataClient = EntityDataClient .builder (this .entityServiceChannel ).build ();
104105 this .entityCache = new EntityCache (this .edsCacheClient , cacheLoaderExecutor );
105106 this .entityAccessor =
@@ -197,7 +198,7 @@ protected Channel buildChannel(String host, int port, GrpcChannelConfig grpcChan
197198 }
198199
199200 private static Optional <KafkaLiveEventListener <EntityChangeEventKey , EntityChangeEventValue >>
200- getEntityChangeEventConsumer (
201+ getEntityChangeEventListener (
201202 Config clientsConfig , BiConsumer <EntityChangeEventKey , EntityChangeEventValue > callback ) {
202203 if (clientsConfig .hasPath (ENTITY_CHANGE_EVENTS_CONSUMER_ENABLED_KEY )
203204 && clientsConfig .getBoolean (ENTITY_CHANGE_EVENTS_CONSUMER_ENABLED_KEY )) {
@@ -206,14 +207,17 @@ protected Channel buildChannel(String host, int port, GrpcChannelConfig grpcChan
206207 Collections .singletonMap (
207208 "schema.registry.url" ,
208209 clientsConfig .getString (ENTITY_CHANGE_EVENTS_SCHEMA_REGISTRY_URL_KEY ));
210+ Config kafkaConfig = clientsConfig .getConfig (ENTITY_CHANGE_EVENTS_CONFIG_KEY );
209211 return Optional .of (
210212 new KafkaLiveEventListener .Builder <EntityChangeEventKey , EntityChangeEventValue >()
211213 .registerCallback (callback )
212214 .build (
213215 consumerName ,
214- clientsConfig .getConfig (ENTITY_CHANGE_EVENTS_CONFIG_KEY ),
215- getEntityChangeEventKeyDeser (deserConfig ),
216- getEntityChangeEventValueDeser (deserConfig )));
216+ kafkaConfig ,
217+ KafkaConsumerUtils .getKafkaConsumer (
218+ kafkaConfig ,
219+ getEntityChangeEventKeyDeser (deserConfig ),
220+ getEntityChangeEventValueDeser (deserConfig ))));
217221 }
218222 return Optional .empty ();
219223 }
0 commit comments