|
3 | 3 | import static org.junit.jupiter.api.Assertions.assertEquals; |
4 | 4 | import static org.junit.jupiter.api.Assertions.assertFalse; |
5 | 5 | import static org.junit.jupiter.api.Assertions.assertThrows; |
6 | | -import static org.mockito.Mockito.mock; |
7 | 6 |
|
8 | 7 | import com.github.benmanes.caffeine.cache.AsyncLoadingCache; |
9 | 8 | import com.github.benmanes.caffeine.cache.Caffeine; |
10 | 9 | import com.typesafe.config.Config; |
11 | 10 | import com.typesafe.config.ConfigException; |
12 | 11 | import com.typesafe.config.ConfigFactory; |
13 | 12 | import java.time.Duration; |
14 | | -import java.util.HashMap; |
15 | | -import java.util.List; |
16 | 13 | import java.util.Map; |
17 | 14 | import java.util.concurrent.CompletableFuture; |
18 | 15 | import java.util.concurrent.TimeUnit; |
19 | 16 | import org.apache.kafka.clients.consumer.Consumer; |
20 | | -import org.apache.kafka.clients.consumer.ConsumerRecord; |
21 | 17 | import org.apache.kafka.clients.consumer.MockConsumer; |
22 | 18 | import org.apache.kafka.clients.consumer.OffsetResetStrategy; |
23 | | -import org.apache.kafka.common.Node; |
24 | | -import org.apache.kafka.common.PartitionInfo; |
25 | | -import org.apache.kafka.common.TopicPartition; |
26 | 19 | import org.junit.jupiter.api.Test; |
27 | 20 |
|
28 | 21 | class KafkaLiveEventListenerTest { |
@@ -50,45 +43,29 @@ void testThrowOnInvalidInputs() { |
50 | 43 | @Test |
51 | 44 | void testEventModificationCache() throws Exception { |
52 | 45 | // kafka consumer mock setup |
53 | | - MockConsumer<String, Long> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); |
54 | | - String topic = "event-update-topic"; |
55 | | - kafkaConsumer.updatePartitions( |
56 | | - topic, |
57 | | - List.of( |
58 | | - getPartitionInfo(topic, 0), |
59 | | - getPartitionInfo(topic, 1), |
60 | | - getPartitionInfo(topic, 2), |
61 | | - getPartitionInfo(topic, 3))); |
62 | | - HashMap<TopicPartition, Long> endOffsets = new HashMap<>(); |
63 | | - endOffsets.put(new TopicPartition(topic, 0), 50L); |
64 | | - endOffsets.put(new TopicPartition(topic, 1), 50L); |
65 | | - endOffsets.put(new TopicPartition(topic, 2), 50L); |
66 | | - endOffsets.put(new TopicPartition(topic, 3), 50L); |
67 | | - kafkaConsumer.updateEndOffsets(endOffsets); |
| 46 | + String topicName = "event-update-topic"; |
| 47 | + KafkaMockConsumerTestUtil<String, Long> mockConsumerTestUtil = |
| 48 | + new KafkaMockConsumerTestUtil<>(topicName, 4); |
68 | 49 | // create instance of event modification cache consuming from this consumer |
69 | 50 | EventModificationCache eventModificationCache = |
70 | 51 | new EventModificationCache( |
71 | 52 | "modification-event-consumer", |
72 | | - ConfigFactory.parseMap(Map.of("topic.name", topic, "poll.timeout", "5ms")), |
73 | | - kafkaConsumer); |
| 53 | + ConfigFactory.parseMap(Map.of("topic.name", topicName, "poll.timeout", "5ms")), |
| 54 | + mockConsumerTestUtil.getMockConsumer()); |
74 | 55 | Thread.sleep(10); |
75 | 56 | assertEquals(10L, eventModificationCache.get(10)); |
76 | 57 | assertEquals(100L, eventModificationCache.get(100)); |
77 | 58 | // not present key won't trigger any population but callback function should be called |
78 | | - kafkaConsumer.addRecord(new ConsumerRecord<>(topic, 0, 100, "32", 89L)); |
| 59 | + mockConsumerTestUtil.addRecordToPartition(0, "32", 89L); |
79 | 60 | Thread.sleep(100); |
80 | 61 | assertFalse(eventModificationCache.hasKey(32)); |
81 | 62 | // existing key will be modified based on entry |
82 | | - kafkaConsumer.addRecord(new ConsumerRecord<>(topic, 3, 200, "10", -3L)); |
| 63 | + mockConsumerTestUtil.addRecordToPartition(3, "10", -3L); |
83 | 64 | Thread.sleep(100); |
84 | 65 | assertEquals(-3L, eventModificationCache.get(10)); |
85 | 66 | eventModificationCache.close(); |
86 | 67 | } |
87 | 68 |
|
88 | | - private PartitionInfo getPartitionInfo(String topic, int partition) { |
89 | | - return new PartitionInfo(topic, partition, mock(Node.class), new Node[0], new Node[0]); |
90 | | - } |
91 | | - |
92 | 69 | static class EventModificationCache { |
93 | 70 | private final AsyncLoadingCache<Integer, Long> cache; |
94 | 71 | private final KafkaLiveEventListener<String, Long> eventListener; |
|
0 commit comments