Skip to content

Commit 0d2addc

Browse files
committed
update interceptors
1 parent 46cf86a commit 0d2addc

8 files changed

Lines changed: 28 additions & 203 deletions

File tree

kafka-bom/build.gradle.kts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@ dependencies {
2929
because("[https://nvd.nist.gov/vuln/detail/CVE-2025-12183] in org.lz4:lz4-java:1.8.0")
3030
because("CVE-2025-12183 is fixed in 1.8.1")
3131
}
32-
api("org.eclipse.jetty:jetty-bom:11.0.26") {
33-
because("CVE-2025-5115 is fixed in 11.0.26")
34-
}
3532

3633
api("io.confluent:kafka-streams-avro-serde:$confluentVersion")
3734
api("io.confluent:kafka-protobuf-serializer:$confluentVersion")
@@ -40,6 +37,6 @@ dependencies {
4037
api("org.apache.kafka:kafka-clients:$confluentCcsVersion")
4138
api("org.apache.kafka:kafka-streams:$confluentCcsVersion")
4239
api("org.apache.kafka:kafka-streams-test-utils:$confluentCcsVersion")
43-
api("org.apache.avro:avro:1.12.0")
40+
api("org.apache.avro:avro:1.12.1")
4441
}
4542
}

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@
4646
import org.apache.kafka.streams.kstream.KStream;
4747
import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry;
4848
import org.hypertrace.core.grpcutils.client.GrpcRegistryConfig;
49-
import org.hypertrace.core.kafkastreams.framework.interceptors.StreamsBuilderWithInterceptor;
50-
import org.hypertrace.core.kafkastreams.framework.interceptors.metrics.MetricsInterceptorFactory;
5149
import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateListener;
5250
import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateRestoreListener;
5351
import org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetter;
@@ -104,11 +102,8 @@ protected void doInit() {
104102
streamsConfig = mergeProperties(getBaseStreamsConfig(), getJobStreamsConfig(getAppConfig()));
105103
// build topologies
106104
Map<String, KStream<?, ?>> sourceStreams = new HashMap<>();
107-
// initialize MetricsInterceptorFactory
108-
MetricsInterceptorFactory metricsInterceptorFactory = new MetricsInterceptorFactory();
109-
StreamsBuilder streamsBuilder =
110-
new StreamsBuilderWithInterceptor(List.of(metricsInterceptorFactory::create));
111105

106+
StreamsBuilder streamsBuilder = new StreamsBuilder();
112107
streamsBuilder = buildTopology(streamsConfig, streamsBuilder, sourceStreams);
113108
this.topology = streamsBuilder.build();
114109

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/interceptors/StreamsBuilderWithInterceptor.java

Lines changed: 0 additions & 34 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package org.hypertrace.core.kafkastreams.framework.interceptors.metrics;
22

33
import io.micrometer.core.instrument.Counter;
4-
import org.apache.kafka.streams.processor.api.Processor;
5-
import org.apache.kafka.streams.processor.api.Record;
4+
import java.util.Map;
5+
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
6+
import org.apache.kafka.clients.consumer.ConsumerRecord;
7+
import org.apache.kafka.clients.consumer.ConsumerRecords;
68

7-
public class MetricsInterceptor implements Processor {
9+
public class MetricsInterceptor implements ConsumerInterceptor<Object, Object> {
810

911
private final Counter timeLagCounter;
1012
private final Counter numRecordsCounter;
@@ -15,8 +17,26 @@ public class MetricsInterceptor implements Processor {
1517
}
1618

1719
@Override
18-
public void process(Record record) {
19-
timeLagCounter.increment(System.currentTimeMillis() - record.timestamp());
20-
numRecordsCounter.increment();
20+
public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
21+
for (ConsumerRecord<Object, Object> record : records) {
22+
timeLagCounter.increment(System.currentTimeMillis() - record.timestamp());
23+
numRecordsCounter.increment();
24+
}
25+
return records;
26+
}
27+
28+
@Override
29+
public void close() {
30+
// no-op
31+
}
32+
33+
@Override
34+
public void onCommit(Map offsets) {
35+
// no-op
36+
}
37+
38+
@Override
39+
public void configure(Map<String, ?> configs) {
40+
// no-op
2141
}
2242
}

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/interceptors/metrics/MetricsInterceptorFactory.java

Lines changed: 0 additions & 24 deletions
This file was deleted.

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/interceptors/CachingInterceptor.java

Lines changed: 0 additions & 20 deletions
This file was deleted.

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/interceptors/CachingInterceptorFactory.java

Lines changed: 0 additions & 21 deletions
This file was deleted.

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/interceptors/StreamsBuilderWithInterceptorTest.java

Lines changed: 0 additions & 88 deletions
This file was deleted.

0 commit comments

Comments
 (0)