Skip to content

Commit 2f95e81

Browse files
committed
add unit tests
1 parent 4e691ee commit 2f95e81

2 files changed

Lines changed: 33 additions & 17 deletions

File tree

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/interceptors/metrics/MetricsInterceptor.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ public class MetricsInterceptor implements ConsumerInterceptor<Object, Object> {
1515

1616
@Override
1717
public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
18-
System.out.println("Hello world! 2.0");
1918
for (ConsumerRecord<Object, Object> record : records) {
2019
timeLagCounter.increment(System.currentTimeMillis() - record.timestamp());
2120
numRecordsCounter.increment();
@@ -35,7 +34,6 @@ public void onCommit(Map offsets) {
3534

3635
@Override
3736
public void configure(Map<String, ?> configs) {
38-
System.out.println("Hello world! 3.0");
3937
this.timeLagCounter = PlatformMetricsRegistry.getMeterRegistry().counter(TIME_LAG_COUNTER_NAME);
4038
this.numRecordsCounter =
4139
PlatformMetricsRegistry.getMeterRegistry().counter(NUM_RECORDS_COUNTER_NAME);
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,48 @@
11
package org.hypertrace.core.kafkastreams.framework.interceptors.metrics;
22

3-
import static org.mockito.Mockito.mock;
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
45

5-
import io.micrometer.core.instrument.Counter;
6+
import com.google.common.collect.Maps;
7+
import com.typesafe.config.ConfigFactory;
8+
import java.util.HashMap;
9+
import java.util.List;
10+
import java.util.Map;
11+
import org.apache.kafka.clients.consumer.ConsumerRecord;
12+
import org.apache.kafka.clients.consumer.ConsumerRecords;
13+
import org.apache.kafka.common.TopicPartition;
14+
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
615
import org.junit.jupiter.api.BeforeEach;
716
import org.junit.jupiter.api.Test;
817

918
public class MetricsInterceptorTest {
1019

11-
private Counter timeLagCounter;
12-
private Counter numRecordsCounter;
13-
private MetricsInterceptor interceptor;
14-
1520
@BeforeEach
1621
void setup() {
17-
timeLagCounter = mock(Counter.class);
18-
numRecordsCounter = mock(Counter.class);
22+
Map<String, Object> config = new HashMap<>();
23+
config.put("reporter.names", List.of("testing"));
24+
25+
PlatformMetricsRegistry.initMetricsRegistry("test", ConfigFactory.parseMap(config));
1926
}
2027

2128
@Test
22-
void shouldIncrementCounters() {
23-
// Record<Object, Object> record =
24-
// new Record<>("key", "value", System.currentTimeMillis() - 50000);
25-
// interceptor.process(record);
26-
//
27-
// verify(numRecordsCounter, times(1)).increment();
28-
// verify(timeLagCounter, times(1)).increment(anyDouble());
29+
public void testOnConsume() {
30+
TopicPartition tp = new TopicPartition("test-topic", 0);
31+
List<ConsumerRecord<Object, Object>> records =
32+
List.of(
33+
new ConsumerRecord<>("test-topic", 0, 0, "k1", "v1"),
34+
new ConsumerRecord<>("test-topic", 0, 1, "k2", null));
35+
36+
Map<TopicPartition, List<ConsumerRecord<Object, Object>>> map = Map.of(tp, records);
37+
ConsumerRecords<Object, Object> input = new ConsumerRecords<>(map);
38+
MetricsInterceptor interceptor = new MetricsInterceptor();
39+
interceptor.configure(Maps.newTreeMap());
40+
ConsumerRecords<Object, Object> output = interceptor.onConsume(input);
41+
assertEquals(2, output.count());
42+
43+
assertEquals(
44+
2.0, PlatformMetricsRegistry.getMeterRegistry().counter("kafka_records_count").count());
45+
assertNotEquals(
46+
0, PlatformMetricsRegistry.getMeterRegistry().counter("kafka_records_time_lag").count());
2947
}
3048
}

0 commit comments

Comments
 (0)