diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index 5443f7e4bf570..3e3f706e2833f 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -683,6 +683,9 @@ jobs:
- name: Upgrade
group: UPGRADE
+ - name: Kubernetes
+ group: PULSAR_K8S
+
steps:
- name: checkout
uses: actions/checkout@v4
diff --git a/build/build_java_test_image.sh b/build/build_java_test_image.sh
index 51257c992dc87..bab7afd7d878b 100755
--- a/build/build_java_test_image.sh
+++ b/build/build_java_test_image.sh
@@ -21,5 +21,5 @@
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd "$SCRIPT_DIR/.."
mvn -am -pl tests/docker-images/java-test-image -Pcore-modules,-main,integrationTests,docker \
- -Dmaven.test.skip=true -DskipSourceReleaseAssembly=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Dlicense.skip=true \
+ -DskipTests -DskipSourceReleaseAssembly=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Dlicense.skip=true \
"$@" install
\ No newline at end of file
diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh
index 40483b84f2b21..419fe5c940112 100755
--- a/build/run_integration_group.sh
+++ b/build/run_integration_group.sh
@@ -207,6 +207,10 @@ test_group_sql() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-sql.xml -DintegrationTests -DtestForkCount=1 -DtestReuseFork=false
}
+test_group_pulsar_k8s() {
+ mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-k8s.xml -DintegrationTests
+}
+
test_group_pulsar_io() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -DintegrationTests -Dgroups=source
mvn_run_integration_test --skip-build-deps "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -DintegrationTests -Dgroups=sink
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 8759c5be22106..1f4b2cd2f9ad9 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -267,7 +267,7 @@ The Apache Software License, Version 2.0
* Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar
* Gson
- com.google.code.gson-gson-2.8.9.jar
- - io.gsonfire-gson-fire-1.8.5.jar
+ - io.gsonfire-gson-fire-1.9.0.jar
* Guava
- com.google.guava-guava-33.4.8-jre.jar
- com.google.guava-failureaccess-1.0.3.jar
@@ -472,9 +472,9 @@ The Apache Software License, Version 2.0
* Apache Yetus
- org.apache.yetus-audience-annotations-0.12.0.jar
* Kubernetes Client
- - io.kubernetes-client-java-18.0.0.jar
- - io.kubernetes-client-java-api-18.0.0.jar
- - io.kubernetes-client-java-proto-18.0.0.jar
+ - io.kubernetes-client-java-23.0.0.jar
+ - io.kubernetes-client-java-api-23.0.0.jar
+ - io.kubernetes-client-java-proto-23.0.0.jar
* Dropwizard
- io.dropwizard.metrics-metrics-core-4.1.12.1.jar
- io.dropwizard.metrics-metrics-graphite-4.1.12.1.jar
diff --git a/pom.xml b/pom.xml
index d9486cc281892..bc3757de74869 100644
--- a/pom.xml
+++ b/pom.xml
@@ -283,7 +283,7 @@ flexible messaging model and an intuitive client API.
2.3.3
2.0.2
5.12.1
- 18.0.0
+ 23.0.0
0.9.4
5.3.1
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
index 71070471d146e..44a17131d772a 100644
--- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
@@ -169,7 +169,7 @@ CompletableFuture getJwkFromKubernetesApiServer(String keyId) {
private CompletableFuture> getJwksFromKubernetesApiServer() {
CompletableFuture> future = new CompletableFuture<>();
try {
- openidApi.getServiceAccountIssuerOpenIDKeysetAsync(new ApiCallback() {
+ openidApi.getServiceAccountIssuerOpenIDKeyset().executeAsync(new ApiCallback() {
@Override
public void onFailure(ApiException e, int statusCode, Map> responseHeaders) {
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
index 66e71c5306f8d..2f715dbaf7f54 100644
--- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
@@ -168,7 +168,7 @@ CompletableFuture getOpenIDProviderMetadataForKubernetes
private CompletableFuture loadOpenIDProviderMetadataForKubernetesApiServer() {
CompletableFuture future = new CompletableFuture<>();
try {
- wellKnownApi.getServiceAccountIssuerOpenIDConfigurationAsync(new ApiCallback<>() {
+ wellKnownApi.getServiceAccountIssuerOpenIDConfiguration().executeAsync(new ApiCallback<>() {
@Override
public void onFailure(ApiException e, int statusCode, Map> responseHeaders) {
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
diff --git a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
index dd313cdb73a89..1eede7059a441 100644
--- a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
+++ b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
@@ -117,12 +117,10 @@ void beforeClass() throws IOException {
""".replace("%s", server.baseUrl()))));
// Set up a correct openid-configuration that the k8s integration test can use
- // NOTE: integration tests revealed that the k8s client adds a trailing slash to the openid-configuration
- // endpoint.
// NOTE: the jwks_uri is ignored, so we supply one that would fail here to ensure that we are not implicitly
// relying on the jwks_uri.
server.stubFor(
- get(urlEqualTo("/k8s/.well-known/openid-configuration/"))
+ get(urlEqualTo("/k8s/.well-known/openid-configuration"))
.willReturn(aResponse()
.withHeader("Content-Type", "application/json")
.withBody("""
@@ -170,7 +168,7 @@ void beforeClass() throws IOException {
// Set up JWKS endpoint with a valid and an invalid public key
// The url matches are for both the normal and the k8s endpoints
server.stubFor(
- get(urlMatching("/keys|/k8s/openid/v1/jwks/"))
+ get(urlMatching("/keys|/k8s/openid/v1/jwks"))
.willReturn(aResponse()
.withHeader("Content-Type", "application/json")
.withBody(
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
index 913171aca9bc8..4e300aad3b56a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
@@ -179,9 +179,10 @@ public void cleanUpAuthData(Function.FunctionDetails funcDetails, Optional {
try {
- coreClient.readNamespacedSecret(secretName, kubeNamespace, null);
-
+ coreClient.readNamespacedSecret(secretName, kubeNamespace).execute();
} catch (ApiException e) {
// statefulset is gone
if (e.getCode() == HTTP_NOT_FOUND) {
@@ -304,15 +304,12 @@ private void upsertSecret(String token, Function.FunctionDetails funcDetails, St
.data(buildSecretMap(token));
try {
- coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null, null, null, null);
+ coreClient.createNamespacedSecret(kubeNamespace, v1Secret).execute();
} catch (ApiException e) {
if (e.getCode() == HTTP_CONFLICT) {
try {
- coreClient
- .replaceNamespacedSecret(secretName, kubeNamespace, v1Secret,
- null, null, null, null);
+ coreClient.replaceNamespacedSecret(secretName, kubeNamespace, v1Secret).execute();
return Actions.ActionResult.builder().success(true).build();
-
} catch (ApiException e1) {
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return Actions.ActionResult.builder()
@@ -366,7 +363,7 @@ private String createSecret(String token, Function.FunctionDetails funcDetails)
.metadata(new V1ObjectMeta().name(getSecretName(id)))
.data(buildSecretMap(token));
try {
- coreClient.createNamespacedSecret(kubeNamespace, v1Secret, null, null, null, null);
+ coreClient.createNamespacedSecret(kubeNamespace, v1Secret).execute();
} catch (ApiException e) {
// already exists
if (e.getCode() == HTTP_CONFLICT) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index 7a69b822cbd89..80d1a7af89cd4 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -40,7 +40,6 @@
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1ContainerPort;
-import io.kubernetes.client.openapi.models.V1DeleteOptions;
import io.kubernetes.client.openapi.models.V1EnvVar;
import io.kubernetes.client.openapi.models.V1EnvVarSource;
import io.kubernetes.client.openapi.models.V1LabelSelector;
@@ -72,7 +71,6 @@
import java.util.regex.Pattern;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import okhttp3.Response;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
@@ -470,9 +468,8 @@ private void submitService() throws Exception {
.numRetries(KubernetesRuntimeFactory.numRetries)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.sleepBetweenRetriesMs)
.supplier(() -> {
- final V1Service response;
try {
- response = coreClient.createNamespacedService(jobNamespace, service, null, null, null, null);
+ coreClient.createNamespacedService(jobNamespace, service).execute();
} catch (ApiException e) {
// already exists
if (e.getCode() == HTTP_CONFLICT) {
@@ -495,7 +492,7 @@ private void submitService() throws Exception {
AtomicBoolean success = new AtomicBoolean(false);
Actions.newBuilder()
.addAction(createService.toBuilder()
- .onSuccess((ignored) -> success.set(true))
+ .onSuccess(ignored -> success.set(true))
.build())
.run();
@@ -559,10 +556,8 @@ private void submitStatefulSet() throws Exception {
.numRetries(KubernetesRuntimeFactory.numRetries)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.sleepBetweenRetriesMs)
.supplier(() -> {
- final V1StatefulSet response;
try {
- response = appsClient.createNamespacedStatefulSet(jobNamespace, statefulSet,
- null, null, null, null);
+ appsClient.createNamespacedStatefulSet(jobNamespace, statefulSet).execute();
} catch (ApiException e) {
// already exists
if (e.getCode() == HTTP_CONFLICT) {
@@ -585,7 +580,7 @@ private void submitStatefulSet() throws Exception {
AtomicBoolean success = new AtomicBoolean(false);
Actions.newBuilder()
.addAction(createStatefulSet.toBuilder()
- .onSuccess((ignored) -> success.set(true))
+ .onSuccess(ignored -> success.set(true))
.build())
.run();
@@ -597,9 +592,6 @@ private void submitStatefulSet() throws Exception {
public void deleteStatefulSet() throws InterruptedException {
String statefulSetName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
- final V1DeleteOptions options = new V1DeleteOptions();
- options.setGracePeriodSeconds((long) gracePeriodSeconds);
- options.setPropagationPolicy("Foreground");
String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
Actions.Action deleteStatefulSet = Actions.Action.builder()
@@ -607,15 +599,12 @@ public void deleteStatefulSet() throws InterruptedException {
.numRetries(KubernetesRuntimeFactory.numRetries)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.sleepBetweenRetriesMs)
.supplier(() -> {
- Response response;
try {
- // cannot use deleteNamespacedStatefulSet because of bug in kuberenetes
- // https://github.com/kubernetes-client/java/issues/86
- response = appsClient.deleteNamespacedStatefulSetCall(
+ appsClient.deleteNamespacedStatefulSet(
statefulSetName,
- jobNamespace, null, null,
- gracePeriodSeconds, null, "Foreground",
- options, null)
+ jobNamespace)
+ .gracePeriodSeconds(gracePeriodSeconds)
+ .propagationPolicy("Foreground")
.execute();
} catch (ApiException e) {
// if already deleted
@@ -623,29 +612,15 @@ public void deleteStatefulSet() throws InterruptedException {
log.warn("Statefulset for function {} does not exist", fqfn);
return Actions.ActionResult.builder().success(true).build();
}
-
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return Actions.ActionResult.builder()
.success(false)
.errorMsg(errorMsg)
.build();
- } catch (IOException e) {
- return Actions.ActionResult.builder()
- .success(false)
- .errorMsg(e.getMessage())
- .build();
- }
-
- // if already deleted
- if (response.code() == HTTP_NOT_FOUND) {
- log.warn("Statefulset for function {} does not exist", fqfn);
- return Actions.ActionResult.builder().success(true).build();
- } else {
- return Actions.ActionResult.builder()
- .success(response.isSuccessful())
- .errorMsg(response.message())
- .build();
}
+ return Actions.ActionResult.builder()
+ .success(true)
+ .build();
})
.build();
@@ -658,7 +633,7 @@ public void deleteStatefulSet() throws InterruptedException {
.supplier(() -> {
V1StatefulSet response;
try {
- response = appsClient.readNamespacedStatefulSet(statefulSetName, jobNamespace, null);
+ response = appsClient.readNamespacedStatefulSet(statefulSetName, jobNamespace).execute();
} catch (ApiException e) {
// statefulset is gone
if (e.getCode() == HTTP_NOT_FOUND) {
@@ -692,11 +667,8 @@ public void deleteStatefulSet() throws InterruptedException {
V1PodList response;
try {
- response = coreClient.listNamespacedPod(jobNamespace, null, null,
- null, null, labels,
- null, null, null, null, null);
+ response = coreClient.listNamespacedPod(jobNamespace).labelSelector(labels).execute();
} catch (ApiException e) {
-
String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
return Actions.ActionResult.builder()
.success(false)
@@ -725,13 +697,13 @@ public void deleteStatefulSet() throws InterruptedException {
.build())
.addAction(waitForStatefulSetDeletion.toBuilder()
.continueOn(false)
- .onSuccess((ignored) -> success.set(true))
+ .onSuccess(ignored -> success.set(true))
.build())
.addAction(deleteStatefulSet.toBuilder()
.continueOn(true)
.build())
.addAction(waitForStatefulSetDeletion.toBuilder()
- .onSuccess((ignored) -> success.set(true))
+ .onSuccess(ignored -> success.set(true))
.build())
.run();
@@ -746,10 +718,6 @@ public void deleteStatefulSet() throws InterruptedException {
}
public void deleteService() throws InterruptedException {
-
- final V1DeleteOptions options = new V1DeleteOptions();
- options.setGracePeriodSeconds(0L);
- options.setPropagationPolicy("Foreground");
String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
String serviceName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
@@ -758,15 +726,11 @@ public void deleteService() throws InterruptedException {
.numRetries(KubernetesRuntimeFactory.numRetries)
.sleepBetweenInvocationsMs(KubernetesRuntimeFactory.sleepBetweenRetriesMs)
.supplier(() -> {
- final Response response;
try {
- // cannot use deleteNamespacedService because of bug in kuberenetes
- // https://github.com/kubernetes-client/java/issues/86
- response = coreClient.deleteNamespacedServiceCall(
- serviceName,
- jobNamespace, null, null,
- 0, null,
- "Foreground", options, null).execute();
+ coreClient.deleteNamespacedService(serviceName, jobNamespace)
+ .gracePeriodSeconds(0)
+ .propagationPolicy("Foreground")
+ .execute();
} catch (ApiException e) {
// if already deleted
if (e.getCode() == HTTP_NOT_FOUND) {
@@ -779,23 +743,9 @@ public void deleteService() throws InterruptedException {
.success(false)
.errorMsg(errorMsg)
.build();
- } catch (IOException e) {
- return Actions.ActionResult.builder()
- .success(false)
- .errorMsg(e.getMessage())
- .build();
}
- // if already deleted
- if (response.code() == HTTP_NOT_FOUND) {
- log.warn("Service for function {} does not exist", fqfn);
- return Actions.ActionResult.builder().success(true).build();
- } else {
- return Actions.ActionResult.builder()
- .success(response.isSuccessful())
- .errorMsg(response.message())
- .build();
- }
+ return Actions.ActionResult.builder().success(true).build();
})
.build();
@@ -806,7 +756,7 @@ public void deleteService() throws InterruptedException {
.supplier(() -> {
V1Service response;
try {
- response = coreClient.readNamespacedService(serviceName, jobNamespace, null);
+ response = coreClient.readNamespacedService(serviceName, jobNamespace).execute();
} catch (ApiException e) {
// service is gone
@@ -833,13 +783,13 @@ public void deleteService() throws InterruptedException {
.build())
.addAction(waitForServiceDeletion.toBuilder()
.continueOn(false)
- .onSuccess((ignored) -> success.set(true))
+ .onSuccess(ignored -> success.set(true))
.build())
.addAction(deleteService.toBuilder()
.continueOn(true)
.build())
.addAction(waitForServiceDeletion.toBuilder()
- .onSuccess((ignored) -> success.set(true))
+ .onSuccess(ignored -> success.set(true))
.build())
.run();
@@ -971,7 +921,7 @@ V1StatefulSet createStatefulSet() {
// let the customizer run but ensure it doesn't change the name so we can find it again
final V1StatefulSet overridden = manifestCustomizer
- .map((customizer) -> customizer.customizeStatefulSet(instanceConfig.getFunctionDetails(), statefulSet))
+ .map(customizer -> customizer.customizeStatefulSet(instanceConfig.getFunctionDetails(), statefulSet))
.orElse(statefulSet);
overridden.getMetadata().name(jobName);
@@ -1039,7 +989,7 @@ private V1PodSpec getPodSpec(List instanceCommand, Function.Resources re
return podSpec;
}
- private List getTolerations() {
+ private static List getTolerations() {
final List tolerations = new ArrayList<>();
TOLERATIONS.forEach(t -> {
final V1Toleration toleration =
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index bbb6e3992a018..3f0a2ae9cc351 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -410,7 +410,7 @@ static void fetchConfigMap(CoreV1Api coreClient, String changeConfigMap,
KubernetesRuntimeFactory kubernetesRuntimeFactory) {
try {
V1ConfigMap v1ConfigMap =
- coreClient.readNamespacedConfigMap(changeConfigMap, changeConfigMapNamespace, null);
+ coreClient.readNamespacedConfigMap(changeConfigMap, changeConfigMapNamespace).execute();
Map data = v1ConfigMap.getData();
if (data != null) {
overRideKubernetesConfig(data, kubernetesRuntimeFactory);
@@ -545,12 +545,12 @@ public Optional getRuntimeCustomizer() {
private String getOverriddenNamespace(Function.FunctionDetails funcDetails) {
Optional manifestCustomizer = getRuntimeCustomizer();
- return manifestCustomizer.map((customizer) -> customizer.customizeNamespace(funcDetails, jobNamespace))
+ return manifestCustomizer.map(customizer -> customizer.customizeNamespace(funcDetails, jobNamespace))
.orElse(jobNamespace);
}
private String getOverriddenName(Function.FunctionDetails funcDetails) {
Optional manifestCustomizer = getRuntimeCustomizer();
- return manifestCustomizer.map((customizer) -> customizer.customizeName(funcDetails, jobName)).orElse(jobName);
+ return manifestCustomizer.map(customizer -> customizer.customizeName(funcDetails, jobName)).orElse(jobName);
}
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
index be5e2cdc481c5..f6c6fc4c7bd8c 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
@@ -22,6 +22,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Container;
@@ -112,8 +113,9 @@ public void testConfigureAuthDataStatefulSetNoCa() {
@Test
public void testCacheAuthData() throws ApiException {
CoreV1Api coreV1Api = mock(CoreV1Api.class);
- doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(),
- any(), anyString(), anyString(), anyString(), anyString());
+ CoreV1Api.APIcreateNamespacedSecretRequest request = mock(CoreV1Api.APIcreateNamespacedSecretRequest.class);
+ doReturn(request).when(coreV1Api).createNamespacedSecret(anyString(), any());
+ doReturn(new V1Secret()).when(request).execute();
KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider =
new KubernetesSecretsTokenAuthProvider();
kubernetesSecretsTokenAuthProvider.initialize(coreV1Api, null, (fd) -> "default");
@@ -180,6 +182,11 @@ public void testUpdateAuthData() throws Exception {
Optional existingFunctionAuthData = Optional.empty();
Function.FunctionDetails funcDetails = Function.FunctionDetails.newBuilder().setTenant("test-tenant")
.setNamespace("test-ns").setName("test-func").build();
+
+ CoreV1Api.APIcreateNamespacedSecretRequest namespacedSecretRequest = mock();
+ when(coreV1Api.createNamespacedSecret(anyString(), any())).thenReturn(namespacedSecretRequest);
+ when(namespacedSecretRequest.execute()).thenReturn(new V1Secret());
+
Optional functionAuthData = kubernetesSecretsTokenAuthProvider.updateAuthData(funcDetails,
existingFunctionAuthData, new AuthenticationDataSource() {
@Override
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index 08afca93cc46a..659398ed48250 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -21,6 +21,8 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
@@ -512,11 +514,13 @@ public void testDynamicConfigMapLoading() throws Exception {
KubernetesRuntimeFactory kubernetesRuntimeFactory = getKuberentesRuntimeFactory();
CoreV1Api coreV1Api = Mockito.mock(CoreV1Api.class);
V1ConfigMap v1ConfigMap = new V1ConfigMap();
- Mockito.doReturn(v1ConfigMap).when(coreV1Api).readNamespacedConfigMap(any(), any(), any());
+ CoreV1Api.APIreadNamespacedConfigMapRequest request = mock(CoreV1Api.APIreadNamespacedConfigMapRequest.class);
+ Mockito.doReturn(request).when(coreV1Api).readNamespacedConfigMap(any(), any());
+ doReturn(v1ConfigMap).when(request).execute();
KubernetesRuntimeFactory.fetchConfigMap(coreV1Api, changeConfigMap,
changeConfigNamespace, kubernetesRuntimeFactory);
Mockito.verify(coreV1Api, Mockito.times(1)).readNamespacedConfigMap(
- eq(changeConfigMap), eq(changeConfigNamespace), eq(null));
+ eq(changeConfigMap), eq(changeConfigNamespace));
KubernetesRuntimeFactory expected = getKuberentesRuntimeFactory();
assertEquals(kubernetesRuntimeFactory, expected);
@@ -527,7 +531,7 @@ public void testDynamicConfigMapLoading() throws Exception {
KubernetesRuntimeFactory.fetchConfigMap(coreV1Api, changeConfigMap,
changeConfigNamespace, kubernetesRuntimeFactory);
Mockito.verify(coreV1Api, Mockito.times(2)).readNamespacedConfigMap(
- eq(changeConfigMap), eq(changeConfigNamespace), eq(null));
+ eq(changeConfigMap), eq(changeConfigNamespace));
assertEquals(kubernetesRuntimeFactory.getPulsarDockerImageName(), "test_dockerImage2");
assertEquals(kubernetesRuntimeFactory.getImagePullPolicy(), "test_imagePullPolicy2");
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index f8069efe299cb..2be701c832a8a 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -21,12 +21,10 @@
import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -53,6 +51,7 @@
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1Status;
import io.kubernetes.client.openapi.models.V1Toleration;
import java.lang.reflect.Type;
import java.math.BigDecimal;
@@ -64,8 +63,6 @@
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import okhttp3.Call;
-import okhttp3.Response;
import org.apache.commons.lang3.JavaVersion;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
@@ -1385,13 +1382,6 @@ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exc
CoreV1Api coreApi = mock(CoreV1Api.class);
AppsV1Api appsApi = mock(AppsV1Api.class);
- Call successfulCall = mock(Call.class);
- Response okResponse = mock(Response.class);
- when(okResponse.code()).thenReturn(HttpURLConnection.HTTP_OK);
- when(okResponse.isSuccessful()).thenReturn(true);
- when(okResponse.message()).thenReturn("");
- when(successfulCall.execute()).thenReturn(okResponse);
-
final String expectedFunctionNamePrefix = String.format("pf-%s-%s-%s", "c-tenant", "c-ns", "c-fn");
factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
@@ -1400,33 +1390,43 @@ public void testDeleteStatefulSetWithTranslatedKubernetesLabelChars() throws Exc
ArgumentMatcher hasTranslatedFunctionName = (String t) -> t.startsWith(expectedFunctionNamePrefix);
- when(appsApi.deleteNamespacedStatefulSetCall(
+ AppsV1Api.APIdeleteNamespacedStatefulSetRequest request =
+ mock(AppsV1Api.APIdeleteNamespacedStatefulSetRequest.class);
+ when(appsApi.deleteNamespacedStatefulSet(
+ argThat(hasTranslatedFunctionName),
+ anyString())).thenReturn(request);
+ when(request.gracePeriodSeconds(anyInt())).thenReturn(request);
+ when(request.propagationPolicy(anyString())).thenReturn(request);
+ when(request.execute()).thenReturn(new V1Status());
+
+ AppsV1Api.APIreadNamespacedStatefulSetRequest request2 =
+ mock(AppsV1Api.APIreadNamespacedStatefulSetRequest.class);
+ when(appsApi.readNamespacedStatefulSet(
argThat(hasTranslatedFunctionName),
- anyString(), isNull(), isNull(), anyInt(), isNull(),
- anyString(), any(), isNull())).thenReturn(successfulCall);
+ anyString())).thenReturn(request2);
ApiException notFoundException = mock(ApiException.class);
when(notFoundException.getCode()).thenReturn(HttpURLConnection.HTTP_NOT_FOUND);
- when(appsApi.readNamespacedStatefulSet(
- argThat(hasTranslatedFunctionName), anyString(), isNull())).thenThrow(notFoundException);
+
+ when(request2.execute()).thenThrow(notFoundException);
V1PodList podList = mock(V1PodList.class);
when(podList.getItems()).thenReturn(Collections.emptyList());
String expectedLabels = String.format("tenant=%s,namespace=%s,name=%s", "c-tenant", "c-ns", "c-fn");
- when(coreApi.listNamespacedPod(anyString(), isNull(), isNull(),
- isNull(), isNull(),
- eq(expectedLabels), isNull(), isNull(), isNull(),
- isNull(), isNull())).thenReturn(podList);
+
+ CoreV1Api.APIlistNamespacedPodRequest listNamespacedPodRequest = mock();
+ when(coreApi.listNamespacedPod(anyString())).thenReturn(listNamespacedPodRequest);
+ when(listNamespacedPodRequest.labelSelector(anyString())).thenReturn(listNamespacedPodRequest);
+ when(listNamespacedPodRequest.execute()).thenReturn(podList);
+
KubernetesRuntime kr = factory.createContainer(config, "/test/code", "code.yml",
"/test/transforms", "transform.yml", Long.MIN_VALUE);
kr.deleteStatefulSet();
- verify(coreApi).listNamespacedPod(anyString(), isNull(), isNull(),
- isNull(), isNull(),
- eq(expectedLabels), isNull(), isNull(), isNull(),
- isNull(), isNull());
+ verify(coreApi).listNamespacedPod(anyString());
+ verify(listNamespacedPodRequest).labelSelector(eq(expectedLabels));
}
@Test
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index fdad69444bb90..3f3c07d70c0d7 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -247,6 +247,69 @@
test
+
+ org.testcontainers
+ k3s
+ test
+
+
+
+ io.kubernetes
+ client-java
+ ${kubernetesclient.version}
+ test
+
+
+ io.prometheus
+ simpleclient_httpserver
+
+
+ bcpkix-jdk18on
+ org.bouncycastle
+
+
+ bcutil-jdk18on
+ org.bouncycastle
+
+
+ bcprov-jdk18on
+ org.bouncycastle
+
+
+ javax.annotation
+ javax.annotation-api
+
+
+
+
+ io.kubernetes
+ client-java-api-fluent
+ ${kubernetesclient.version}
+ test
+
+
+ io.prometheus
+ simpleclient_httpserver
+
+
+ bcpkix-jdk18on
+ org.bouncycastle
+
+
+ bcutil-jdk18on
+ org.bouncycastle
+
+
+ bcprov-jdk18on
+ org.bouncycastle
+
+
+ javax.annotation
+ javax.annotation-api
+
+
+
+
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java
new file mode 100644
index 0000000000000..4671c82e4eae1
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/k8s/PulsarFunctionsK8STest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions.k8s;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import io.kubernetes.client.Exec;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1Secret;
+import io.kubernetes.client.openapi.models.V1SecretBuilder;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.tests.integration.functions.java.PulsarFunctionsJavaTest;
+import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
+import org.apache.pulsar.tests.integration.k8s.AbstractPulsarStandaloneK8STest;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * This class is an integration test for Pulsar Functions running in Kubernetes.
+ * This tests {@link org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory},
+ * {@link org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime},
+ * and {@link org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator} classes
+ * in a lightweight Kubernetes cluster which is provided by a k3s container running in Docker with Testcontainers.
+ */
+@Slf4j
+public class PulsarFunctionsK8STest extends AbstractPulsarStandaloneK8STest {
+ @Test
+ public void testCreateFunctionInK8sWithSecrets()
+ throws PulsarAdminException, IOException, InterruptedException, ApiException {
+ @Cleanup
+ PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getPulsarWebServiceUrl())
+ .build();
+
+ String randomPart = UUID.randomUUID().toString();
+ String inputTopicName =
+ "persistent://public/default/test-function-input-" + randomPart;
+ String outputTopicName = "persistent://public/default/test-function-output-" + randomPart;
+
+ // create a sample secret
+ try {
+ CoreV1Api api = new CoreV1Api(getApiClient());
+ V1Secret secret = new V1SecretBuilder()
+ .withNewMetadata()
+ .withName("mysecret")
+ .endMetadata()
+ .addToStringData("secret1", "value1")
+ .addToStringData("secret2", "value2").build();
+ api.createNamespacedSecret(getNamespace(), secret).execute();
+ } catch (ApiException e) {
+ // ignore if the secret already exists since this could happen due to retries
+ if (e.getCode() != 409) {
+ throw e;
+ }
+ }
+
+ FunctionConfig functionConfig = new FunctionConfig();
+ String fnTenant = "public";
+ String fnNamespace = "default";
+ String fnName = "test-function";
+ functionConfig.setTenant(fnTenant);
+ functionConfig.setNamespace(fnNamespace);
+ functionConfig.setName(fnName);
+ functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+ functionConfig.setClassName(PulsarFunctionsJavaTest.EXCLAMATION_JAVA_CLASS);
+ functionConfig.setJar(CommandGenerator.JAVAJAR);
+ functionConfig.setInputs(List.of(inputTopicName));
+ functionConfig.setOutput(outputTopicName);
+ // test referencing k8s secrets
+ functionConfig.setSecrets(
+ Map.of("secret1", Map.of(
+ "path", "mysecret",
+ "key", "secret1"),
+ "secret2", Map.of(
+ "path", "mysecret",
+ "key", "secret2")));
+
+ log.info("Creating function");
+ admin.functions().createFunctionWithUrl(functionConfig, "file://" + CommandGenerator.JAVAJAR);
+
+ log.info("Waiting for function to be created");
+ Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> {
+ FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName);
+ assertThat(functionStatus.getNumInstances()).isEqualTo(1);
+ assertThat(functionStatus.getNumRunning()).isEqualTo(1);
+ });
+ log.info("Function created successfully");
+
+ // Validate that k8s secrets were provided as environment variables to the function pod
+ String podName = "pf-%s-%s-%s-0".formatted(fnTenant, fnNamespace, fnName);
+ Exec exec = new Exec(getApiClient());
+ Process process = exec.newExecutionBuilder(getNamespace(), podName, new String[]{"sh", "-c",
+ "echo \"secret1=$secret1,secret2=$secret2\""})
+ .setStdin(false).setStderr(false).setStdout(true).setTty(false).execute();
+ String response = process.inputReader().readLine();
+ assertThat(process.waitFor()).isEqualTo(0);
+ assertThat(response).isEqualTo("secret1=value1,secret2=value2");
+
+ log.info("Testing function");
+ @Cleanup
+ PulsarClient client = PulsarClient.builder().serviceUrl(getPulsarBrokerUrl())
+ .build();
+
+ @Cleanup
+ Consumer consumer =
+ client.newConsumer(Schema.STRING).topic(outputTopicName).subscriptionName("sub").subscribe();
+
+ @Cleanup
+ Producer producer = client.newProducer(Schema.STRING).topic(inputTopicName).create();
+ producer.send("Hello");
+
+ Message message = consumer.receive(5, TimeUnit.SECONDS);
+ assertThat(message).isNotNull();
+ assertThat(message.getValue()).isEqualTo("Hello!");
+
+
+ log.info("Stopping function");
+ admin.functions().stopFunction(fnTenant, fnNamespace, fnName);
+ Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> {
+ FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName);
+ assertThat(functionStatus.getNumInstances()).isEqualTo(1);
+ assertThat(functionStatus.getNumRunning()).isEqualTo(0);
+ });
+
+ log.info("Starting function");
+ // this seems to be flaky if the stopping of the function hasn't fully completed when it's started again.
+ // one way to reproduce is to remove the delay before starting the function and also removing the pollDelay
+ // from the await after stopFunction
+ Thread.sleep(2000);
+ admin.functions().startFunction(fnTenant, fnNamespace, fnName);
+ Awaitility.await().ignoreExceptions().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(30))
+ .untilAsserted(() -> {
+ FunctionStatus functionStatus = admin.functions().getFunctionStatus(fnTenant, fnNamespace, fnName);
+ assertThat(functionStatus.getNumInstances()).isEqualTo(1);
+ assertThat(functionStatus.getNumRunning()).isEqualTo(1);
+ });
+
+ log.info("Waiting for function to be deleted");
+ admin.functions().deleteFunction(fnTenant, fnNamespace, fnName);
+ log.info("Function deleted successfully");
+
+ log.info("Waiting for function pod to be deleted");
+ CoreV1Api coreApi = new CoreV1Api(getApiClient());
+ Awaitility.await().pollDelay(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+ assertThatExceptionOfType(ApiException.class).isThrownBy(
+ () -> coreApi.readNamespacedPod(getNamespace(), podName).execute())
+ .satisfies(apiException -> {
+ assertThat(apiException.getCode()).isEqualTo(404);
+ });
+ });
+ log.info("Function pod deleted successfully");
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java
new file mode 100644
index 0000000000000..1c4f2503abcb3
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/k8s/AbstractPulsarStandaloneK8STest.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.k8s;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.async.ResultCallbackTemplate;
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
+import com.github.dockerjava.api.model.Frame;
+import com.github.dockerjava.api.model.StreamType;
+import io.kubernetes.client.Exec;
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.apis.RbacAuthorizationV1Api;
+import io.kubernetes.client.openapi.models.CoreV1EventList;
+import io.kubernetes.client.openapi.models.RbacV1SubjectBuilder;
+import io.kubernetes.client.openapi.models.V1Pod;
+import io.kubernetes.client.openapi.models.V1PodBuilder;
+import io.kubernetes.client.openapi.models.V1PolicyRuleBuilder;
+import io.kubernetes.client.openapi.models.V1Role;
+import io.kubernetes.client.openapi.models.V1RoleBinding;
+import io.kubernetes.client.openapi.models.V1RoleBindingBuilder;
+import io.kubernetes.client.openapi.models.V1RoleBuilder;
+import io.kubernetes.client.openapi.models.V1Service;
+import io.kubernetes.client.openapi.models.V1ServiceAccount;
+import io.kubernetes.client.openapi.models.V1ServiceAccountBuilder;
+import io.kubernetes.client.openapi.models.V1ServiceBuilder;
+import io.kubernetes.client.util.Config;
+import io.kubernetes.client.util.KubeConfig;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringReader;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator;
+import org.apache.tools.tar.TarEntry;
+import org.apache.tools.tar.TarInputStream;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.k3s.K3sContainer;
+import org.testcontainers.utility.DockerImageName;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+/**
+ * Abstract base class for integration tests that use Kubernetes (K8s) and Pulsar standalone.
+ * This class sets up a Kubernetes environment using the k3s lightweight Kubernetes container
+ * and deploys a standalone Pulsar instance within this Kubernetes cluster.
+ * It provides configuration and utility methods to allow test classes to interact
+ * with the deployed Pulsar instance and Kubernetes cluster.
+ * The main reason to use this base class is to test features in Pulsar which are integrated into Kubernetes
+ * APIs.
+ */
+@Slf4j
+public abstract class AbstractPulsarStandaloneK8STest {
+ private static final String DEFAULT_IMAGE_NAME = System.getenv().getOrDefault("PULSAR_TEST_IMAGE_NAME",
+ "apachepulsar/java-test-image:latest");
+ private static final int PULSAR_NODE_PORT = 30101;
+ private static final int PULSAR_HTTP_NODE_PORT = 30102;
+ private static final String K3S_IMAGE_NAME = "rancher/k3s:v1.33.5-k3s1";
+ private static final String PULSAR_STANDALONE_POD = "pulsar-standalone-pod";
+ K3sContainer k3sContainer;
+ KubeConfig kubeConfig;
+ @Getter
+ ApiClient apiClient;
+ DockerClient dockerClient;
+ String dockerHostName;
+ @Getter
+ String pulsarBrokerUrl;
+ @Getter
+ String pulsarWebServiceUrl;
+ @Getter
+ File kubeConfigFile;
+
+ @BeforeClass
+ public final void setupCluster() throws IOException, ApiException, InterruptedException {
+ k3sContainer = new K3sContainer(DockerImageName.parse(K3S_IMAGE_NAME));
+ k3sContainer.addExposedPort(PULSAR_NODE_PORT);
+ k3sContainer.addExposedPort(PULSAR_HTTP_NODE_PORT);
+ k3sContainer.start();
+ dockerHostName = k3sContainer.getHost();
+ pulsarBrokerUrl = "pulsar://" + dockerHostName + ":" + k3sContainer.getMappedPort(PULSAR_NODE_PORT);
+ pulsarWebServiceUrl = "http://" + dockerHostName + ":" + k3sContainer.getMappedPort(PULSAR_HTTP_NODE_PORT);
+ dockerClient = k3sContainer.getDockerClient();
+ String kubeConfigYaml = k3sContainer.getKubeConfigYaml();
+ kubeConfig = KubeConfig.loadKubeConfig(new StringReader(kubeConfigYaml));
+ apiClient = Config.fromConfig(kubeConfig);
+ kubeConfigFile = File.createTempFile("kubeconfig", ".yaml");
+ Files.writeString(kubeConfigFile.toPath(), kubeConfigYaml);
+ log.info("Pulsar broker URL: {} http URL: {}", pulsarBrokerUrl, pulsarWebServiceUrl);
+ log.info("For debugging k8s, use KUBECONFIG={}", kubeConfigFile.getAbsolutePath());
+ importPulsarImage();
+ deployPulsarStandalonePod();
+ log.info("Waiting for Pulsar cluster to be ready");
+ Wait.forHttp("/admin/v2/tenants")
+ .forPort(PULSAR_HTTP_NODE_PORT)
+ // when Pulsar Functions are ready, there will also be a pulsar tenant created
+ .forResponsePredicate("[\"public\",\"pulsar\"]"::equals)
+ .withStartupTimeout(Duration.ofMinutes(5))
+ .waitUntilReady(k3sContainer);
+ log.info("Pulsar cluster ready. Waiting 3 seconds before continuing for Functions Leader to be ready.");
+ Thread.sleep(3000);
+ }
+
+ @AfterClass(alwaysRun = true)
+ public final void cleanupCluster() throws InterruptedException {
+ if (k3sContainer != null) {
+ copyLogsToTargetDirectory();
+ k3sContainer.stop();
+ kubeConfigFile.delete();
+ }
+ }
+
+ private void copyLogsToTargetDirectory() {
+ if (apiClient != null) {
+ File targetDirectoryForLogs = getTargetDirectoryForLogs();
+ Exec exec = new Exec(apiClient);
+ try {
+ log.info("Copying logs from Pulsar standalone pod to target directory: {}", targetDirectoryForLogs);
+ Process process = exec.newExecutionBuilder(getNamespace(), PULSAR_STANDALONE_POD,
+ new String[]{"sh", "-c", "cd /pulsar/logs && tar cf - *"}
+ ).setTty(false).setStdin(false).setStderr(false).setStdout(true).execute();
+ try (TarInputStream tarInputStream = new TarInputStream(process.getInputStream())) {
+ TarEntry tarEntry;
+ while ((tarEntry = tarInputStream.getNextEntry()) != null) {
+ if (tarEntry.isFile()) {
+ File targetFile = new File(targetDirectoryForLogs, new File(tarEntry.getName()).getName());
+ Files.copy(tarInputStream, targetFile.toPath());
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Error copying logs from Pulsar standalone pod to target directory.", e);
+ }
+
+ CoreV1Api coreApi = new CoreV1Api(apiClient);
+ File eventsFile = new File(targetDirectoryForLogs, "k8s_events.json");
+ try {
+ log.info("Copying events from Kubernetes cluster namespace {} to {}.", getNamespace(), eventsFile);
+ CoreV1EventList eventList = coreApi.listNamespacedEvent(getNamespace()).execute();
+ Files.writeString(eventsFile.toPath(), eventList.toJson());
+ } catch (Exception e) {
+ log.error("Error copying events from Kubernetes cluster to {}.", eventsFile, e);
+ }
+ }
+ }
+
+ private File getTargetDirectoryForLogs() {
+ String base = System.getProperty("maven.buildDirectory");
+ if (base == null) {
+ base = "target";
+ }
+ // use the container-logs directory since it's used in CI for integration tests as the file location
+ File directory = new File(new File(base, "container-logs"),
+ "k8s_" + getClass().getSimpleName() + "_" + System.currentTimeMillis());
+ if (!directory.exists() && !directory.mkdirs()) {
+ log.error("Error creating directory for logs.");
+ }
+ return directory;
+ }
+
+ protected String getPulsarImageName() {
+ return DEFAULT_IMAGE_NAME;
+ }
+
+ private void deployPulsarStandalonePod() throws ApiException {
+ createServiceAccountAndRoleAndRoleBinding();
+ CoreV1Api coreApi = new CoreV1Api(apiClient);
+ String namespace = getNamespace();
+ V1Pod pod = createPulsarPod();
+ coreApi.createNamespacedPod(namespace, pod).execute();
+ V1Service service = createPulsarService();
+ coreApi.createNamespacedService(namespace, service).execute();
+ }
+
+ protected String getNamespace() {
+ return "default";
+ }
+
+ private void importPulsarImage() throws InterruptedException {
+ importImageToK3S(getPulsarImageName());
+ }
+
+ /**
+ * Imports a local Docker image into the k3s container's containerd registry.
+ * This method allows the image to be used by Kubernetes pods within the k3s cluster.
+ * When using this imported image in pod specs, the imagePullPolicy should be set to "Never"
+ * since the image is already available locally in the cluster.
+ * @param imageName The name of the Docker image to import
+ * @throws UncheckedIOException if there is an error during image import
+ * @throws InterruptedException if the import is interrupted
+ */
+ protected void importImageToK3S(String imageName) throws InterruptedException {
+ // Stream the saved image directly to "ctr images import -" inside the k3s container
+ try (InputStream is = dockerClient.saveImageCmd(imageName).exec()) {
+ String[] importCommand = {"ctr", "images", "import", "-"};
+ ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(k3sContainer.getContainerId())
+ .withAttachStdin(true)
+ .withAttachStdout(true)
+ .withAttachStderr(true)
+ .withTty(false)
+ .withCmd(importCommand)
+ .exec();
+ dockerClient.execStartCmd(execCreateCmdResponse.getId())
+ .withStdIn(is)
+ .withTty(false)
+ .withDetach(false)
+ .exec(new ResultCallbackTemplate<>() {
+ @Override
+ public void onNext(Frame frame) {
+ String logMessage =
+ String.join(" ", importCommand) + ": " + new String(frame.getPayload()).trim();
+ if (frame.getStreamType() == StreamType.STDERR) {
+ log.error(logMessage);
+ } else {
+ log.info(logMessage);
+ }
+ }
+ })
+ .awaitCompletion();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private void createServiceAccountAndRoleAndRoleBinding() throws ApiException {
+ CoreV1Api coreApi = new CoreV1Api(apiClient);
+ V1ServiceAccount serviceAccount = new V1ServiceAccountBuilder()
+ .withNewMetadata()
+ .withName("pulsar-standalone-sa")
+ .endMetadata()
+ .build();
+ coreApi.createNamespacedServiceAccount(getNamespace(), serviceAccount).execute();
+ V1Role role = new V1RoleBuilder()
+ .withNewMetadata()
+ .withName("pulsar-functions-role")
+ .endMetadata()
+ .withRules(
+ new V1PolicyRuleBuilder()
+ .addToApiGroups("")
+ .addToResources("services", "configmaps", "pods")
+ .addToVerbs("*").build(),
+ new V1PolicyRuleBuilder()
+ .addToApiGroups("apps")
+ .addToResources("statefulsets")
+ .addToVerbs("*").build()
+ )
+ .build();
+ RbacAuthorizationV1Api rbacApi = new RbacAuthorizationV1Api(apiClient);
+ rbacApi.createNamespacedRole(getNamespace(), role).execute();
+ V1RoleBinding roleBinding = new V1RoleBindingBuilder()
+ .withNewMetadata()
+ .withName("pulsar-functions-role-binding")
+ .endMetadata()
+ .withNewRoleRef()
+ .withKind("Role")
+ .withName("pulsar-functions-role")
+ .endRoleRef()
+ .withSubjects(new RbacV1SubjectBuilder()
+ .withKind("ServiceAccount")
+ .withName("pulsar-standalone-sa")
+ .withNamespace(getNamespace())
+ .build())
+ .build();
+ rbacApi.createNamespacedRoleBinding(getNamespace(), roleBinding).execute();
+ }
+
+ private V1Pod createPulsarPod() {
+ V1PodBuilder podBuilder = new V1PodBuilder();
+ var containerBuilder = podBuilder
+ .withNewMetadata()
+ .withName(PULSAR_STANDALONE_POD)
+ .addToLabels("app", "pulsar")
+ .addToLabels("component", "standalone")
+ .endMetadata()
+ .withNewSpec()
+ .withServiceAccountName("pulsar-standalone-sa")
+ .addNewContainer();
+
+ // Container
+ containerBuilder
+ .withName("pulsar")
+ .withImage(getPulsarImageName())
+ .withImagePullPolicy("Never")
+ .withCommand("sh", "-c",
+ "bin/apply-config-from-env.py conf/standalone.conf && "
+ + "bin/gen-yml-from-env.py conf/functions_worker.yml && "
+ + "bin/update-rocksdb-conf-from-env.py conf/entry_location_rocksdb.conf && "
+ + "bin/pulsar standalone")
+ // Ports
+ .addNewPort()
+ .withName("pulsar")
+ .withContainerPort(6650)
+ .withProtocol("TCP")
+ .endPort()
+ .addNewPort()
+ .withName("http")
+ .withContainerPort(8080)
+ .withProtocol("TCP")
+ .endPort()
+ .addNewPort()
+ .withName("pulsar-nodeport")
+ .withContainerPort(16650)
+ .withProtocol("TCP")
+ .endPort();
+
+ // Environment variables
+ for (Map.Entry env : getBrokerEnv().entrySet()) {
+ containerBuilder.addNewEnv()
+ .withName(env.getKey())
+ .withValue(env.getValue())
+ .endEnv();
+ }
+
+ // Resource limits
+ containerBuilder.withNewResources()
+ .withLimits(null)
+ .addToRequests("memory", new Quantity("128Mi"))
+ .addToRequests("cpu", new Quantity("100m"))
+ .endResources();
+
+ return containerBuilder.endContainer()
+ .withOverhead(null)
+ .endSpec().build();
+ }
+
+ protected Map getBrokerEnv() {
+ Map envVars = new LinkedHashMap<>() {
+ {
+ put("PULSAR_MEM", "-Xmx256m");
+ put("PULSAR_GC", "-XX:+UseG1GC");
+ put("bindAddresses", "nodeport:pulsar://0.0.0.0:16650");
+ put("PULSAR_PREFIX_advertisedListeners",
+ "internal:pulsar://pulsar.default.svc.cluster.local:6650,nodeport:" + pulsarBrokerUrl);
+ // log to file since docker logs will get truncated
+ put("PULSAR_ROUTING_APPENDER_DEFAULT", "RollingFile");
+ put("dbStorage_writeCacheMaxSizeMb", "32");
+ put("dbStorage_readAheadCacheMaxSizeMb", "32");
+ put("PF_functionsDirectory", "/pulsar/examples");
+ put("PF_functionRuntimeFactoryClassName", KubernetesRuntimeFactory.class.getName());
+ put("PF_secretsProviderConfiguratorClassName", KubernetesSecretsProviderConfigurator.class.getName());
+ put("PF_functionRuntimeFactoryConfigs_pulsarDockerImageName", getPulsarImageName());
+ put("PF_kubernetesContainerFactory_imagePullPolicy", "Never");
+ put("PF_functionRuntimeFactoryConfigs_submittingInsidePod", "true");
+ put("PF_functionRuntimeFactoryConfigs_installUserCodeDependencies", "true");
+ put("PF_functionRuntimeFactoryConfigs_jobNamespace", getNamespace());
+ put("PF_functionRuntimeFactoryConfigs_pulsarAdminUrl", "http://pulsar.default.svc.cluster.local");
+ put("PF_functionRuntimeFactoryConfigs_pulsarServiceUrl",
+ "pulsar://pulsar.default.svc.cluster.local:6650");
+ }
+ };
+ return envVars;
+ }
+
+ public V1Service createPulsarService() {
+ V1Service service = new V1ServiceBuilder()
+ .withNewMetadata()
+ .withName("pulsar")
+ .endMetadata()
+ .withNewSpec()
+ .withType("NodePort")
+ .withSelector(Map.of("app", "pulsar", "component", "standalone"))
+ // Ports
+ .addNewPort()
+ .withName("pulsar-nodeport")
+ .withNodePort(PULSAR_NODE_PORT)
+ .withPort(16650)
+ .withNewTargetPort(16650)
+ .withProtocol("TCP")
+ .endPort()
+ .addNewPort()
+ .withName("http")
+ .withPort(80)
+ .withNodePort(PULSAR_HTTP_NODE_PORT)
+ .withNewTargetPort(8080)
+ .withProtocol("TCP")
+ .endPort()
+ .addNewPort()
+ .withName("pulsar")
+ .withPort(6650)
+ .withNewTargetPort(6650)
+ .withProtocol("TCP")
+ .endPort()
+ .endSpec()
+ .build();
+ return service;
+ }
+}
diff --git a/tests/integration/src/test/resources/pulsar-k8s.xml b/tests/integration/src/test/resources/pulsar-k8s.xml
new file mode 100644
index 0000000000000..03f7f0c950324
--- /dev/null
+++ b/tests/integration/src/test/resources/pulsar-k8s.xml
@@ -0,0 +1,28 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file