From e84d06775b8a49545271f6dabee634097e74d4e9 Mon Sep 17 00:00:00 2001 From: James Faulkner Date: Mon, 8 Jun 2026 10:23:52 +0100 Subject: [PATCH 01/29] feat: add Micrometer KafkaClientMetrics to KafkaMessageReader Binds the Kafka consumer's native metrics (~30 out of the box: fetch latency, consumer lag, heartbeat rate, etc.) to a MeterRegistry via KafkaClientMetrics. The registry is injected explicitly via the builder withMeterRegistry(...) and defaults to null (opt-in). Bumps Micrometer from 1.9.9 to 1.14.14 to pick up KafkaClientMetrics support. Also adds .sdkmanrc pinning Java 11 for local builds (required by Spotless/google-java-format compatibility). Co-Authored-By: Claude Sonnet 4.6 --- .sdkmanrc | 1 + .../kafka-metastore-receiver/pom.xml | 4 +++ .../kafka/messaging/KafkaMessageReader.java | 27 +++++++++++++++---- .../messaging/KafkaMessageReaderTest.java | 8 ++++-- pom.xml | 2 +- 5 files changed, 34 insertions(+), 8 deletions(-) create mode 100644 .sdkmanrc diff --git a/.sdkmanrc b/.sdkmanrc new file mode 100644 index 00000000..598592aa --- /dev/null +++ b/.sdkmanrc @@ -0,0 +1 @@ +java=11.0.28-tem diff --git a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-receiver/pom.xml b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-receiver/pom.xml index e50a44c4..d7b821be 100644 --- a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-receiver/pom.xml +++ b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-receiver/pom.xml @@ -22,6 +22,10 @@ kafka-clients ${kafka.version} + + io.micrometer + micrometer-core + diff --git a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-receiver/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageReader.java b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-receiver/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageReader.java index 81ec7bd9..97712e85 100644 --- a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-receiver/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageReader.java +++ b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-receiver/src/main/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageReader.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-2020 Expedia, Inc. + * Copyright (C) 2018-2026 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; + import com.google.common.annotations.VisibleForTesting; import com.expediagroup.apiary.extensions.events.metastore.event.ApiaryListenerEvent; @@ -42,16 +45,21 @@ public class KafkaMessageReader implements Iterator, Closea private KafkaConsumer consumer; private MetaStoreEventSerDe eventSerDe; private Iterator> records; + private KafkaClientMetrics kafkaClientMetrics; - private KafkaMessageReader(String topicName, MetaStoreEventSerDe eventSerDe, Properties consumerProperties) { - this(topicName, eventSerDe, new KafkaConsumer(consumerProperties)); + private KafkaMessageReader(String topicName, MetaStoreEventSerDe eventSerDe, Properties consumerProperties, MeterRegistry meterRegistry) { + this(topicName, eventSerDe, new KafkaConsumer(consumerProperties), meterRegistry); } @VisibleForTesting - KafkaMessageReader(String topicName, MetaStoreEventSerDe eventSerDe, KafkaConsumer consumer) { + KafkaMessageReader(String topicName, MetaStoreEventSerDe eventSerDe, KafkaConsumer consumer, MeterRegistry meterRegistry) { this.eventSerDe = eventSerDe; this.consumer = consumer; this.consumer.subscribe(Collections.singletonList(topicName)); + if (meterRegistry != null) { + this.kafkaClientMetrics = new KafkaClientMetrics(consumer); + this.kafkaClientMetrics.bindTo(meterRegistry); + } } @Override @@ -73,6 +81,9 @@ public void remove() { @Override public void close() { + if (kafkaClientMetrics != null) { + kafkaClientMetrics.close(); + } consumer.close(); } @@ -89,6 +100,7 @@ public static final class KafkaMessageReaderBuilder { private String groupId = "apiary-kafka-metastore-receiver-"; private MetaStoreEventSerDe metaStoreEventSerDe = new JsonMetaStoreEventSerDe(); private Properties consumerProperties = new Properties(); + private MeterRegistry meterRegistry; private KafkaMessageReaderBuilder(String bootstrapServers, String topicName, String applicationName) { this.bootstrapServers = bootstrapServers; @@ -114,6 +126,11 @@ public KafkaMessageReaderBuilder withConsumerProperties(Properties consumerPrope return this; } + public KafkaMessageReaderBuilder withMeterRegistry(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + return this; + } + public KafkaMessageReader build() { Properties props = new Properties(); props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); @@ -121,7 +138,7 @@ public KafkaMessageReader build() { props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); consumerProperties.forEach((key, value) -> props.merge(key, value, (v1, v2) -> v1)); - return new KafkaMessageReader(topicName, metaStoreEventSerDe, props); + return new KafkaMessageReader(topicName, metaStoreEventSerDe, props, meterRegistry); } } } diff --git a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-receiver/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageReaderTest.java b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-receiver/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageReaderTest.java index 87373ab8..beb493bf 100644 --- a/apiary-metastore-events/kafka-metastore-events/kafka-metastore-receiver/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageReaderTest.java +++ b/apiary-metastore-events/kafka-metastore-events/kafka-metastore-receiver/src/test/java/com/expediagroup/apiary/extensions/events/metastore/kafka/messaging/KafkaMessageReaderTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-2020 Expedia, Inc. + * Copyright (C) 2018-2026 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,8 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -60,9 +62,11 @@ public class KafkaMessageReaderTest { private ConsumerRecords messages; private KafkaMessageReader reader; + private SimpleMeterRegistry meterRegistry; @Before public void init() { + meterRegistry = new SimpleMeterRegistry(); List> messageList = ImmutableList.of(message); Map>> messageMap = ImmutableMap .of(new TopicPartition(TOPIC_NAME, PARTITION), messageList); @@ -70,7 +74,7 @@ public void init() { when(consumer.poll(any(Duration.class))).thenReturn(messages); when(message.value()).thenReturn(MESSAGE_CONTENT); when(serDe.unmarshal(MESSAGE_CONTENT)).thenReturn(event); - reader = new KafkaMessageReader(TOPIC_NAME, serDe, consumer); + reader = new KafkaMessageReader(TOPIC_NAME, serDe, consumer, meterRegistry); } @Test diff --git a/pom.xml b/pom.xml index 8fb9cca5..114ddbfe 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ 2.7.1 2.3.7 1.0.0 - 1.9.9 + 1.14.14