From 1a83c9604c79050aecd5dfe20de98541a55a4b2f Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 18 Nov 2025 10:13:01 +0200 Subject: [PATCH 01/12] [improve][fn] Upgrade Kubernetes Java client to 23.0.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d9486cc281892..bc3757de74869 100644 --- a/pom.xml +++ b/pom.xml @@ -283,7 +283,7 @@ flexible messaging model and an intuitive client API. 2.3.3 2.0.2 5.12.1 - 18.0.0 + 23.0.0 0.9.4 5.3.1 From 5ef412d4dd645ec9a79632f1140bfff291d060cd Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 18 Nov 2025 10:42:58 +0200 Subject: [PATCH 02/12] Adapt usage of Kubernetes Java client to newer API --- .../broker/authentication/oidc/JwksCache.java | 2 +- .../oidc/OpenIDProviderMetadataCache.java | 2 +- ...ticationProviderOpenIDIntegrationTest.java | 6 +- .../KubernetesSecretsTokenAuthProvider.java | 19 ++-- .../runtime/kubernetes/KubernetesRuntime.java | 100 +++++------------- .../kubernetes/KubernetesRuntimeFactory.java | 6 +- ...ubernetesSecretsTokenAuthProviderTest.java | 11 +- .../KubernetesRuntimeFactoryTest.java | 10 +- .../kubernetes/KubernetesRuntimeTest.java | 47 ++++---- 9 files changed, 78 insertions(+), 125 deletions(-) diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java index 71070471d146e..44a17131d772a 100644 --- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java +++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java @@ -169,7 +169,7 @@ CompletableFuture getJwkFromKubernetesApiServer(String keyId) { private CompletableFuture> getJwksFromKubernetesApiServer() { CompletableFuture> future = new CompletableFuture<>(); try { - openidApi.getServiceAccountIssuerOpenIDKeysetAsync(new ApiCallback() { + openidApi.getServiceAccountIssuerOpenIDKeyset().executeAsync(new ApiCallback() { @Override public void onFailure(ApiException e, int statusCode, Map> responseHeaders) { authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY); diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java index 66e71c5306f8d..2f715dbaf7f54 100644 --- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java +++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java @@ -168,7 +168,7 @@ CompletableFuture getOpenIDProviderMetadataForKubernetes private CompletableFuture loadOpenIDProviderMetadataForKubernetesApiServer() { CompletableFuture future = new CompletableFuture<>(); try { - wellKnownApi.getServiceAccountIssuerOpenIDConfigurationAsync(new ApiCallback<>() { + wellKnownApi.getServiceAccountIssuerOpenIDConfiguration().executeAsync(new ApiCallback<>() { @Override public void onFailure(ApiException e, int statusCode, Map> responseHeaders) { authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA); diff --git a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java index dd313cdb73a89..1eede7059a441 100644 --- a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java +++ b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java @@ -117,12 +117,10 @@ void beforeClass() throws IOException { """.replace("%s", server.baseUrl())))); // Set up a correct openid-configuration that the k8s integration test can use - // NOTE: integration tests revealed that the k8s client adds a trailing slash to the openid-configuration - // endpoint. // NOTE: the jwks_uri is ignored, so we supply one that would fail here to ensure that we are not implicitly // relying on the jwks_uri. server.stubFor( - get(urlEqualTo("/k8s/.well-known/openid-configuration/")) + get(urlEqualTo("/k8s/.well-known/openid-configuration")) .willReturn(aResponse() .withHeader("Content-Type", "application/json") .withBody(""" @@ -170,7 +168,7 @@ void beforeClass() throws IOException { // Set up JWKS endpoint with a valid and an invalid public key // The url matches are for both the normal and the k8s endpoints server.stubFor( - get(urlMatching("/keys|/k8s/openid/v1/jwks/")) + get(urlMatching("/keys|/k8s/openid/v1/jwks")) .willReturn(aResponse() .withHeader("Content-Type", "application/json") .withBody( diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java index 913171aca9bc8..4e300aad3b56a 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java @@ -179,9 +179,10 @@ public void cleanUpAuthData(Function.FunctionDetails funcDetails, Optional { try { - coreClient.readNamespacedSecret(secretName, kubeNamespace, null); - + coreClient.readNamespacedSecret(secretName, kubeNamespace).execute(); } catch (ApiException e) { // statefulset is gone if (e.getCode() == HTTP_NOT_FOUND) { @@ -304,15 +304,12 @@ private void upsertSecret(String token, Function.FunctionDetails funcDetails, St .data(buildSecretMap(token)); try { - coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null, null, null, null); + coreClient.createNamespacedSecret(kubeNamespace, v1Secret).execute(); } catch (ApiException e) { if (e.getCode() == HTTP_CONFLICT) { try { - coreClient - .replaceNamespacedSecret(secretName, kubeNamespace, v1Secret, - null, null, null, null); + coreClient.replaceNamespacedSecret(secretName, kubeNamespace, v1Secret).execute(); return Actions.ActionResult.builder().success(true).build(); - } catch (ApiException e1) { String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage(); return Actions.ActionResult.builder() @@ -366,7 +363,7 @@ private String createSecret(String token, Function.FunctionDetails funcDetails) .metadata(new V1ObjectMeta().name(getSecretName(id))) .data(buildSecretMap(token)); try { - coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null, null, null, null); + coreClient.createNamespacedSecret(kubeNamespace, v1Secret).execute(); } catch (ApiException e) { // already exists if (e.getCode() == HTTP_CONFLICT) { diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index 7a69b822cbd89..80d1a7af89cd4 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -40,7 +40,6 @@ import io.kubernetes.client.openapi.apis.CoreV1Api; import io.kubernetes.client.openapi.models.V1Container; import io.kubernetes.client.openapi.models.V1ContainerPort; -import io.kubernetes.client.openapi.models.V1DeleteOptions; import io.kubernetes.client.openapi.models.V1EnvVar; import io.kubernetes.client.openapi.models.V1EnvVarSource; import io.kubernetes.client.openapi.models.V1LabelSelector; @@ -72,7 +71,6 @@ import java.util.regex.Pattern; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import okhttp3.Response; import org.apache.commons.codec.digest.DigestUtils; import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider; import org.apache.pulsar.functions.instance.AuthenticationConfig; @@ -470,9 +468,8 @@ private void submitService() throws Exception { .numRetries(KubernetesRuntimeFactory.numRetries) .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.sleepBetweenRetriesMs) .supplier(() -> { - final V1Service response; try { - response = coreClient.createNamespacedService(jobNamespace, service, null, null, null, null); + coreClient.createNamespacedService(jobNamespace, service).execute(); } catch (ApiException e) { // already exists if (e.getCode() == HTTP_CONFLICT) { @@ -495,7 +492,7 @@ private void submitService() throws Exception { AtomicBoolean success = new AtomicBoolean(false); Actions.newBuilder() .addAction(createService.toBuilder() - .onSuccess((ignored) -> success.set(true)) + .onSuccess(ignored -> success.set(true)) .build()) .run(); @@ -559,10 +556,8 @@ private void submitStatefulSet() throws Exception { .numRetries(KubernetesRuntimeFactory.numRetries) .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.sleepBetweenRetriesMs) .supplier(() -> { - final V1StatefulSet response; try { - response = appsClient.createNamespacedStatefulSet(jobNamespace, statefulSet, - null, null, null, null); + appsClient.createNamespacedStatefulSet(jobNamespace, statefulSet).execute(); } catch (ApiException e) { // already exists if (e.getCode() == HTTP_CONFLICT) { @@ -585,7 +580,7 @@ private void submitStatefulSet() throws Exception { AtomicBoolean success = new AtomicBoolean(false); Actions.newBuilder() .addAction(createStatefulSet.toBuilder() - .onSuccess((ignored) -> success.set(true)) + .onSuccess(ignored -> success.set(true)) .build()) .run(); @@ -597,9 +592,6 @@ private void submitStatefulSet() throws Exception { public void deleteStatefulSet() throws InterruptedException { String statefulSetName = createJobName(instanceConfig.getFunctionDetails(), this.jobName); - final V1DeleteOptions options = new V1DeleteOptions(); - options.setGracePeriodSeconds((long) gracePeriodSeconds); - options.setPropagationPolicy("Foreground"); String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()); Actions.Action deleteStatefulSet = Actions.Action.builder() @@ -607,15 +599,12 @@ public void deleteStatefulSet() throws InterruptedException { .numRetries(KubernetesRuntimeFactory.numRetries) .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.sleepBetweenRetriesMs) .supplier(() -> { - Response response; try { - // cannot use deleteNamespacedStatefulSet because of bug in kuberenetes - // https://github.com/kubernetes-client/java/issues/86 - response = appsClient.deleteNamespacedStatefulSetCall( + appsClient.deleteNamespacedStatefulSet( statefulSetName, - jobNamespace, null, null, - gracePeriodSeconds, null, "Foreground", - options, null) + jobNamespace) + .gracePeriodSeconds(gracePeriodSeconds) + .propagationPolicy("Foreground") .execute(); } catch (ApiException e) { // if already deleted @@ -623,29 +612,15 @@ public void deleteStatefulSet() throws InterruptedException { log.warn("Statefulset for function {} does not exist", fqfn); return Actions.ActionResult.builder().success(true).build(); } - String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage(); return Actions.ActionResult.builder() .success(false) .errorMsg(errorMsg) .build(); - } catch (IOException e) { - return Actions.ActionResult.builder() - .success(false) - .errorMsg(e.getMessage()) - .build(); - } - - // if already deleted - if (response.code() == HTTP_NOT_FOUND) { - log.warn("Statefulset for function {} does not exist", fqfn); - return Actions.ActionResult.builder().success(true).build(); - } else { - return Actions.ActionResult.builder() - .success(response.isSuccessful()) - .errorMsg(response.message()) - .build(); } + return Actions.ActionResult.builder() + .success(true) + .build(); }) .build(); @@ -658,7 +633,7 @@ public void deleteStatefulSet() throws InterruptedException { .supplier(() -> { V1StatefulSet response; try { - response = appsClient.readNamespacedStatefulSet(statefulSetName, jobNamespace, null); + response = appsClient.readNamespacedStatefulSet(statefulSetName, jobNamespace).execute(); } catch (ApiException e) { // statefulset is gone if (e.getCode() == HTTP_NOT_FOUND) { @@ -692,11 +667,8 @@ public void deleteStatefulSet() throws InterruptedException { V1PodList response; try { - response = coreClient.listNamespacedPod(jobNamespace, null, null, - null, null, labels, - null, null, null, null, null); + response = coreClient.listNamespacedPod(jobNamespace).labelSelector(labels).execute(); } catch (ApiException e) { - String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage(); return Actions.ActionResult.builder() .success(false) @@ -725,13 +697,13 @@ public void deleteStatefulSet() throws InterruptedException { .build()) .addAction(waitForStatefulSetDeletion.toBuilder() .continueOn(false) - .onSuccess((ignored) -> success.set(true)) + .onSuccess(ignored -> success.set(true)) .build()) .addAction(deleteStatefulSet.toBuilder() .continueOn(true) .build()) .addAction(waitForStatefulSetDeletion.toBuilder() - .onSuccess((ignored) -> success.set(true)) + .onSuccess(ignored -> success.set(true)) .build()) .run(); @@ -746,10 +718,6 @@ public void deleteStatefulSet() throws InterruptedException { } public void deleteService() throws InterruptedException { - - final V1DeleteOptions options = new V1DeleteOptions(); - options.setGracePeriodSeconds(0L); - options.setPropagationPolicy("Foreground"); String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()); String serviceName = createJobName(instanceConfig.getFunctionDetails(), this.jobName); @@ -758,15 +726,11 @@ public void deleteService() throws InterruptedException { .numRetries(KubernetesRuntimeFactory.numRetries) .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.sleepBetweenRetriesMs) .supplier(() -> { - final Response response; try { - // cannot use deleteNamespacedService because of bug in kuberenetes - // https://github.com/kubernetes-client/java/issues/86 - response = coreClient.deleteNamespacedServiceCall( - serviceName, - jobNamespace, null, null, - 0, null, - "Foreground", options, null).execute(); + coreClient.deleteNamespacedService(serviceName, jobNamespace) + .gracePeriodSeconds(0) + .propagationPolicy("Foreground") + .execute(); } catch (ApiException e) { // if already deleted if (e.getCode() == HTTP_NOT_FOUND) { @@ -779,23 +743,9 @@ public void deleteService() throws InterruptedException { .success(false) .errorMsg(errorMsg) .build(); - } catch (IOException e) { - return Actions.ActionResult.builder() - .success(false) - .errorMsg(e.getMessage()) - .build(); } - // if already deleted - if (response.code() == HTTP_NOT_FOUND) { - log.warn("Service for function {} does not exist", fqfn); - return Actions.ActionResult.builder().success(true).build(); - } else { - return Actions.ActionResult.builder() - .success(response.isSuccessful()) - .errorMsg(response.message()) - .build(); - } + return Actions.ActionResult.builder().success(true).build(); }) .build(); @@ -806,7 +756,7 @@ public void deleteService() throws InterruptedException { .supplier(() -> { V1Service response; try { - response = coreClient.readNamespacedService(serviceName, jobNamespace, null); + response = coreClient.readNamespacedService(serviceName, jobNamespace).execute(); } catch (ApiException e) { // service is gone @@ -833,13 +783,13 @@ public void deleteService() throws InterruptedException { .build()) .addAction(waitForServiceDeletion.toBuilder() .continueOn(false) - .onSuccess((ignored) -> success.set(true)) + .onSuccess(ignored -> success.set(true)) .build()) .addAction(deleteService.toBuilder() .continueOn(true) .build()) .addAction(waitForServiceDeletion.toBuilder() - .onSuccess((ignored) -> success.set(true)) + .onSuccess(ignored -> success.set(true)) .build()) .run(); @@ -971,7 +921,7 @@ V1StatefulSet createStatefulSet() { // let the customizer run but ensure it doesn't change the name so we can find it again final V1StatefulSet overridden = manifestCustomizer - .map((customizer) -> customizer.customizeStatefulSet(instanceConfig.getFunctionDetails(), statefulSet)) + .map(customizer -> customizer.customizeStatefulSet(instanceConfig.getFunctionDetails(), statefulSet)) .orElse(statefulSet); overridden.getMetadata().name(jobName); @@ -1039,7 +989,7 @@ private V1PodSpec getPodSpec(List instanceCommand, Function.Resources re return podSpec; } - private List getTolerations() { + private static List getTolerations() { final List tolerations = new ArrayList<>(); TOLERATIONS.forEach(t -> { final V1Toleration toleration = diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java index bbb6e3992a018..3f0a2ae9cc351 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java @@ -410,7 +410,7 @@ static void fetchConfigMap(CoreV1Api coreClient, String changeConfigMap, KubernetesRuntimeFactory kubernetesRuntimeFactory) { try { V1ConfigMap v1ConfigMap = - coreClient.readNamespacedConfigMap(changeConfigMap, changeConfigMapNamespace, null); + coreClient.readNamespacedConfigMap(changeConfigMap, changeConfigMapNamespace).execute(); Map data = v1ConfigMap.getData(); if (data != null) { overRideKubernetesConfig(data, kubernetesRuntimeFactory); @@ -545,12 +545,12 @@ public Optional getRuntimeCustomizer() { private String getOverriddenNamespace(Function.FunctionDetails funcDetails) { Optional manifestCustomizer = getRuntimeCustomizer(); - return manifestCustomizer.map((customizer) -> customizer.customizeNamespace(funcDetails, jobNamespace)) + return manifestCustomizer.map(customizer -> customizer.customizeNamespace(funcDetails, jobNamespace)) .orElse(jobNamespace); } private String getOverriddenName(Function.FunctionDetails funcDetails) { Optional manifestCustomizer = getRuntimeCustomizer(); - return manifestCustomizer.map((customizer) -> customizer.customizeName(funcDetails, jobName)).orElse(jobName); + return manifestCustomizer.map(customizer -> customizer.customizeName(funcDetails, jobName)).orElse(jobName); } } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java index be5e2cdc481c5..f6c6fc4c7bd8c 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java @@ -22,6 +22,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.apis.CoreV1Api; import io.kubernetes.client.openapi.models.V1Container; @@ -112,8 +113,9 @@ public void testConfigureAuthDataStatefulSetNoCa() { @Test public void testCacheAuthData() throws ApiException { CoreV1Api coreV1Api = mock(CoreV1Api.class); - doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(), - any(), anyString(), anyString(), anyString(), anyString()); + CoreV1Api.APIcreateNamespacedSecretRequest request = mock(CoreV1Api.APIcreateNamespacedSecretRequest.class); + doReturn(request).when(coreV1Api).createNamespacedSecret(anyString(), any()); + doReturn(new V1Secret()).when(request).execute(); KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(); kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, null, (fd) -> "default"); @@ -180,6 +182,11 @@ public void testUpdateAuthData() throws Exception { Optional existingFunctionAuthData = Optional.empty(); Function.FunctionDetails funcDetails = Function.FunctionDetails.newBuilder().setTenant("test-tenant") .setNamespace("test-ns").setName("test-func").build(); + + CoreV1Api.APIcreateNamespacedSecretRequest namespacedSecretRequest = mock(); + when(coreV1Api.createNamespacedSecret(anyString(), any())).thenReturn(namespacedSecretRequest); + when(namespacedSecretRequest.execute()).thenReturn(new V1Secret()); + Optional functionAuthData = kubernetesSecretsTokenAuthProvider.updateAuthData(funcDetails, existingFunctionAuthData, new AuthenticationDataSource() { @Override diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java index 08afca93cc46a..659398ed48250 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java @@ -21,6 +21,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; @@ -512,11 +514,13 @@ public void testDynamicConfigMapLoading() throws Exception { KubernetesRuntimeFactory kubernetesRuntimeFactory = getKuberentesRuntimeFactory(); CoreV1Api coreV1Api = Mockito.mock(CoreV1Api.class); V1ConfigMap v1ConfigMap = new V1ConfigMap(); - Mockito.doReturn(v1ConfigMap).when(coreV1Api).readNamespacedConfigMap(any(), any(), any()); + CoreV1Api.APIreadNamespacedConfigMapRequest request = mock(CoreV1Api.APIreadNamespacedConfigMapRequest.class); + Mockito.doReturn(request).when(coreV1Api).readNamespacedConfigMap(any(), any()); + doReturn(v1ConfigMap).when(request).execute(); KubernetesRuntimeFactory.fetchConfigMap(coreV1Api, changeConfigMap, changeConfigNamespace, kubernetesRuntimeFactory); Mockito.verify(coreV1Api, Mockito.times(1)).readNamespacedConfigMap( - eq(changeConfigMap), eq(changeConfigNamespace), eq(null)); + eq(changeConfigMap), eq(changeConfigNamespace)); KubernetesRuntimeFactory expected = getKuberentesRuntimeFactory(); assertEquals(kubernetesRuntimeFactory, expected); @@ -527,7 +531,7 @@ public void testDynamicConfigMapLoading() throws Exception { KubernetesRuntimeFactory.fetchConfigMap(coreV1Api, changeConfigMap, changeConfigNamespace, kubernetesRuntimeFactory); Mockito.verify(coreV1Api, Mockito.times(2)).readNamespacedConfigMap( - eq(changeConfigMap), eq(changeConfigNamespace), eq(null)); + eq(changeConfigMap), eq(changeConfigNamespace)); assertEquals(kubernetesRuntimeFactory.getPulsarDockerImageName(), "test_dockerImage2"); assertEquals(kubernetesRuntimeFactory.getImagePullPolicy(), "test_imagePullPolicy2"); diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index f8069efe299cb..e3271e98e279c 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -21,12 +21,9 @@ import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH; import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -53,6 +50,7 @@ import io.kubernetes.client.openapi.models.V1ResourceRequirements; import io.kubernetes.client.openapi.models.V1Service; import io.kubernetes.client.openapi.models.V1StatefulSet; +import io.kubernetes.client.openapi.models.V1Status; import io.kubernetes.client.openapi.models.V1Toleration; import java.lang.reflect.Type; import java.math.BigDecimal; @@ -64,8 +62,6 @@ import java.util.Optional; import java.util.function.Consumer; import java.util.stream.Collectors; -import okhttp3.Call; -import okhttp3.Response; import org.apache.commons.lang3.JavaVersion; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; @@ -1385,13 +1381,6 @@ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exc CoreV1Api coreApi = mock(CoreV1Api.class); AppsV1Api appsApi = mock(AppsV1Api.class); - Call successfulCall = mock(Call.class); - Response okResponse = mock(Response.class); - when(okResponse.code()).thenReturn(HttpURLConnection.HTTP_OK); - when(okResponse.isSuccessful()).thenReturn(true); - when(okResponse.message()).thenReturn(""); - when(successfulCall.execute()).thenReturn(okResponse); - final String expectedFunctionNamePrefix = String.format("pf-%s-%s-%s", "c-tenant", "c-ns", "c-fn"); factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0); @@ -1400,33 +1389,41 @@ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exc ArgumentMatcher hasTranslatedFunctionName = (String t) -> t.startsWith(expectedFunctionNamePrefix); - when(appsApi.deleteNamespacedStatefulSetCall( + AppsV1Api.APIdeleteNamespacedStatefulSetRequest request = + mock(AppsV1Api.APIdeleteNamespacedStatefulSetRequest.class); + when(appsApi.deleteNamespacedStatefulSet( + argThat(hasTranslatedFunctionName), + anyString())).thenReturn(request); + when(request.execute()).thenReturn(new V1Status()); + + AppsV1Api.APIreadNamespacedStatefulSetRequest request2 = + mock(AppsV1Api.APIreadNamespacedStatefulSetRequest.class); + when(appsApi.readNamespacedStatefulSet( argThat(hasTranslatedFunctionName), - anyString(), isNull(), isNull(), anyInt(), isNull(), - anyString(), any(), isNull())).thenReturn(successfulCall); + anyString())).thenReturn(request2); ApiException notFoundException = mock(ApiException.class); when(notFoundException.getCode()).thenReturn(HttpURLConnection.HTTP_NOT_FOUND); - when(appsApi.readNamespacedStatefulSet( - argThat(hasTranslatedFunctionName), anyString(), isNull())).thenThrow(notFoundException); + + when(request2.execute()).thenThrow(notFoundException); V1PodList podList = mock(V1PodList.class); when(podList.getItems()).thenReturn(Collections.emptyList()); String expectedLabels = String.format("tenant=%s,namespace=%s,name=%s", "c-tenant", "c-ns", "c-fn"); - when(coreApi.listNamespacedPod(anyString(), isNull(), isNull(), - isNull(), isNull(), - eq(expectedLabels), isNull(), isNull(), isNull(), - isNull(), isNull())).thenReturn(podList); + + CoreV1Api.APIlistNamespacedPodRequest listNamespacedPodRequest = mock(); + when(coreApi.listNamespacedPod(anyString())).thenReturn(listNamespacedPodRequest); + when(listNamespacedPodRequest.labelSelector(anyString())).thenReturn(listNamespacedPodRequest); + when(listNamespacedPodRequest.execute()).thenReturn(podList); + KubernetesRuntime kr = factory.createContainer(config, "/test/code", "code.yml", "/test/transforms", "transform.yml", Long.MIN_VALUE); kr.deleteStatefulSet(); - verify(coreApi).listNamespacedPod(anyString(), isNull(), isNull(), - isNull(), isNull(), - eq(expectedLabels), isNull(), isNull(), isNull(), - isNull(), isNull()); + verify(coreApi).listNamespacedPod(anyString()); + verify(listNamespacedPodRequest).labelSelector(eq(expectedLabels)); } @Test From d9c5738964d7e46f49b20fbe93f05f5de0c6e433 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 18 Nov 2025 14:51:42 +0200 Subject: [PATCH 03/12] Add integration test for Pulsar Functions in Kubernetes --- .github/workflows/pulsar-ci.yaml | 3 + build/build_java_test_image.sh | 2 +- build/run_integration_group.sh | 4 + tests/integration/pom.xml | 63 ++++ .../functions/k8s/PulsarFunctionsK8STest.java | 149 ++++++++ .../k8s/AbstractPulsarStandaloneK8STest.java | 337 ++++++++++++++++++ .../src/test/resources/pulsar-k8s.xml | 28 ++ 7 files changed, 585 insertions(+), 1 deletion(-) create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java create mode 100644 tests/integration/src/test/resources/pulsar-k8s.xml diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml index 5443f7e4bf570..3e3f706e2833f 100644 --- a/.github/workflows/pulsar-ci.yaml +++ b/.github/workflows/pulsar-ci.yaml @@ -683,6 +683,9 @@ jobs: - name: Upgrade group: UPGRADE + - name: Kubernetes + group: PULSAR_K8S + steps: - name: checkout uses: actions/checkout@v4 diff --git a/build/build_java_test_image.sh b/build/build_java_test_image.sh index 51257c992dc87..bab7afd7d878b 100755 --- a/build/build_java_test_image.sh +++ b/build/build_java_test_image.sh @@ -21,5 +21,5 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" cd "$SCRIPT_DIR/.." mvn -am -pl tests/docker-images/java-test-image -Pcore-modules,-main,integrationTests,docker \ - -Dmaven.test.skip=true -DskipSourceReleaseAssembly=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Dlicense.skip=true \ + -DskipTests -DskipSourceReleaseAssembly=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Dlicense.skip=true \ "$@" install \ No newline at end of file diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh index 40483b84f2b21..419fe5c940112 100755 --- a/build/run_integration_group.sh +++ b/build/run_integration_group.sh @@ -207,6 +207,10 @@ test_group_sql() { mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-sql.xml -DintegrationTests -DtestForkCount=1 -DtestReuseFork=false } +test_group_pulsar_k8s() { + mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-k8s.xml -DintegrationTests +} + test_group_pulsar_io() { mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -DintegrationTests -Dgroups=source mvn_run_integration_test --skip-build-deps "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -DintegrationTests -Dgroups=sink diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index fdad69444bb90..3f3c07d70c0d7 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -247,6 +247,69 @@ test + + org.testcontainers + k3s + test + + + + io.kubernetes + client-java + ${kubernetesclient.version} + test + + + io.prometheus + simpleclient_httpserver + + + bcpkix-jdk18on + org.bouncycastle + + + bcutil-jdk18on + org.bouncycastle + + + bcprov-jdk18on + org.bouncycastle + + + javax.annotation + javax.annotation-api + + + + + io.kubernetes + client-java-api-fluent + ${kubernetesclient.version} + test + + + io.prometheus + simpleclient_httpserver + + + bcpkix-jdk18on + org.bouncycastle + + + bcutil-jdk18on + org.bouncycastle + + + bcprov-jdk18on + org.bouncycastle + + + javax.annotation + javax.annotation-api + + + + diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java new file mode 100644 index 0000000000000..82edaf7739bf8 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.functions.k8s; + +import static org.assertj.core.api.Assertions.assertThat; +import io.kubernetes.client.Exec; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1Secret; +import io.kubernetes.client.openapi.models.V1SecretBuilder; +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.policies.data.FunctionStatus; +import org.apache.pulsar.tests.integration.functions.java.PulsarFunctionsJavaTest; +import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator; +import org.apache.pulsar.tests.integration.k8s.AbstractPulsarStandaloneK8STest; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +/** + * This class is an integration test for Pulsar Functions running in Kubernetes. + * This tests {@link org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory}, + * {@link org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime}, + * and {@link org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator} classes + * in a lightweight Kubernetes cluster which is provided by a k3s container running in Docker with Testcontainers. + */ +@Slf4j +public class PulsarFunctionsK8STest extends AbstractPulsarStandaloneK8STest { + @Test + public void testCreateFunctionInK8sWithSecrets() + throws PulsarAdminException, IOException, InterruptedException, ApiException { + @Cleanup + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getPulsarWebServiceUrl()) + .build(); + + String randomPart = UUID.randomUUID().toString(); + String inputTopicName = + "persistent://public/default/test-function-input-" + randomPart; + String outputTopicName = "persistent://public/default/test-function-output-" + randomPart; + + // create a sample secret + try { + CoreV1Api api = new CoreV1Api(getApiClient()); + V1Secret secret = new V1SecretBuilder() + .withNewMetadata() + .withName("mysecret") + .endMetadata() + .addToStringData("secret1", "value1") + .addToStringData("secret2", "value2").build(); + api.createNamespacedSecret(getNamespace(), secret).execute(); + } catch (ApiException e) { + // ignore if the secret already exists since this could happen due to retries + if (e.getCode() != 409) { + throw e; + } + } + + FunctionConfig functionConfig = new FunctionConfig(); + functionConfig.setTenant("public"); + functionConfig.setNamespace("default"); + functionConfig.setName("test-function"); + functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); + functionConfig.setClassName(PulsarFunctionsJavaTest.EXCLAMATION_JAVA_CLASS); + functionConfig.setJar(CommandGenerator.JAVAJAR); + functionConfig.setInputs(List.of(inputTopicName)); + functionConfig.setOutput(outputTopicName); + // test referencing k8s secrets + functionConfig.setSecrets( + Map.of("secret1", Map.of( + "path", "mysecret", + "key", "secret1"), + "secret2", Map.of( + "path", "mysecret", + "key", "secret2"))); + + log.info("Creating function"); + admin.functions().createFunctionWithUrl(functionConfig, "file://" + CommandGenerator.JAVAJAR); + + log.info("Waiting for function to be created"); + Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> { + FunctionStatus functionStatus = admin.functions().getFunctionStatus("public", "default", "test-function"); + assertThat(functionStatus.getNumInstances()).isEqualTo(1); + assertThat(functionStatus.getNumRunning()).isEqualTo(1); + }); + log.info("Function created successfully"); + + // Validate that k8s secrets were provided as environment variables to the function pod + String podName = "pf-public-default-test-function-0"; + Exec exec = new Exec(getApiClient()); + Process process = exec.newExecutionBuilder(getNamespace(), podName, new String[]{"sh", "-c", + "echo \"secret1=$secret1,secret2=$secret2\""}) + .setStdin(false).setStderr(false).setStdout(true).setTty(false).execute(); + String response = process.inputReader().readLine(); + assertThat(process.waitFor()).isEqualTo(0); + assertThat(response).isEqualTo("secret1=value1,secret2=value2"); + + log.info("Testing function"); + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(getPulsarBrokerUrl()) + .build(); + + @Cleanup + Consumer consumer = + client.newConsumer(Schema.STRING).topic(outputTopicName).subscriptionName("sub").subscribe(); + + @Cleanup + Producer producer = client.newProducer(Schema.STRING).topic(inputTopicName).create(); + producer.send("Hello"); + + Message message = consumer.receive(5, TimeUnit.SECONDS); + assertThat(message).isNotNull(); + assertThat(message.getValue()).isEqualTo("Hello!"); + + log.info("Waiting for function to be deleted"); + admin.functions().deleteFunction("public", "default", "test-function"); + log.info("Function deleted successfully"); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java new file mode 100644 index 0000000000000..82c667a99f5e8 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.k8s; + +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.command.ExecCreateCmdResponse; +import com.github.dockerjava.core.command.ExecStartResultCallback; +import io.kubernetes.client.custom.Quantity; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.apis.RbacAuthorizationV1Api; +import io.kubernetes.client.openapi.models.RbacV1SubjectBuilder; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.openapi.models.V1PodBuilder; +import io.kubernetes.client.openapi.models.V1PolicyRuleBuilder; +import io.kubernetes.client.openapi.models.V1Role; +import io.kubernetes.client.openapi.models.V1RoleBinding; +import io.kubernetes.client.openapi.models.V1RoleBindingBuilder; +import io.kubernetes.client.openapi.models.V1RoleBuilder; +import io.kubernetes.client.openapi.models.V1Service; +import io.kubernetes.client.openapi.models.V1ServiceAccount; +import io.kubernetes.client.openapi.models.V1ServiceAccountBuilder; +import io.kubernetes.client.openapi.models.V1ServiceBuilder; +import io.kubernetes.client.util.Config; +import io.kubernetes.client.util.KubeConfig; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringReader; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.time.Duration; +import java.util.LinkedHashMap; +import java.util.Map; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory; +import org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.k3s.K3sContainer; +import org.testcontainers.utility.DockerImageName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + +/** + * Abstract base class for integration tests that use Kubernetes (K8s) and Pulsar standalone. + * This class sets up a Kubernetes environment using the k3s lightweight Kubernetes container + * and deploys a standalone Pulsar instance within this Kubernetes cluster. + * It provides configuration and utility methods to allow test classes to interact + * with the deployed Pulsar instance and Kubernetes cluster. + * The main reason to use this base class is to test features in Pulsar which are integrated into Kubernetes + * APIs. + */ +@Slf4j +public abstract class AbstractPulsarStandaloneK8STest { + private static final String DEFAULT_IMAGE_NAME = System.getenv().getOrDefault("PULSAR_TEST_IMAGE_NAME", + "apachepulsar/java-test-image:latest"); + private static final int PULSAR_NODE_PORT = 30101; + private static final int PULSAR_HTTP_NODE_PORT = 30102; + private static final String K3S_IMAGE_NAME = "rancher/k3s:v1.33.5-k3s1"; + K3sContainer k3sContainer; + KubeConfig kubeConfig; + @Getter + ApiClient apiClient; + DockerClient dockerClient; + String dockerHostName; + @Getter + String pulsarBrokerUrl; + @Getter + String pulsarWebServiceUrl; + @Getter + File kubeConfigFile; + + @BeforeClass + public final void setupCluster() throws IOException, ApiException, InterruptedException { + k3sContainer = new K3sContainer(DockerImageName.parse(K3S_IMAGE_NAME)); + k3sContainer.addExposedPort(PULSAR_NODE_PORT); + k3sContainer.addExposedPort(PULSAR_HTTP_NODE_PORT); + k3sContainer.start(); + dockerHostName = k3sContainer.getHost(); + pulsarBrokerUrl = "pulsar://" + dockerHostName + ":" + k3sContainer.getMappedPort(PULSAR_NODE_PORT); + pulsarWebServiceUrl = "http://" + dockerHostName + ":" + k3sContainer.getMappedPort(PULSAR_HTTP_NODE_PORT); + dockerClient = k3sContainer.getDockerClient(); + String kubeConfigYaml = k3sContainer.getKubeConfigYaml(); + kubeConfig = KubeConfig.loadKubeConfig(new StringReader(kubeConfigYaml)); + apiClient = Config.fromConfig(kubeConfig); + kubeConfigFile = File.createTempFile("kubeconfig", ".yaml"); + Files.writeString(kubeConfigFile.toPath(), kubeConfigYaml); + log.info("Pulsar broker URL: {} http URL: {}", pulsarBrokerUrl, pulsarWebServiceUrl); + log.info("For debugging k8s, use KUBECONFIG={}", kubeConfigFile.getAbsolutePath()); + importPulsarImage(); + deployPulsarStandalonePod(); + log.info("Waiting for Pulsar cluster to be ready"); + Wait.forHttp("/admin/v2/tenants") + .forPort(PULSAR_HTTP_NODE_PORT) + // when Pulsar Functions are ready, there will also be a pulsar tenant created + .forResponsePredicate("[\"public\",\"pulsar\"]"::equals) + .withStartupTimeout(Duration.ofMinutes(5)) + .waitUntilReady(k3sContainer); + log.info("Pulsar cluster ready. Waiting 3 seconds before continuing for Functions Leader to be ready."); + Thread.sleep(3000); + } + + @AfterClass(alwaysRun = true) + public final void cleanupCluster() throws InterruptedException { + if (k3sContainer != null) { + k3sContainer.stop(); + } + } + + protected String getPulsarImageName() { + return DEFAULT_IMAGE_NAME; + } + + private void deployPulsarStandalonePod() throws ApiException { + createServiceAccountAndRoleAndRoleBinding(); + CoreV1Api coreApi = new CoreV1Api(apiClient); + String namespace = getNamespace(); + V1Pod pod = createPulsarPod(); + coreApi.createNamespacedPod(namespace, pod).execute(); + V1Service service = createPulsarService(); + coreApi.createNamespacedService(namespace, service).execute(); + } + + protected String getNamespace() { + return "default"; + } + + // This method imports the local image into the k3s container's containerd registry so that it can be used + // to create pods. image pull policy should be set to Never when using the image. + private void importPulsarImage() { + // Save local image from Docker + InputStream imageStream = dockerClient.saveImageCmd(getPulsarImageName()).exec(); + + // Create exec instance with stdin + String containerId = k3sContainer.getContainerId(); + + ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(containerId) + .withAttachStdin(true) + .withAttachStdout(true) + .withAttachStderr(true) + .withCmd("ctr", "images", "import", "/dev/stdin") + .exec(); + + // Start exec and stream data + try (InputStream is = imageStream) { + dockerClient.execStartCmd(execCreateCmdResponse.getId()) + .withStdIn(is) + .exec(new ExecStartResultCallback(System.out, System.err)) + .awaitCompletion(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void createServiceAccountAndRoleAndRoleBinding() throws ApiException { + CoreV1Api coreApi = new CoreV1Api(apiClient); + V1ServiceAccount serviceAccount = new V1ServiceAccountBuilder() + .withNewMetadata() + .withName("pulsar-standalone-sa") + .endMetadata() + .build(); + coreApi.createNamespacedServiceAccount(getNamespace(), serviceAccount).execute(); + V1Role role = new V1RoleBuilder() + .withNewMetadata() + .withName("pulsar-functions-role") + .endMetadata() + .withRules( + new V1PolicyRuleBuilder() + .addToApiGroups("") + .addToResources("services", "configmaps", "pods") + .addToVerbs("*").build(), + new V1PolicyRuleBuilder() + .addToApiGroups("apps") + .addToResources("statefulsets") + .addToVerbs("*").build() + ) + .build(); + RbacAuthorizationV1Api rbacApi = new RbacAuthorizationV1Api(apiClient); + rbacApi.createNamespacedRole(getNamespace(), role).execute(); + V1RoleBinding roleBinding = new V1RoleBindingBuilder() + .withNewMetadata() + .withName("pulsar-functions-role-binding") + .endMetadata() + .withNewRoleRef() + .withKind("Role") + .withName("pulsar-functions-role") + .endRoleRef() + .withSubjects(new RbacV1SubjectBuilder() + .withKind("ServiceAccount") + .withName("pulsar-standalone-sa") + .withNamespace(getNamespace()) + .build()) + .build(); + rbacApi.createNamespacedRoleBinding(getNamespace(), roleBinding).execute(); + } + + private V1Pod createPulsarPod() { + V1PodBuilder podBuilder = new V1PodBuilder(); + var containerBuilder = podBuilder + .withNewMetadata() + .withName("pulsar-standalone-pod") + .addToLabels("app", "pulsar") + .addToLabels("component", "standalone") + .endMetadata() + .withNewSpec() + .withServiceAccountName("pulsar-standalone-sa") + .addNewContainer(); + + // Container + containerBuilder + .withName("pulsar") + .withImage(getPulsarImageName()) + .withImagePullPolicy("Never") + .withCommand("sh", "-c", + "bin/apply-config-from-env.py conf/standalone.conf && " + + "bin/gen-yml-from-env.py conf/functions_worker.yml && " + + "bin/update-rocksdb-conf-from-env.py conf/entry_location_rocksdb.conf && " + + "bin/pulsar standalone") + // Ports + .addNewPort() + .withName("pulsar") + .withContainerPort(6650) + .withProtocol("TCP") + .endPort() + .addNewPort() + .withName("http") + .withContainerPort(8080) + .withProtocol("TCP") + .endPort() + .addNewPort() + .withName("pulsar-nodeport") + .withContainerPort(16650) + .withProtocol("TCP") + .endPort(); + + // Environment variables + for (Map.Entry env : getBrokerEnv().entrySet()) { + containerBuilder.addNewEnv() + .withName(env.getKey()) + .withValue(env.getValue()) + .endEnv(); + } + + // Resource limits + containerBuilder.withNewResources() + .withLimits(null) + .addToRequests("memory", new Quantity("128Mi")) + .addToRequests("cpu", new Quantity("100m")) + .endResources(); + + return containerBuilder.endContainer() + .withOverhead(null) + .endSpec().build(); + } + + protected Map getBrokerEnv() { + Map envVars = new LinkedHashMap<>() { + { + put("PULSAR_MEM", "-Xmx256m"); + put("PULSAR_GC", "-XX:+UseG1GC"); + put("bindAddresses", "nodeport:pulsar://0.0.0.0:16650"); + put("PULSAR_PREFIX_advertisedListeners", + "internal:pulsar://pulsar.default.svc.cluster.local:6650,nodeport:" + pulsarBrokerUrl); + // log to file since docker logs will get truncated + put("PULSAR_ROUTING_APPENDER_DEFAULT", "RollingFile"); + put("dbStorage_writeCacheMaxSizeMb", "32"); + put("dbStorage_readAheadCacheMaxSizeMb", "32"); + put("PF_functionsDirectory", "/pulsar/examples"); + put("PF_functionRuntimeFactoryClassName", KubernetesRuntimeFactory.class.getName()); + put("PF_secretsProviderConfiguratorClassName", KubernetesSecretsProviderConfigurator.class.getName()); + put("PF_functionRuntimeFactoryConfigs_pulsarDockerImageName", getPulsarImageName()); + put("PF_kubernetesContainerFactory_imagePullPolicy", "Never"); + put("PF_functionRuntimeFactoryConfigs_submittingInsidePod", "true"); + put("PF_functionRuntimeFactoryConfigs_installUserCodeDependencies", "true"); + put("PF_functionRuntimeFactoryConfigs_jobNamespace", getNamespace()); + put("PF_functionRuntimeFactoryConfigs_pulsarAdminUrl", "http://pulsar.default.svc.cluster.local"); + put("PF_functionRuntimeFactoryConfigs_pulsarServiceUrl", + "pulsar://pulsar.default.svc.cluster.local:6650"); + } + }; + return envVars; + } + + public V1Service createPulsarService() { + V1Service service = new V1ServiceBuilder() + .withNewMetadata() + .withName("pulsar") + .endMetadata() + .withNewSpec() + .withType("NodePort") + .withSelector(Map.of("app", "pulsar", "component", "standalone")) + // Ports + .addNewPort() + .withName("pulsar-nodeport") + .withNodePort(PULSAR_NODE_PORT) + .withPort(16650) + .withNewTargetPort(16650) + .withProtocol("TCP") + .endPort() + .addNewPort() + .withName("http") + .withPort(80) + .withNodePort(PULSAR_HTTP_NODE_PORT) + .withNewTargetPort(8080) + .withProtocol("TCP") + .endPort() + .addNewPort() + .withName("pulsar") + .withPort(6650) + .withNewTargetPort(6650) + .withProtocol("TCP") + .endPort() + .endSpec() + .build(); + return service; + } +} diff --git a/tests/integration/src/test/resources/pulsar-k8s.xml b/tests/integration/src/test/resources/pulsar-k8s.xml new file mode 100644 index 0000000000000..03f7f0c950324 --- /dev/null +++ b/tests/integration/src/test/resources/pulsar-k8s.xml @@ -0,0 +1,28 @@ + + + + + + + + + \ No newline at end of file From 1cd38fddfed1346f071ebe0cd4ca0568ee17eccd Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 19 Nov 2025 20:23:57 +0200 Subject: [PATCH 04/12] Update license file --- distribution/server/src/assemble/LICENSE.bin.txt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 8759c5be22106..1f4b2cd2f9ad9 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -267,7 +267,7 @@ The Apache Software License, Version 2.0 * Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar * Gson - com.google.code.gson-gson-2.8.9.jar - - io.gsonfire-gson-fire-1.8.5.jar + - io.gsonfire-gson-fire-1.9.0.jar * Guava - com.google.guava-guava-33.4.8-jre.jar - com.google.guava-failureaccess-1.0.3.jar @@ -472,9 +472,9 @@ The Apache Software License, Version 2.0 * Apache Yetus - org.apache.yetus-audience-annotations-0.12.0.jar * Kubernetes Client - - io.kubernetes-client-java-18.0.0.jar - - io.kubernetes-client-java-api-18.0.0.jar - - io.kubernetes-client-java-proto-18.0.0.jar + - io.kubernetes-client-java-23.0.0.jar + - io.kubernetes-client-java-api-23.0.0.jar + - io.kubernetes-client-java-proto-23.0.0.jar * Dropwizard - io.dropwizard.metrics-metrics-core-4.1.12.1.jar - io.dropwizard.metrics-metrics-graphite-4.1.12.1.jar From 863f7015e5434ecff146e7d4aadcfe7270dd8f91 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 20 Nov 2025 08:15:48 +0200 Subject: [PATCH 05/12] Fix test --- .../functions/runtime/kubernetes/KubernetesRuntimeTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index e3271e98e279c..2be701c832a8a 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH; import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; @@ -1394,6 +1395,8 @@ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exc when(appsApi.deleteNamespacedStatefulSet( argThat(hasTranslatedFunctionName), anyString())).thenReturn(request); + when(request.gracePeriodSeconds(anyInt())).thenReturn(request); + when(request.propagationPolicy(anyString())).thenReturn(request); when(request.execute()).thenReturn(new V1Status()); AppsV1Api.APIreadNamespacedStatefulSetRequest request2 = From 8283b463a9fd6cc811a76557085fbd1b7abb207c Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 20 Nov 2025 08:50:50 +0200 Subject: [PATCH 06/12] Test stop, start and validate delete --- .../functions/k8s/PulsarFunctionsK8STest.java | 48 ++++++++++++++++--- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java index 82edaf7739bf8..6738b24aac252 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java @@ -19,9 +19,12 @@ package org.apache.pulsar.tests.integration.functions.k8s; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.kubernetes.client.Exec; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1Pod; import io.kubernetes.client.openapi.models.V1Secret; import io.kubernetes.client.openapi.models.V1SecretBuilder; import java.io.IOException; @@ -86,9 +89,12 @@ public void testCreateFunctionInK8sWithSecrets() } FunctionConfig functionConfig = new FunctionConfig(); - functionConfig.setTenant("public"); - functionConfig.setNamespace("default"); - functionConfig.setName("test-function"); + String fnTenant = "public"; + String fnNamespace = "default"; + String fnName = "test-function"; + functionConfig.setTenant(fnTenant); + functionConfig.setNamespace(fnNamespace); + functionConfig.setName(fnName); functionConfig.setRuntime(FunctionConfig.Runtime.JAVA); functionConfig.setClassName(PulsarFunctionsJavaTest.EXCLAMATION_JAVA_CLASS); functionConfig.setJar(CommandGenerator.JAVAJAR); @@ -109,14 +115,14 @@ public void testCreateFunctionInK8sWithSecrets() log.info("Waiting for function to be created"); Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(30)) .untilAsserted(() -> { - FunctionStatus functionStatus = admin.functions().getFunctionStatus("public", "default", "test-function"); + FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName); assertThat(functionStatus.getNumInstances()).isEqualTo(1); assertThat(functionStatus.getNumRunning()).isEqualTo(1); }); log.info("Function created successfully"); // Validate that k8s secrets were provided as environment variables to the function pod - String podName = "pf-public-default-test-function-0"; + String podName = "pf-%s-%s-%s-0".formatted(fnTenant, fnNamespace, fnName); Exec exec = new Exec(getApiClient()); Process process = exec.newExecutionBuilder(getNamespace(), podName, new String[]{"sh", "-c", "echo \"secret1=$secret1,secret2=$secret2\""}) @@ -142,8 +148,38 @@ public void testCreateFunctionInK8sWithSecrets() assertThat(message).isNotNull(); assertThat(message.getValue()).isEqualTo("Hello!"); + + log.info("Stopping function"); + admin.functions().stopFunction(fnTenant, fnNamespace, fnName); + Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> { + FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName); + assertThat(functionStatus.getNumInstances()).isEqualTo(1); + assertThat(functionStatus.getNumRunning()).isEqualTo(0); + }); + + log.info("Starting function"); + admin.functions().startFunction(fnTenant, fnNamespace, fnName); + Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> { + FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName); + assertThat(functionStatus.getNumInstances()).isEqualTo(1); + assertThat(functionStatus.getNumRunning()).isEqualTo(1); + }); + log.info("Waiting for function to be deleted"); - admin.functions().deleteFunction("public", "default", "test-function"); + admin.functions().deleteFunction(fnTenant, fnNamespace, fnName); log.info("Function deleted successfully"); + + log.info("Waiting for function pod to be deleted"); + CoreV1Api coreApi = new CoreV1Api(getApiClient()); + Awaitility.await().pollDelay(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + assertThatExceptionOfType(ApiException.class).isThrownBy( + () -> coreApi.readNamespacedPod(getNamespace(), podName).execute()) + .satisfies(apiException -> { + assertThat(apiException.getCode()).isEqualTo(404); + }); + }); + log.info("Function pod deleted successfully"); } } From a05369a8a07ad261766bfbd18748a5c22011bcaa Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 20 Nov 2025 08:51:11 +0200 Subject: [PATCH 07/12] Collect logs from k8s pulsar-standalone-pod --- .../k8s/AbstractPulsarStandaloneK8STest.java | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java index 82c667a99f5e8..93dd9f5cc6432 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java @@ -21,6 +21,7 @@ import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.command.ExecCreateCmdResponse; import com.github.dockerjava.core.command.ExecStartResultCallback; +import io.kubernetes.client.Exec; import io.kubernetes.client.custom.Quantity; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiException; @@ -53,6 +54,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory; import org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator; +import org.apache.tools.tar.TarEntry; +import org.apache.tools.tar.TarInputStream; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.k3s.K3sContainer; import org.testcontainers.utility.DockerImageName; @@ -75,6 +78,7 @@ public abstract class AbstractPulsarStandaloneK8STest { private static final int PULSAR_NODE_PORT = 30101; private static final int PULSAR_HTTP_NODE_PORT = 30102; private static final String K3S_IMAGE_NAME = "rancher/k3s:v1.33.5-k3s1"; + private static final String PULSAR_STANDALONE_POD = "pulsar-standalone-pod"; K3sContainer k3sContainer; KubeConfig kubeConfig; @Getter @@ -121,10 +125,49 @@ public final void setupCluster() throws IOException, ApiException, InterruptedEx @AfterClass(alwaysRun = true) public final void cleanupCluster() throws InterruptedException { if (k3sContainer != null) { + copyLogsToTargetDirectory(); k3sContainer.stop(); } } + private void copyLogsToTargetDirectory() { + if (apiClient != null) { + Exec exec = new Exec(apiClient); + try { + File targetDirectoryForLogs = getTargetDirectoryForLogs(); + log.info("Copying logs from Pulsar standalone pod to target directory: {}", targetDirectoryForLogs); + Process process = exec.newExecutionBuilder(getNamespace(), PULSAR_STANDALONE_POD, + new String[]{"sh", "-c", "cd /pulsar/logs && tar cf - *"} + ).setTty(false).setStdin(false).setStderr(false).setStdout(true).execute(); + try (TarInputStream tarInputStream = new TarInputStream(process.getInputStream())) { + TarEntry tarEntry; + while ((tarEntry = tarInputStream.getNextEntry()) != null) { + if (tarEntry.isFile()) { + File targetFile = new File(targetDirectoryForLogs, new File(tarEntry.getName()).getName()); + Files.copy(tarInputStream, targetFile.toPath()); + } + } + } + } catch (Exception e) { + log.error("Error copying logs from Pulsar standalone pod to target directory.", e); + } + } + } + + private File getTargetDirectoryForLogs() { + String base = System.getProperty("maven.buildDirectory"); + if (base == null) { + base = "target"; + } + // use the container-logs directory since it's used in CI for integration tests as the file location + File directory = new File(new File(base, "container-logs"), + "k8s_" + getClass().getSimpleName() + "_" + System.currentTimeMillis()); + if (!directory.exists() && !directory.mkdirs()) { + log.error("Error creating directory for logs."); + } + return directory; + } + protected String getPulsarImageName() { return DEFAULT_IMAGE_NAME; } @@ -218,7 +261,7 @@ private V1Pod createPulsarPod() { V1PodBuilder podBuilder = new V1PodBuilder(); var containerBuilder = podBuilder .withNewMetadata() - .withName("pulsar-standalone-pod") + .withName(PULSAR_STANDALONE_POD) .addToLabels("app", "pulsar") .addToLabels("component", "standalone") .endMetadata() From 092f9179d9b5f92419b5058d3ccdef0d712f01e1 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 20 Nov 2025 09:06:00 +0200 Subject: [PATCH 08/12] Collect k8s events --- .../k8s/AbstractPulsarStandaloneK8STest.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java index 93dd9f5cc6432..398447720f206 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java @@ -27,6 +27,7 @@ import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.apis.CoreV1Api; import io.kubernetes.client.openapi.apis.RbacAuthorizationV1Api; +import io.kubernetes.client.openapi.models.CoreV1EventList; import io.kubernetes.client.openapi.models.RbacV1SubjectBuilder; import io.kubernetes.client.openapi.models.V1Pod; import io.kubernetes.client.openapi.models.V1PodBuilder; @@ -132,9 +133,9 @@ public final void cleanupCluster() throws InterruptedException { private void copyLogsToTargetDirectory() { if (apiClient != null) { + File targetDirectoryForLogs = getTargetDirectoryForLogs(); Exec exec = new Exec(apiClient); try { - File targetDirectoryForLogs = getTargetDirectoryForLogs(); log.info("Copying logs from Pulsar standalone pod to target directory: {}", targetDirectoryForLogs); Process process = exec.newExecutionBuilder(getNamespace(), PULSAR_STANDALONE_POD, new String[]{"sh", "-c", "cd /pulsar/logs && tar cf - *"} @@ -151,6 +152,16 @@ private void copyLogsToTargetDirectory() { } catch (Exception e) { log.error("Error copying logs from Pulsar standalone pod to target directory.", e); } + + CoreV1Api coreApi = new CoreV1Api(apiClient); + File eventsFile = new File(targetDirectoryForLogs, "k8s_events.json"); + try { + log.info("Copying events from Kubernetes cluster namespace {} to {}.", getNamespace(), eventsFile); + CoreV1EventList eventList = coreApi.listNamespacedEvent(getNamespace()).execute(); + Files.writeString(eventsFile.toPath(), eventList.toJson()); + } catch (Exception e) { + log.error("Error copying events from Kubernetes cluster to {}.", eventsFile, e); + } } } From 3971b7ee033d63c0a4f6c2f1c5567e382c9b2b79 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 20 Nov 2025 09:15:33 +0200 Subject: [PATCH 09/12] Improve image import method --- .../k8s/AbstractPulsarStandaloneK8STest.java | 61 ++++++++++++------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java index 398447720f206..af697e7538a5d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java @@ -19,8 +19,10 @@ package org.apache.pulsar.tests.integration.k8s; import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.async.ResultCallbackTemplate; import com.github.dockerjava.api.command.ExecCreateCmdResponse; -import com.github.dockerjava.core.command.ExecStartResultCallback; +import com.github.dockerjava.api.model.Frame; +import com.github.dockerjava.api.model.StreamType; import io.kubernetes.client.Exec; import io.kubernetes.client.custom.Quantity; import io.kubernetes.client.openapi.ApiClient; @@ -197,32 +199,49 @@ protected String getNamespace() { return "default"; } - // This method imports the local image into the k3s container's containerd registry so that it can be used - // to create pods. image pull policy should be set to Never when using the image. - private void importPulsarImage() { - // Save local image from Docker - InputStream imageStream = dockerClient.saveImageCmd(getPulsarImageName()).exec(); - - // Create exec instance with stdin - String containerId = k3sContainer.getContainerId(); - - ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(containerId) - .withAttachStdin(true) - .withAttachStdout(true) - .withAttachStderr(true) - .withCmd("ctr", "images", "import", "/dev/stdin") - .exec(); + private void importPulsarImage() throws InterruptedException { + importImageToK3S(getPulsarImageName()); + } - // Start exec and stream data - try (InputStream is = imageStream) { + /** + * Imports a local Docker image into the k3s container's containerd registry. + * This method allows the image to be used by Kubernetes pods within the k3s cluster. + * When using this imported image in pod specs, the imagePullPolicy should be set to "Never" + * since the image is already available locally in the cluster. + * @param imageName The name of the Docker image to import + * @throws UncheckedIOException if there is an error during image import + * @throws InterruptedException if the import is interrupted + */ + protected void importImageToK3S(String imageName) throws InterruptedException { + // Stream the saved image directly to "ctr images import -" inside the k3s container + try (InputStream is = dockerClient.saveImageCmd(imageName).exec()) { + String[] importCommand = {"ctr", "images", "import", "-"}; + ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(k3sContainer.getContainerId()) + .withAttachStdin(true) + .withAttachStdout(true) + .withAttachStderr(true) + .withTty(false) + .withCmd(importCommand) + .exec(); dockerClient.execStartCmd(execCreateCmdResponse.getId()) .withStdIn(is) - .exec(new ExecStartResultCallback(System.out, System.err)) + .withTty(false) + .withDetach(false) + .exec(new ResultCallbackTemplate<>() { + @Override + public void onNext(Frame frame) { + String logMessage = + String.join(" ", importCommand) + ": " + new String(frame.getPayload()).trim(); + if (frame.getStreamType() == StreamType.STDERR) { + log.error(logMessage); + } else { + log.info(logMessage); + } + } + }) .awaitCompletion(); } catch (IOException e) { throw new UncheckedIOException(e); - } catch (InterruptedException e) { - throw new RuntimeException(e); } } From c942a99d42db1972a2ba09e4e150ef8a75ab49a2 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 20 Nov 2025 09:46:00 +0200 Subject: [PATCH 10/12] Delete kube config file after k3s container is stopped --- .../tests/integration/k8s/AbstractPulsarStandaloneK8STest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java index af697e7538a5d..1c4f2503abcb3 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java @@ -130,6 +130,7 @@ public final void cleanupCluster() throws InterruptedException { if (k3sContainer != null) { copyLogsToTargetDirectory(); k3sContainer.stop(); + kubeConfigFile.delete(); } } From 2b3bdfea9d9bfd7b327fe02214dd0609efe5d84b Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 20 Nov 2025 09:46:54 +0200 Subject: [PATCH 11/12] Fix checkstyle --- .../tests/integration/functions/k8s/PulsarFunctionsK8STest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java index 6738b24aac252..d6122fb7528e6 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java @@ -20,11 +20,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.kubernetes.client.Exec; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.apis.CoreV1Api; -import io.kubernetes.client.openapi.models.V1Pod; import io.kubernetes.client.openapi.models.V1Secret; import io.kubernetes.client.openapi.models.V1SecretBuilder; import java.io.IOException; From 8b11b1b77575723308043d9c43043b016a494f18 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 20 Nov 2025 09:58:55 +0200 Subject: [PATCH 12/12] Reduce pollDelay and add comment about flakiness --- .../functions/k8s/PulsarFunctionsK8STest.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java index d6122fb7528e6..4671c82e4eae1 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java @@ -111,7 +111,7 @@ public void testCreateFunctionInK8sWithSecrets() admin.functions().createFunctionWithUrl(functionConfig, "file://" + CommandGenerator.JAVAJAR); log.info("Waiting for function to be created"); - Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(30)) + Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(30)) .untilAsserted(() -> { FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName); assertThat(functionStatus.getNumInstances()).isEqualTo(1); @@ -149,7 +149,7 @@ public void testCreateFunctionInK8sWithSecrets() log.info("Stopping function"); admin.functions().stopFunction(fnTenant, fnNamespace, fnName); - Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(30)) + Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(30)) .untilAsserted(() -> { FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName); assertThat(functionStatus.getNumInstances()).isEqualTo(1); @@ -157,8 +157,12 @@ public void testCreateFunctionInK8sWithSecrets() }); log.info("Starting function"); + // this seems to be flaky if the stopping of the function hasn't fully completed when it's started again. + // one way to reproduce is to remove the delay before starting the function and also removing the pollDelay + // from the await after stopFunction + Thread.sleep(2000); admin.functions().startFunction(fnTenant, fnNamespace, fnName); - Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(30)) + Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(30)) .untilAsserted(() -> { FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName); assertThat(functionStatus.getNumInstances()).isEqualTo(1); @@ -171,7 +175,7 @@ public void testCreateFunctionInK8sWithSecrets() log.info("Waiting for function pod to be deleted"); CoreV1Api coreApi = new CoreV1Api(getApiClient()); - Awaitility.await().pollDelay(Duration.ofSeconds(2)).atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + Awaitility.await().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(30)).untilAsserted(() -> { assertThatExceptionOfType(ApiException.class).isThrownBy( () -> coreApi.readNamespacedPod(getNamespace(), podName).execute()) .satisfies(apiException -> {