diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..402c963 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,19 @@ +[*] +indent_size=2 +indent_style=space +charset=utf-8 + +[*.java] +continuation_indent_size=2 +trim_trailing_whitespace=true +insert_final_newline=true +max_line_length=120 +wildcard_import_limit=999 +ij_java_blank_lines_after_imports=1 +ij_java_blank_lines_before_imports=1 +ij_java_class_count_to_use_import_on_demand=999 +ij_java_names_count_to_use_import_on_demand=999 +ij_java_imports_layout=$java.**,|,$javax.**,|,$org.**,|,$*,$com.**,|,$com.expedia.**,|,$com.hotels.**,java.**,|,javax.**,|,org.**,|,*,|,com.**,|,com.expedia.**,|,com.hotels.** +ij_java_keep_simple_methods_in_one_line=true +ij_java_keep_simple_lambdas_in_one_line=true +ij_java_keep_simple_classes_in_one_line = true \ No newline at end of file diff --git a/.sdkmanrc b/.sdkmanrc new file mode 100644 index 0000000..f24009b --- /dev/null +++ b/.sdkmanrc @@ -0,0 +1 @@ +java=21.0.8-tem diff --git a/drone-fly-app/pom.xml b/drone-fly-app/pom.xml index 12354f9..75b0e0d 100644 --- a/drone-fly-app/pom.xml +++ b/drone-fly-app/pom.xml @@ -27,7 +27,7 @@ com.expediagroup.apiary kafka-metastore-receiver - 8.1.15 + ${apiary.extensions.version} jdk.tools diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/DroneFly.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/DroneFly.java index d217828..dbe53f1 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/DroneFly.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/DroneFly.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2020 Expedia, Inc. + * Copyright (C) 2020-2026 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,22 +17,14 @@ import java.util.TimeZone; -import org.springframework.beans.BeansException; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.context.ConfigurableApplicationContext; - -import com.google.common.annotations.VisibleForTesting; @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}) @EnableConfigurationProperties -public class DroneFly implements ApplicationContextAware { - - private static ConfigurableApplicationContext context; +public class DroneFly { public static void main(String[] args) { TimeZone.setDefault(TimeZone.getTimeZone("UTC")); @@ -42,22 +34,4 @@ public static void main(String[] args) { .build() .run(args); } - - @VisibleForTesting - public static boolean isRunning() { - return context != null && context.isRunning(); - } - - @VisibleForTesting - public static void stop() { - if (context == null) { - throw new RuntimeException("Application context has not been started."); - } - context.close(); - } - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - context = (ConfigurableApplicationContext) applicationContext; - } } diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/DroneFlyRunner.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/DroneFlyRunner.java index e29d48b..c010f4f 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/DroneFlyRunner.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/DroneFlyRunner.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2020-2025 Expedia, Inc. + * Copyright (C) 2020-2026 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/service/factory/HMSHandlerFactory.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/service/factory/HMSHandlerFactory.java index f8c9cb9..b0facfe 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/service/factory/HMSHandlerFactory.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/service/factory/HMSHandlerFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2020 Expedia, Inc. + * Copyright (C) 2020-2026 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/service/factory/ListenerCatalogFactory.java b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/service/factory/ListenerCatalogFactory.java index 4e5da28..b78b709 100644 --- a/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/service/factory/ListenerCatalogFactory.java +++ b/drone-fly-app/src/main/java/com/expediagroup/dataplatform/dronefly/app/service/factory/ListenerCatalogFactory.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2020 Expedia, Inc. + * Copyright (C) 2020-2026 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/drone-fly-app/src/test/java/com/expediagroup/dataplatform/dronefly/app/DroneFlyRunnerTest.java b/drone-fly-app/src/test/java/com/expediagroup/dataplatform/dronefly/app/DroneFlyRunnerTest.java index 9e99187..8cced2a 100644 --- a/drone-fly-app/src/test/java/com/expediagroup/dataplatform/dronefly/app/DroneFlyRunnerTest.java +++ b/drone-fly-app/src/test/java/com/expediagroup/dataplatform/dronefly/app/DroneFlyRunnerTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2020-2025 Expedia, Inc. + * Copyright (C) 2020-2026 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/drone-fly-app/src/test/java/com/expediagroup/dataplatform/dronefly/app/service/DroneFlyNotificationServiceTest.java b/drone-fly-app/src/test/java/com/expediagroup/dataplatform/dronefly/app/service/DroneFlyNotificationServiceTest.java index 1221e51..94da523 100644 --- a/drone-fly-app/src/test/java/com/expediagroup/dataplatform/dronefly/app/service/DroneFlyNotificationServiceTest.java +++ b/drone-fly-app/src/test/java/com/expediagroup/dataplatform/dronefly/app/service/DroneFlyNotificationServiceTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2020 Expedia, Inc. + * Copyright (C) 2020-2026 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/drone-fly-app/src/test/java/com/expediagroup/dataplatform/dronefly/app/service/factory/HMSHandlerFactoryTest.java b/drone-fly-app/src/test/java/com/expediagroup/dataplatform/dronefly/app/service/factory/HMSHandlerFactoryTest.java index 01d1540..a378eb9 100644 --- a/drone-fly-app/src/test/java/com/expediagroup/dataplatform/dronefly/app/service/factory/HMSHandlerFactoryTest.java +++ b/drone-fly-app/src/test/java/com/expediagroup/dataplatform/dronefly/app/service/factory/HMSHandlerFactoryTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2020 Expedia, Inc. + * Copyright (C) 2020-2026 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/drone-fly-integration-tests/pom.xml b/drone-fly-integration-tests/pom.xml index 52785ed..3413d11 100644 --- a/drone-fly-integration-tests/pom.xml +++ b/drone-fly-integration-tests/pom.xml @@ -48,7 +48,7 @@ com.expediagroup.apiary kafka-metastore-listener - 8.1.18 + ${apiary.extensions.version} test @@ -111,7 +111,7 @@ com.expediagroup.apiary apiary-gluesync-listener - 8.1.18 + ${apiary.extensions.version} all test diff --git a/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/ApiaryGlueSyncMetricsIntegrationTest.java b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/ApiaryGlueSyncMetricsIntegrationTest.java index 56741ae..b9f38ea 100644 --- a/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/ApiaryGlueSyncMetricsIntegrationTest.java +++ b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/ApiaryGlueSyncMetricsIntegrationTest.java @@ -15,18 +15,39 @@ */ package com.expediagroup.dataplatform.dronefly.core.integration; +import static java.util.Map.entry; import static org.assertj.core.api.Assertions.assertThat; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.DroneFlyIntegrationTestUtils.buildPartition; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.DroneFlyIntegrationTestUtils.buildTable; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.SpringMetricsUtils.metric; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.SpringMetricsUtils.springMetricsIncrease; + +import org.apache.hadoop.hive.metastore.HMSHandler; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.bean.override.mockito.MockitoBean; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.test.web.server.LocalManagementPort; +import org.springframework.context.annotation.Import; +import org.springframework.kafka.test.context.EmbeddedKafka; import io.micrometer.core.instrument.Metrics; +import com.expediagroup.apiary.extensions.gluesync.listener.ApiaryGlueSync; +import com.expediagroup.apiary.extensions.gluesync.listener.metrics.MetricConstants; import com.expediagroup.dataplatform.dronefly.app.DroneFly; -import com.expediagroup.dataplatform.dronefly.app.DroneFlyRunner; import com.expediagroup.dataplatform.dronefly.app.service.ListenerCatalog; +import com.expediagroup.dataplatform.dronefly.core.integration.support.AsyncRunnerConfig; +import com.expediagroup.dataplatform.dronefly.core.integration.support.SpringMetricsUtils; /** * Deployment smoke test for apiary-gluesync-listener inside a Dronefly Spring Boot context. @@ -36,29 +57,54 @@ */ @SpringBootTest( classes = DroneFly.class, - // RANDOM_PORT (not NONE) mirrors production: Dronefly always runs a web server, and the - // web server's dependency chain causes PrometheusMeterRegistry to initialise before - // ListenerCatalog — preventing the JmxMeterRegistry fallback in MetricService. webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { - "apiary.bootstrap.servers=localhost:9999", + "spring.main.allow-bean-definition-overriding=true", + "apiary.bootstrap.servers=${spring.embedded.kafka.brokers}", "apiary.kafka.topic.name=test-topic", "instance.name=test", "apiary.listener.list=com.expediagroup.apiary.extensions.gluesync.listener.ApiaryGlueSync", // Spring Boot test defaults disable metric export; re-enable so PrometheusMeterRegistry // is added to Metrics.globalRegistry before ApiaryGlueSync is constructed. "management.defaults.metrics.export.enabled=true", - "management.prometheus.metrics.export.enabled=true" + "management.prometheus.metrics.export.enabled=true", + "management.endpoints.web.exposure.include=metrics,prometheus" } ) +@EmbeddedKafka(controlledShutdown = true, topics = {"test-topic"}, partitions = 1) +@Import(AsyncRunnerConfig.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) class ApiaryGlueSyncMetricsIntegrationTest { - @MockitoBean - DroneFlyRunner droneFlyRunner; + private static final Logger log = LoggerFactory.getLogger(DroneFlyIntegrationTest.class); + + private static final SpringMetricsUtils.Metric CREATE_TABLE_IGNORED = metric("glue_listener_event", "COUNT", + "operation", MetricConstants.CREATE_TABLE, + "result", MetricConstants.RESULT_IGNORED, + "outcome", "ignored"); + private static final SpringMetricsUtils.Metric ADD_PARTITION_IGNORED = metric("glue_listener_event", "COUNT", + "operation", MetricConstants.ADD_PARTITION, + "result", MetricConstants.RESULT_IGNORED, + "outcome", "ignored"); + private static final SpringMetricsUtils.Metric TABLE_FAILURE = metric("glue_listener_table_failure", "COUNT"); + private static final SpringMetricsUtils.Metric CREATE_TABLE_FAILURE = metric("glue_listener_event", "COUNT", + "operation", MetricConstants.CREATE_TABLE, + "result", MetricConstants.RESULT_FAILURE); + + @LocalManagementPort + private int managementPort; @Autowired ListenerCatalog listenerCatalog; + @Autowired + TestRestTemplate restTemplate; + + @BeforeAll + void setUp() { + log.info("Management URI: http://localhost:{}/actuator", managementPort); + } + /** Verifies the fat jar loaded cleanly and that Prometheus was registered before ApiaryGlueSync * was constructed — wrong bean ordering would silently add a JmxMeterRegistry instead. */ @Test @@ -70,4 +116,111 @@ void listenerLoadedWithCorrectMetricRegistry() { assertThat(Metrics.globalRegistry.getRegistries()) .noneMatch(r -> r.getClass().getName().contains("JmxMeterRegistry")); } + + /** + * Verifies all GlueSync counters are exported via the actuator metrics endpoint, + * confirming they landed in the Prometheus registry rather than a hidden JMX one. + * {@code glue_listener_event} is on-demand, so a status=false event is fired first + * to ensure it is registered before the name list is checked. + */ + @Test + void gluesyncMetricsExported() { + ApiaryGlueSync apiaryGlueSync = listenerCatalog.getListeners().stream() + .filter(ApiaryGlueSync.class::isInstance) + .map(ApiaryGlueSync.class::cast) + .findFirst() + .orElseThrow(); + try { + apiaryGlueSync.onCreateTable(new CreateTableEvent(buildTable(), false, Mockito.mock(HMSHandler.class), false)); + } catch (MetaException e) { + throw new RuntimeException(e); + } + + assertThat(SpringMetricsUtils.metricNames(restTemplate)) + .containsAll(MetricConstants.LISTENER_METRICS) + .contains(MetricConstants.LISTENER_EVENT); + } + + /** + * Verifies GlueSync counters appear in the Prometheus scrape output. + * Micrometer appends {@code _total} to counter names in Prometheus format. + * {@code glue_listener_event} is registered on-demand, so a status=false event + * is fired first to ensure it is present in the registry before scraping. + */ + @Test + void gluesyncMetricsInPrometheusFormat() { + ApiaryGlueSync apiaryGlueSync = listenerCatalog.getListeners().stream() + .filter(ApiaryGlueSync.class::isInstance) + .map(ApiaryGlueSync.class::cast) + .findFirst() + .orElseThrow(); + try { + apiaryGlueSync.onCreateTable(new CreateTableEvent(buildTable(), false, Mockito.mock(HMSHandler.class), false)); + } catch (MetaException e) { + throw new RuntimeException(e); + } + + String body = SpringMetricsUtils.prometheusBody(restTemplate); + MetricConstants.LISTENER_METRICS.forEach(name -> assertThat(body).contains(name + "_total")); + assertThat(body).contains(MetricConstants.LISTENER_EVENT + "_total"); + } + + /** + * Verifies that the new tagged {@code glue_listener_event} counter is recorded and exported when + * events are processed. Uses {@code status=false} events so the HMS-failure path is taken + * inside ApiaryGlueSync — which records the "ignored" metric without making any Glue API calls. + */ + @Test + void gluesyncEventMetricRecorded() { + ApiaryGlueSync apiaryGlueSync = listenerCatalog.getListeners().stream() + .filter(ApiaryGlueSync.class::isInstance) + .map(ApiaryGlueSync.class::cast) + .findFirst() + .orElseThrow(); + + HMSHandler hmsHandler = Mockito.mock(HMSHandler.class); + + springMetricsIncrease( + restTemplate, + () -> { + try { + apiaryGlueSync.onCreateTable(new CreateTableEvent(buildTable(), false, hmsHandler, false)); + apiaryGlueSync.onAddPartition(new AddPartitionEvent(buildTable(), buildPartition(), false, hmsHandler)); + } catch (MetaException e) { + throw new RuntimeException(e); + } + }, + entry(CREATE_TABLE_IGNORED, 1.0), + entry(ADD_PARTITION_IGNORED, 1.0) + ); + } + + /** + * Verifies {@code glue_listener_table_failure} and the tagged {@code glue_listener_event} + * failure counter are recorded when the Glue API is unreachable. The surefire configuration + * sets {@code AWS_REGION=us-fake-1} with fake credentials so every real Glue call fails fast + * with an {@code UnknownHostException} — no mocking needed. + */ + @Test + void gluesyncFailureMetricRecordedOnGlueError() { + ApiaryGlueSync apiaryGlueSync = listenerCatalog.getListeners().stream() + .filter(ApiaryGlueSync.class::isInstance) + .map(ApiaryGlueSync.class::cast) + .findFirst() + .orElseThrow(); + + HMSHandler hmsHandler = Mockito.mock(HMSHandler.class); + + springMetricsIncrease( + restTemplate, + () -> { + try { + apiaryGlueSync.onCreateTable(new CreateTableEvent(buildTable(), true, hmsHandler, false)); + } catch (MetaException e) { + throw new RuntimeException(e); + } + }, + entry(TABLE_FAILURE, 1.0), + entry(CREATE_TABLE_FAILURE, 1.0)); + } } diff --git a/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/DroneFlyIntegrationTest.java b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/DroneFlyIntegrationTest.java index 507bc3c..0796246 100644 --- a/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/DroneFlyIntegrationTest.java +++ b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/DroneFlyIntegrationTest.java @@ -15,33 +15,31 @@ */ package com.expediagroup.dataplatform.dronefly.core.integration; +import static java.util.Map.entry; + import static org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType.ADD_PARTITION; import static org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType.CREATE_TABLE; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.fail; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.BOOTSTRAP_SERVERS; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.CLIENT_ID; import static com.expediagroup.apiary.extensions.events.metastore.kafka.messaging.KafkaProducerProperty.TOPIC_NAME; -import static com.expediagroup.dataplatform.dronefly.core.integration.DroneFlyIntegrationTestUtils.DATABASE; -import static com.expediagroup.dataplatform.dronefly.core.integration.DroneFlyIntegrationTestUtils.TABLE; -import static com.expediagroup.dataplatform.dronefly.core.integration.DroneFlyIntegrationTestUtils.TOPIC; -import static com.expediagroup.dataplatform.dronefly.core.integration.DroneFlyIntegrationTestUtils.buildPartition; -import static com.expediagroup.dataplatform.dronefly.core.integration.DroneFlyIntegrationTestUtils.buildTable; -import static com.expediagroup.dataplatform.dronefly.core.integration.DroneFlyIntegrationTestUtils.buildTableParameters; -import static com.expediagroup.dataplatform.dronefly.core.integration.DummyListener.EVENT_COUNT_METRIC; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.DroneFlyIntegrationTestUtils.DATABASE; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.DroneFlyIntegrationTestUtils.TABLE; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.DroneFlyIntegrationTestUtils.TOPIC; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.DroneFlyIntegrationTestUtils.buildPartition; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.DroneFlyIntegrationTestUtils.awaitOffsetCommitted; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.DroneFlyIntegrationTestUtils.buildTable; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.DroneFlyIntegrationTestUtils.buildTableParameters; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.DummyListener.EVENT_COUNT_METRIC; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.SpringMetricsUtils.metric; +import static com.expediagroup.dataplatform.dronefly.core.integration.support.SpringMetricsUtils.springMetricsIncrease; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HMSHandler; @@ -50,67 +48,69 @@ import org.apache.hadoop.hive.metastore.events.CreateTableEvent; import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.serialization.StringDeserializer; -import java.time.Duration; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.KafkaMessageListenerContainer; -import org.springframework.kafka.listener.MessageListener; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.test.web.server.LocalManagementPort; +import org.springframework.context.annotation.Import; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.kafka.test.utils.ContainerTestUtils; -import org.springframework.kafka.test.utils.KafkaTestUtils; -import org.springframework.test.context.junit.jupiter.SpringExtension; - -import com.google.common.collect.Lists; import com.expediagroup.apiary.extensions.events.metastore.kafka.listener.KafkaMetaStoreEventListener; import com.expediagroup.dataplatform.dronefly.app.DroneFly; +import com.expediagroup.dataplatform.dronefly.core.integration.support.AsyncRunnerConfig; +import com.expediagroup.dataplatform.dronefly.core.integration.support.DummyListener; +import com.google.common.collect.Lists; -@EmbeddedKafka(count = 1, controlledShutdown = true, topics = { TOPIC }, partitions = 1) -@ExtendWith(SpringExtension.class) +@Import(AsyncRunnerConfig.class) +@SpringBootTest( + classes = DroneFly.class, + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { + "spring.main.allow-bean-definition-overriding=true", + "apiary.bootstrap.servers=${spring.embedded.kafka.brokers}", + "apiary.kafka.topic.name=" + TOPIC, + "instance.name=test", + "apiary.listener.list=com.expediagroup.dataplatform.dronefly.core.integration.support.DummyListener", + "management.defaults.metrics.export.enabled=true", + "management.prometheus.metrics.export.enabled=true", + "management.endpoints.web.exposure.include=health,info,prometheus,metrics" + } +) +@EmbeddedKafka(count = 1, controlledShutdown = true, topics = {TOPIC}, partitions = 1) @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class DroneFlyIntegrationTest { - private @Mock HMSHandler hmsHandler; + private static final Logger log = LoggerFactory.getLogger(DroneFlyIntegrationTest.class); + // KafkaMessageReaderBuilder prefixes "apiary-kafka-metastore-receiver-" + instanceName + private static final String CONSUMER_GROUP = "apiary-kafka-metastore-receiver-test"; + + private HMSHandler hmsHandler = Mockito.mock(HMSHandler.class); - private final ExecutorService executorService = Executors.newFixedThreadPool(1); private static Configuration CONF = new Configuration(); private KafkaMetaStoreEventListener kafkaMetaStoreEventListener; @Autowired - private EmbeddedKafkaBroker embeddedKafkaBroker; + private TestRestTemplate restTemplate; - private BlockingQueue> records; + @LocalManagementPort + private int managementPort; - private KafkaMessageListenerContainer container; + @Autowired + private EmbeddedKafkaBroker embeddedKafkaBroker; @BeforeAll - void setUp() throws InterruptedException { - /** - * The function initEmbeddedKafka() is required so that EmbeddedKafka waits for the consumer group assignment to - * complete. - * https://stackoverflow.com/questions/47312373/embeddedkafka-sending-messages-to-consumer-after-delay-in-subsequent-test - */ - initEmbeddedKafka(); - System.setProperty("instance.name", "test"); - System.setProperty("apiary.bootstrap.servers", embeddedKafkaBroker.getBrokersAsString()); - System.setProperty("apiary.kafka.topic.name", TOPIC); - System.setProperty("apiary.listener.list", "com.expediagroup.dataplatform.dronefly.core.integration.DummyListener"); + void setUp() { + log.info("Management URI: http://localhost:{}/actuator", managementPort); initKafkaListener(); - - executorService.execute(() -> DroneFly.main(new String[] {})); - await().atMost(Duration.ofMinutes(5)).until(DroneFly::isRunning); } @AfterEach @@ -118,75 +118,61 @@ public void reset() { DummyListener.reset(); } - @AfterAll - public void stop() throws InterruptedException { - DroneFly.stop(); - executorService.awaitTermination(5, TimeUnit.SECONDS); - } - @Test public void typical() { - AddPartitionEvent addPartitionEvent = new AddPartitionEvent(buildTable(), buildPartition(), true, hmsHandler); - kafkaMetaStoreEventListener.onAddPartition(addPartitionEvent); - - CreateTableEvent createTableEvent = new CreateTableEvent(buildTable(), true, hmsHandler, false); - kafkaMetaStoreEventListener.onCreateTable(createTableEvent); - - await().atMost(5, TimeUnit.SECONDS).until(() -> DummyListener.getNumEvents() > 1); - + // verify that the consumer has read the events + springMetricsIncrease( + restTemplate, + () -> { + kafkaMetaStoreEventListener.onAddPartition(new AddPartitionEvent(buildTable(), buildPartition(), true, hmsHandler)); + kafkaMetaStoreEventListener.onCreateTable(new CreateTableEvent(buildTable(), true, hmsHandler, false)); + }, + entry(metric("kafka.consumer.fetch.manager.records.consumed.total", "COUNT"), 2.0) + ); + + awaitOffsetCommitted(embeddedKafkaBroker, CONSUMER_GROUP, TOPIC, 0, 2L); assertThat(DummyListener.getNumEvents()).isEqualTo(2); + double countBefore = EVENT_COUNT_METRIC.count(); ListenerEvent receivedEventOne = DummyListener.get(0); ListenerEvent receivedEventTwo = DummyListener.get(1); assertEvent(receivedEventOne, ADD_PARTITION); assertEvent(receivedEventTwo, CREATE_TABLE); - assertThat(EVENT_COUNT_METRIC.count()).isEqualTo(2.0); + assertThat(EVENT_COUNT_METRIC.count()).isEqualTo(countBefore + 2.0); } private void assertEvent(ListenerEvent event, EventType eventType) { assertThat(event.getStatus()).isTrue(); switch (eventType) { - case ADD_PARTITION: - assertThat(event).isInstanceOf(AddPartitionEvent.class); - AddPartitionEvent addPartitionEvent = (AddPartitionEvent) event; - assertThat(addPartitionEvent.getTable().getDbName()).isEqualTo(DATABASE); - assertThat(addPartitionEvent.getTable().getTableName()).isEqualTo(TABLE); - Iterator iterator = addPartitionEvent.getPartitionIterator(); - List partitions = new ArrayList<>(); - while (iterator.hasNext()) { - partitions.add(iterator.next()); - } - assertThat(partitions).isEqualTo(Lists.newArrayList(buildPartition())); - assertThat(addPartitionEvent.getTable().getParameters()).isEqualTo(buildTableParameters()); - break; - case CREATE_TABLE: - assertThat(event).isInstanceOf(CreateTableEvent.class); - CreateTableEvent createTableEvent = (CreateTableEvent) event; - assertThat(createTableEvent.getTable().getDbName()).isEqualTo(DATABASE); - assertThat(createTableEvent.getTable().getTableName()).isEqualTo(TABLE); - break; - default: - fail(String + case ADD_PARTITION: + assertThat(event).isInstanceOf(AddPartitionEvent.class); + AddPartitionEvent addPartitionEvent = (AddPartitionEvent) event; + assertThat(addPartitionEvent.getTable().getDbName()).isEqualTo(DATABASE); + assertThat(addPartitionEvent.getTable().getTableName()).isEqualTo(TABLE); + Iterator iterator = addPartitionEvent.getPartitionIterator(); + List partitions = new ArrayList<>(); + while (iterator.hasNext()) { + partitions.add(iterator.next()); + } + assertThat(partitions).isEqualTo(Lists.newArrayList(buildPartition())); + assertThat(addPartitionEvent.getTable().getParameters()).isEqualTo(buildTableParameters()); + break; + case CREATE_TABLE: + assertThat(event).isInstanceOf(CreateTableEvent.class); + CreateTableEvent createTableEvent = (CreateTableEvent) event; + assertThat(createTableEvent.getTable().getDbName()).isEqualTo(DATABASE); + assertThat(createTableEvent.getTable().getTableName()).isEqualTo(TABLE); + break; + default: + fail(String .format("Received an event with type: {%s} that is different than ADD_PARTITION or CREATE_TABLE.", - eventType)); - break; + eventType)); + break; } } - private void initEmbeddedKafka() { - Map configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker)); - DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(configs, - new StringDeserializer(), new StringDeserializer()); - ContainerProperties containerProperties = new ContainerProperties(TOPIC); - container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); - records = new LinkedBlockingQueue<>(); - container.setupMessageListener((MessageListener) records::add); - container.start(); - ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic()); - } - private void initKafkaListener() { CONF.set(BOOTSTRAP_SERVERS.key(), embeddedKafkaBroker.getBrokersAsString()); CONF.set(CLIENT_ID.key(), "apiary-kafka-listener"); diff --git a/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/AsyncApplicationRunner.java b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/AsyncApplicationRunner.java new file mode 100644 index 0000000..0b3bcad --- /dev/null +++ b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/AsyncApplicationRunner.java @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2020-2026 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.dataplatform.dronefly.core.integration.support; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; + +/** + * Wraps a blocking {@link ApplicationRunner} in a daemon thread so that Spring Boot startup + * completes without mocking. Intended for use in {@code @TestConfiguration} beans only. + * + *

{@code DroneFlyRunner.run()} blocks indefinitely (it is the Kafka polling loop). Spring Boot + * calls {@code ApplicationRunner} beans synchronously during startup, so {@code @SpringBootTest} + * would hang. Registering this wrapper via {@code @TestConfiguration} with + * {@code spring.main.allow-bean-definition-overriding=true} replaces the component-scanned bean + * so Spring Boot startup completes normally. + */ +class AsyncApplicationRunner implements ApplicationRunner { + + private static final Logger log = LoggerFactory.getLogger(AsyncApplicationRunner.class); + + private final ApplicationRunner delegate; + private final String threadName; + + AsyncApplicationRunner(ApplicationRunner delegate, String threadName) { + this.delegate = delegate; + this.threadName = threadName; + } + + @Override + public void run(ApplicationArguments args) { + Thread thread = new Thread(() -> { + try { + delegate.run(args); + } catch (Exception e) { + log.warn("{} delegate threw unexpectedly", threadName, e); + } + }, threadName); + thread.setDaemon(true); + thread.start(); + } +} diff --git a/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/AsyncRunnerConfig.java b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/AsyncRunnerConfig.java new file mode 100644 index 0000000..f2ae207 --- /dev/null +++ b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/AsyncRunnerConfig.java @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2020-2026 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.dataplatform.dronefly.core.integration.support; + +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +import com.expediagroup.dataplatform.dronefly.app.DroneFlyRunner; +import com.expediagroup.dataplatform.dronefly.app.service.DroneFlyNotificationService; + +@TestConfiguration +public class AsyncRunnerConfig { + + @Bean + AsyncApplicationRunner droneFlyRunner(DroneFlyNotificationService notificationService) { + return new AsyncApplicationRunner(new DroneFlyRunner(notificationService), "drone-fly-runner"); + } +} diff --git a/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/DroneFlyIntegrationTestUtils.java b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/DroneFlyIntegrationTestUtils.java similarity index 64% rename from drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/DroneFlyIntegrationTestUtils.java rename to drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/DroneFlyIntegrationTestUtils.java index 96fef50..6812a42 100644 --- a/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/DroneFlyIntegrationTestUtils.java +++ b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/DroneFlyIntegrationTestUtils.java @@ -13,25 +13,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.expediagroup.dataplatform.dronefly.core.integration; +package com.expediagroup.dataplatform.dronefly.core.integration.support; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.springframework.kafka.test.EmbeddedKafkaBroker; import com.google.common.collect.Lists; public class DroneFlyIntegrationTestUtils { - static final String TOPIC = "apiary-events"; - static final String DATABASE = "database"; - static final String TABLE = "table"; + public static final String TOPIC = "apiary-events"; + public static final String DATABASE = "database"; + public static final String TABLE = "table"; public static Table buildTable() { return buildTable(TABLE); @@ -75,4 +84,20 @@ public static String buildQualifiedTableName() { return DATABASE + "." + TABLE; } + public static void awaitOffsetCommitted( + EmbeddedKafkaBroker broker, String consumerGroup, String topic, int partition, long expectedOffset) { + try (AdminClient admin = AdminClient.create( + Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()))) { + await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + Map committed = admin + .listConsumerGroupOffsets(consumerGroup) + .partitionsToOffsetAndMetadata() + .get(5, TimeUnit.SECONDS); + OffsetAndMetadata offset = committed.get(new TopicPartition(topic, partition)); + assertThat(offset).isNotNull(); + assertThat(offset.offset()).isEqualTo(expectedOffset); + }); + } + } + } diff --git a/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/DummyListener.java b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/DummyListener.java similarity index 95% rename from drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/DummyListener.java rename to drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/DummyListener.java index 9dfaa06..3046215 100644 --- a/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/DummyListener.java +++ b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/DummyListener.java @@ -13,10 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.expediagroup.dataplatform.dronefly.core.integration; +package com.expediagroup.dataplatform.dronefly.core.integration.support; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; @@ -40,7 +40,7 @@ public class DummyListener extends MetaStoreEventListener { - public static final List notifyList = new ArrayList<>(); + public static final List notifyList = new CopyOnWriteArrayList<>(); public static final Counter EVENT_COUNT_METRIC = Counter.builder("EVENT_COUNT_CUSTOM_METRIC") .register(Metrics.globalRegistry); diff --git a/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/SpringMetricsUtils.java b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/SpringMetricsUtils.java new file mode 100644 index 0000000..94d97fa --- /dev/null +++ b/drone-fly-integration-tests/src/test/java/com/expediagroup/dataplatform/dronefly/core/integration/support/SpringMetricsUtils.java @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2020-2026 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.dataplatform.dronefly.core.integration.support; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.assertj.core.api.SoftAssertions; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.web.util.UriComponentsBuilder; + +import static org.awaitility.Awaitility.await; + +import com.jayway.jsonpath.JsonPath; + +public class SpringMetricsUtils { + + public record Metric(String name, String statisticName, List tags) { + public String[] tagsArray() { + return tags.toArray(new String[0]); + } + } + + public static Metric metric(String name, String statisticName, String... tags) { + return new Metric(name, statisticName, List.of(tags)); + } + + /** + * Returns the value for the given statistic of a named actuator metric. + * Returns 0.0 if the metric or statistic is not found. + */ + public static double metricValue(TestRestTemplate restTemplate, String name, String statistic, String... tags) { + try { + UriComponentsBuilder builder = UriComponentsBuilder.fromPath("/actuator/metrics/{name}"); + for (int i = 0; i + 1 < tags.length; i += 2) { + builder.queryParam("tag", tags[i] + ":" + tags[i + 1]); + } + URI uri = builder.buildAndExpand(name).toUri(); + String body = restTemplate.getForObject(uri, String.class); + List values = JsonPath.read(body, "$.measurements[?(@.statistic=='" + statistic + "')].value"); + return values.isEmpty() ? 0.0 : values.get(0); + } catch (Exception e) { + return 0.0; + } + } + + /** Returns all metric names exposed by the actuator. */ + public static List metricNames(TestRestTemplate restTemplate) { + String body = restTemplate.getForObject("/actuator/metrics", String.class); + return JsonPath.read(body, "$.names"); + } + + /** Returns the raw Prometheus scrape text from {@code /actuator/prometheus}. */ + public static String prometheusBody(TestRestTemplate restTemplate) { + return restTemplate.getForObject("/actuator/prometheus", String.class); + } + + /** + * Executes {@code fn} then verifies each metric increased by the expected amount. + * Polls via Awaitility to handle async propagation. + */ + @SafeVarargs + public static void springMetricsIncrease( + TestRestTemplate restTemplate, + Runnable fn, + Map.Entry... metricIncreases) { + Map increaseMap = Arrays.stream(metricIncreases) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Map before = increaseMap.keySet().stream() + .collect(Collectors.toMap( + m -> m, + m -> metricValue(restTemplate, m.name(), m.statisticName(), m.tagsArray()))); + + fn.run(); + + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> + SoftAssertions.assertSoftly(softly -> + before.forEach((metric, originalValue) -> + softly.assertThat(metricValue(restTemplate, metric.name(), metric.statisticName(), metric.tagsArray())) + .as("Expecting metric %s to have increased by %s. Original value was %s", + metric.name(), increaseMap.get(metric), originalValue) + .isEqualTo(originalValue + increaseMap.get(metric))))); + } +} diff --git a/pom.xml b/pom.xml index 75f7813..83ae028 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,7 @@ 3.13.0 4.9.8.2 + 8.2.0 3.4.13 3.2.4 3.4.3