From 9720415e8ad7a9e9c4223e7e5576e72152d98cf2 Mon Sep 17 00:00:00 2001 From: AfrinKhan02 Date: Thu, 21 May 2026 18:11:43 +0530 Subject: [PATCH 1/9] Add response model &connector reset offsets controller Endpoint --- .../connect/ConnectorController.java | 25 +++++++++++++++++++ .../entities/ConnectorOffsetsResponse.java | 25 +++++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorOffsetsResponse.java diff --git a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectorController.java b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectorController.java index 3e7b4c651..e50018dc7 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectorController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectorController.java @@ -28,6 +28,7 @@ import com.michelin.ns4kafka.model.connect.ChangeConnectorState; import com.michelin.ns4kafka.model.connect.Connector; import com.michelin.ns4kafka.service.ConnectorService; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.NamespaceService; import com.michelin.ns4kafka.service.ResourceQuotaService; import com.michelin.ns4kafka.util.enumation.ApplyStatus; @@ -331,6 +332,30 @@ public Mono> changeState( .onErrorReturn(HttpResponse.ok(state)); } + /** + * Reset the offsets of a connector. + * + * @param namespace The namespace + * @param connector The connector to reset offsets for + * @return An HTTP response + */ + @Delete("/{connector}/offsets") + public Mono> resetOffsets(String namespace, String connector) { + Namespace ns = getNamespace(namespace); + + if (!connectorService.isNamespaceOwnerOfConnect(ns, connector)) { + return Mono.error(new ResourceValidationException(CONNECTOR, connector, invalidOwner(connector))); + } + + Optional optionalConnector = connectorService.findByName(ns, connector); + + if (optionalConnector.isEmpty()) { + return Mono.just(HttpResponse.notFound()); + } + + return connectorService.resetOffsets(ns, optionalConnector.get()); + } + /** * Import unsynchronized connectors. * diff --git a/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorOffsetsResponse.java b/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorOffsetsResponse.java new file mode 100644 index 000000000..e0c231ed9 --- /dev/null +++ b/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorOffsetsResponse.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.ns4kafka.service.client.connect.entities; + +import io.micronaut.serde.annotation.Serdeable; + +/** Kafka Connect connector offsets operation response. */ +@Serdeable +public record ConnectorOffsetsResponse(String message) {} \ No newline at end of file From 24844abfc44137340293c507de78fa5a585f4887 Mon Sep 17 00:00:00 2001 From: AfrinKhan02 Date: Fri, 22 May 2026 15:11:54 +0530 Subject: [PATCH 2/9] Handle connector service & Return Kafka Connect reset offsets response --- .../ns4kafka/service/ConnectorService.java | 25 ++++++++++++++++++ .../client/connect/KafkaConnectClient.java | 26 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java index caab61dc9..86f9bbd7f 100644 --- a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java +++ b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java @@ -28,6 +28,7 @@ import com.michelin.ns4kafka.model.connect.Connector; import com.michelin.ns4kafka.repository.ConnectorRepository; import com.michelin.ns4kafka.service.client.connect.KafkaConnectClient; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorSpecs; import com.michelin.ns4kafka.service.executor.ConnectorAsyncExecutor; import com.michelin.ns4kafka.util.FormatErrorUtils; @@ -412,4 +413,28 @@ public Mono> stop(Namespace namespace, Connector connector) { return HttpResponse.accepted(); }); } + + /** + * Reset offsets for a given connector. + * + * @param namespace The namespace + * @param connector The connector + * @return An HTTP response + */ + public Mono> resetOffsets(Namespace namespace, Connector connector) { + return kafkaConnectClient + .resetOffsets( + namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), + connector.getMetadata().getName()) + .map(response -> { + log.info( + "Success resetting offsets for Connector [{}] on Namespace [{}] Connect [{}]", + connector.getMetadata().getName(), + namespace.getMetadata().getName(), + connector.getSpec().getConnectCluster()); + + return HttpResponse.status(response.getStatus()).body(response.body()); + }); + } } diff --git a/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java b/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java index 3392503c5..15b0a0906 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java @@ -28,6 +28,7 @@ import com.michelin.ns4kafka.service.client.connect.entities.ConnectorSpecs; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStateInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStatus; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.client.connect.entities.ServerInfo; import com.michelin.ns4kafka.util.EncryptionUtils; import com.michelin.ns4kafka.util.exception.ResourceValidationException; @@ -358,6 +359,31 @@ public Mono> stop(String kafkaCluster, String connectCluster, return Mono.from(httpClient.exchange(request, Void.class)); } + /** + * Reset a connector offsets. + * + * @param kafkaCluster The Kafka cluster + * @param connectCluster The Kafka Connect + * @param connector The connector + * @return The reset response + */ + @Retryable( + delay = "${ns4kafka.retry.delay}", + attempts = "${ns4kafka.retry.attempt}", + multiplier = "${ns4kafka.retry.multiplier}", + includes = ReadTimeoutException.class) + public Mono> resetOffsets( + String kafkaCluster, String connectCluster, String connector) { + KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); + String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.DELETE( + URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + encodedConnector + "/offsets"))) + .basicAuth(config.getUsername(), config.getPassword()); + + return Mono.from(httpClient.exchange(request, ConnectorOffsetsResponse.class)); + } + /** * Get the Kafka Connect configuration. * From 173296aa84f3a52f7b76bc82e68350d1a47527a3 Mon Sep 17 00:00:00 2001 From: AfrinKhan02 Date: Sat, 23 May 2026 21:51:09 +0530 Subject: [PATCH 3/9] Add test coverage for Connector reset offsets endpoint --- .../connect/ConnectorControllerTest.java | 78 +++++++++++++++++++ .../service/ConnectorServiceTest.java | 37 +++++++++ 2 files changed, 115 insertions(+) diff --git a/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java index 29b1c8493..21361e8f1 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java @@ -36,6 +36,7 @@ import com.michelin.ns4kafka.model.connect.Connector; import com.michelin.ns4kafka.security.ResourceBasedSecurityRule; import com.michelin.ns4kafka.service.ConnectorService; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.NamespaceService; import com.michelin.ns4kafka.service.ResourceQuotaService; import com.michelin.ns4kafka.util.exception.ResourceValidationException; @@ -1048,4 +1049,81 @@ void shouldResumeConnector() { }) .verifyComplete(); } + + @Test + void shouldNotResetConnectorOffsetsWhenNotOwned() { + Namespace ns = Namespace.builder() + .metadata(Resource.Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")).thenReturn(false); + + StepVerifier.create(connectorController.resetOffsets("test", "connect1")) + .consumeErrorWith(error -> { + assertEquals(ResourceValidationException.class, error.getClass()); + assertEquals( + 1, + ((ResourceValidationException) error) + .getValidationErrors() + .size()); + assertEquals( + "Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.", + ((ResourceValidationException) error) + .getValidationErrors() + .getFirst()); + }) + .verify(); + } + + @Test + void shouldNotResetConnectorOffsetsWhenNotFound() { + Namespace ns = Namespace.builder() + .metadata(Resource.Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")).thenReturn(true); + when(connectorService.findByName(ns, "connect1")).thenReturn(Optional.empty()); + + StepVerifier.create(connectorController.resetOffsets("test", "connect1")) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(connectorService, never()).resetOffsets(any(), any()); + } + + @Test + void shouldResetConnectorOffsetsThroughDedicatedEndpoint() { + Namespace ns = Namespace.builder() + .metadata(Resource.Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + Connector connector = Connector.builder() + .metadata(Resource.Metadata.builder().name("connect1").build()) + .build(); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")).thenReturn(true); + when(connectorService.findByName(ns, "connect1")).thenReturn(Optional.of(connector)); + when(connectorService.resetOffsets(ns, connector)) + .thenReturn(Mono.just(HttpResponse.ok(new ConnectorOffsetsResponse("reset ok")))); + + StepVerifier.create(connectorController.resetOffsets("test", "connect1")) + .consumeNextWith(response -> { + assertEquals(HttpStatus.OK, response.getStatus()); + assertTrue(response.getBody().isPresent()); + assertEquals("reset ok", response.body().message()); + }) + .verifyComplete(); + } } diff --git a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java index 6abe57261..1649e09e4 100644 --- a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java @@ -43,6 +43,7 @@ import com.michelin.ns4kafka.service.client.connect.entities.ConfigKeyInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConfigValueInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorPluginInfo; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStateInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorType; import com.michelin.ns4kafka.service.executor.ConnectorAsyncExecutor; @@ -1438,4 +1439,40 @@ void shouldStopConnector() { connector.getSpec().getConnectCluster(), connector.getMetadata().getName()); } + + @Test + void shouldResetConnectorOffsets() { + Namespace namespace = Namespace.builder() + .metadata(Resource.Metadata.builder() + .name("namespace") + .cluster("local") + .build()) + .build(); + + Connector connector = Connector.builder() + .metadata(Resource.Metadata.builder().name("ns-connect1").build()) + .spec(Connector.ConnectorSpec.builder() + .connectCluster("local-name") + .build()) + .build(); + + when(kafkaConnectClient.resetOffsets( + namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), + connector.getMetadata().getName())) + .thenReturn(Mono.just(HttpResponse.ok(new ConnectorOffsetsResponse("reset ok")))); + + StepVerifier.create(connectorService.resetOffsets(namespace, connector)) + .consumeNextWith(response -> { + assertEquals(HttpStatus.OK, response.getStatus()); + assertEquals("reset ok", response.body().message()); + }) + .verifyComplete(); + + verify(kafkaConnectClient) + .resetOffsets( + namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), + connector.getMetadata().getName()); + } } From 3231cbc7c0d8a600e3f91b30f437e49e4ffdb276 Mon Sep 17 00:00:00 2001 From: AfrinKhan02 Date: Tue, 2 Jun 2026 23:54:01 +0530 Subject: [PATCH 4/9] spotless Apply --- .../connect/ConnectorController.java | 2 +- .../ns4kafka/service/ConnectorService.java | 44 +++++++++---------- .../client/connect/KafkaConnectClient.java | 8 ++-- .../entities/ConnectorOffsetsResponse.java | 2 +- .../connect/ConnectorControllerTest.java | 2 +- .../service/ConnectorServiceTest.java | 2 +- 6 files changed, 30 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectorController.java b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectorController.java index e50018dc7..1d085da40 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectorController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectorController.java @@ -28,9 +28,9 @@ import com.michelin.ns4kafka.model.connect.ChangeConnectorState; import com.michelin.ns4kafka.model.connect.Connector; import com.michelin.ns4kafka.service.ConnectorService; -import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.NamespaceService; import com.michelin.ns4kafka.service.ResourceQuotaService; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.util.enumation.ApplyStatus; import com.michelin.ns4kafka.util.exception.ResourceValidationException; import io.micronaut.context.event.ApplicationEventPublisher; diff --git a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java index 86f9bbd7f..30d59d4d9 100644 --- a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java +++ b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java @@ -414,27 +414,27 @@ public Mono> stop(Namespace namespace, Connector connector) { }); } - /** - * Reset offsets for a given connector. - * - * @param namespace The namespace - * @param connector The connector - * @return An HTTP response - */ - public Mono> resetOffsets(Namespace namespace, Connector connector) { - return kafkaConnectClient - .resetOffsets( - namespace.getMetadata().getCluster(), - connector.getSpec().getConnectCluster(), - connector.getMetadata().getName()) - .map(response -> { - log.info( - "Success resetting offsets for Connector [{}] on Namespace [{}] Connect [{}]", - connector.getMetadata().getName(), - namespace.getMetadata().getName(), - connector.getSpec().getConnectCluster()); - - return HttpResponse.status(response.getStatus()).body(response.body()); - }); + /** + * Reset offsets for a given connector. + * + * @param namespace The namespace + * @param connector The connector + * @return An HTTP response + */ + public Mono> resetOffsets(Namespace namespace, Connector connector) { + return kafkaConnectClient + .resetOffsets( + namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), + connector.getMetadata().getName()) + .map(response -> { + log.info( + "Success resetting offsets for Connector [{}] on Namespace [{}] Connect [{}]", + connector.getMetadata().getName(), + namespace.getMetadata().getName(), + connector.getSpec().getConnectCluster()); + + return HttpResponse.status(response.getStatus()).body(response.body()); + }); } } diff --git a/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java b/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java index 15b0a0906..a0be711d3 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java @@ -24,11 +24,11 @@ import com.michelin.ns4kafka.repository.ConnectClusterRepository; import com.michelin.ns4kafka.service.client.connect.entities.ConfigInfos; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorInfo; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorPluginInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorSpecs; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStateInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStatus; -import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.client.connect.entities.ServerInfo; import com.michelin.ns4kafka.util.EncryptionUtils; import com.michelin.ns4kafka.util.exception.ResourceValidationException; @@ -372,8 +372,8 @@ public Mono> stop(String kafkaCluster, String connectCluster, attempts = "${ns4kafka.retry.attempt}", multiplier = "${ns4kafka.retry.multiplier}", includes = ReadTimeoutException.class) - public Mono> resetOffsets( - String kafkaCluster, String connectCluster, String connector) { + public Mono> resetOffsets( + String kafkaCluster, String connectCluster, String connector) { KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8); @@ -381,7 +381,7 @@ public Mono> resetOffsets( URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + encodedConnector + "/offsets"))) .basicAuth(config.getUsername(), config.getPassword()); - return Mono.from(httpClient.exchange(request, ConnectorOffsetsResponse.class)); + return Mono.from(httpClient.exchange(request, ConnectorOffsetsResponse.class)); } /** diff --git a/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorOffsetsResponse.java b/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorOffsetsResponse.java index e0c231ed9..fc1793b4a 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorOffsetsResponse.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorOffsetsResponse.java @@ -22,4 +22,4 @@ /** Kafka Connect connector offsets operation response. */ @Serdeable -public record ConnectorOffsetsResponse(String message) {} \ No newline at end of file +public record ConnectorOffsetsResponse(String message) {} diff --git a/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java index 21361e8f1..70f541c7b 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java @@ -36,9 +36,9 @@ import com.michelin.ns4kafka.model.connect.Connector; import com.michelin.ns4kafka.security.ResourceBasedSecurityRule; import com.michelin.ns4kafka.service.ConnectorService; -import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.NamespaceService; import com.michelin.ns4kafka.service.ResourceQuotaService; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.util.exception.ResourceValidationException; import com.michelin.ns4kafka.validation.ValidationResult; import io.micronaut.context.event.ApplicationEventPublisher; diff --git a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java index 1649e09e4..2a7efe146 100644 --- a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java @@ -42,8 +42,8 @@ import com.michelin.ns4kafka.service.client.connect.entities.ConfigInfos; import com.michelin.ns4kafka.service.client.connect.entities.ConfigKeyInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConfigValueInfo; -import com.michelin.ns4kafka.service.client.connect.entities.ConnectorPluginInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorPluginInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStateInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorType; import com.michelin.ns4kafka.service.executor.ConnectorAsyncExecutor; From f6d4920522cfbd794e8da474c24f86b66e09b374 Mon Sep 17 00:00:00 2001 From: AfrinKhan02 Date: Wed, 3 Jun 2026 00:21:56 +0530 Subject: [PATCH 5/9] SonarQube test coverage enhanced --- .../connect/ConnectorControllerTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java index 70f541c7b..6c414fc20 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java @@ -1050,6 +1050,42 @@ void shouldResumeConnector() { .verifyComplete(); } + @Test + void shouldStopConnector() { + Namespace ns = Namespace.builder() + .metadata(Resource.Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + Connector connector = Connector.builder() + .metadata(Resource.Metadata.builder().name("connect1").build()) + .build(); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")).thenReturn(true); + when(connectorService.findByName(ns, "connect1")).thenReturn(Optional.of(connector)); + when(connectorService.stop(ArgumentMatchers.any(), ArgumentMatchers.any())) + .thenReturn(Mono.just(HttpResponse.accepted())); + + ChangeConnectorState changeConnectorState = ChangeConnectorState.builder() + .metadata(Resource.Metadata.builder().name("connect1").build()) + .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() + .action(ChangeConnectorState.ConnectorAction.STOP) + .build()) + .build(); + + StepVerifier.create(connectorController.changeState("test", "connect1", changeConnectorState)) + .consumeNextWith(response -> { + assertTrue(response.getBody().isPresent()); + assertTrue(response.getBody().get().getStatus().isSuccess()); + assertEquals(HttpStatus.ACCEPTED, response.body().getStatus().getCode()); + assertEquals("connect1", response.body().getMetadata().getName()); + }) + .verifyComplete(); + } + @Test void shouldNotResetConnectorOffsetsWhenNotOwned() { Namespace ns = Namespace.builder() From 6399769b6f932d912d9656fd9f55082d660c0e01 Mon Sep 17 00:00:00 2001 From: AfrinKhan02 Date: Wed, 3 Jun 2026 00:53:52 +0530 Subject: [PATCH 6/9] Spotless apply --- .../ns4kafka/controller/connect/ConnectorControllerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java index 6c414fc20..d2ea36017 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java @@ -1080,7 +1080,8 @@ void shouldStopConnector() { .consumeNextWith(response -> { assertTrue(response.getBody().isPresent()); assertTrue(response.getBody().get().getStatus().isSuccess()); - assertEquals(HttpStatus.ACCEPTED, response.body().getStatus().getCode()); + assertEquals( + HttpStatus.ACCEPTED, response.body().getStatus().getCode()); assertEquals("connect1", response.body().getMetadata().getName()); }) .verifyComplete(); From f426d91911abaa0294dac70ebc117870dc819f45 Mon Sep 17 00:00:00 2001 From: AfrinKhan02 Date: Wed, 3 Jun 2026 01:45:58 +0530 Subject: [PATCH 7/9] Coverage for kafkaclient --- .../integration/ConnectorIntegrationTest.java | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java index 8b7df1314..1ab721ad5 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java @@ -45,6 +45,7 @@ import com.michelin.ns4kafka.model.connect.ChangeConnectorState; import com.michelin.ns4kafka.model.connect.Connector; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorInfo; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorSpecs; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStateInfo; import com.michelin.ns4kafka.service.client.connect.entities.ServerInfo; @@ -697,6 +698,84 @@ void shouldStopConnector() throws InterruptedException { assertEquals("STOPPED", actual.connector().getState()); } + @Test + void shouldResetConnectorOffsets() throws InterruptedException { + Topic topic = Topic.builder() + .metadata(Resource.Metadata.builder() + .name("ns1-topic-reset-offsets") + .namespace("ns1") + .build()) + .spec(Topic.TopicSpec.builder() + .partitions(3) + .replicationFactor(1) + .configs( + Map.of("cleanup.policy", "delete", "min.insync.replicas", "1", "retention.ms", "60000")) + .build()) + .build(); + + Connector connector = Connector.builder() + .metadata(Resource.Metadata.builder() + .name("ns1-connector-reset-offsets") + .namespace("ns1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .connectCluster("test-connect") + .config(Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "1", + "topics", "ns1-topic-reset-offsets")) + .build()) + .build(); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/topics") + .bearerAuth(token) + .body(topic)); + + topicAsyncExecutorList.forEach(TopicAsyncExecutor::run); + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/connectors") + .bearerAuth(token) + .body(connector)); + + forceConnectorSynchronization(); + waitForConnectorToBeInState("ns1-connector-reset-offsets", "RUNNING"); + + ChangeConnectorState stopState = ChangeConnectorState.builder() + .metadata(Resource.Metadata.builder().name("ns1-connector-reset-offsets").build()) + .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() + .action(ChangeConnectorState.ConnectorAction.STOP) + .build()) + .build(); + + HttpResponse stopResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest.create( + HttpMethod.POST, + "/api/namespaces/ns1/connectors/ns1-connector-reset-offsets/change-state") + .bearerAuth(token) + .body(stopState)); + + assertEquals(HttpStatus.OK, stopResponse.status()); + + waitForConnectorToBeInState("ns1-connector-reset-offsets", "STOPPED"); + + HttpResponse resetResponse = ns4KafkaClient + .toBlocking() + .exchange( + HttpRequest.create( + HttpMethod.DELETE, + "/api/namespaces/ns1/connectors/ns1-connector-reset-offsets/offsets") + .bearerAuth(token), + ConnectorOffsetsResponse.class); + + assertEquals(HttpStatus.OK, resetResponse.status()); + assertTrue(resetResponse.getBody().isPresent()); + assertTrue(resetResponse.body().message().contains("reset successfully")); + } + @Test void shouldNotCreateConnectorWhenStrictRegexPatternDoesNotMatchName() { Namespace namespace = Namespace.builder() From 54bcc2d5b1e0ae466cae6cc2c9518d78a4aaf5aa Mon Sep 17 00:00:00 2001 From: AfrinKhan02 Date: Wed, 3 Jun 2026 01:48:30 +0530 Subject: [PATCH 8/9] List resolved --- .../ns4kafka/integration/ConnectorIntegrationTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java index 1ab721ad5..a1c088124 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java @@ -744,7 +744,9 @@ void shouldResetConnectorOffsets() throws InterruptedException { waitForConnectorToBeInState("ns1-connector-reset-offsets", "RUNNING"); ChangeConnectorState stopState = ChangeConnectorState.builder() - .metadata(Resource.Metadata.builder().name("ns1-connector-reset-offsets").build()) + .metadata(Resource.Metadata.builder() + .name("ns1-connector-reset-offsets") + .build()) .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() .action(ChangeConnectorState.ConnectorAction.STOP) .build()) From ee968e1079d6f03c16887c7c874d8cc767d1cad7 Mon Sep 17 00:00:00 2001 From: AfrinKhan02 Date: Wed, 3 Jun 2026 17:52:11 +0530 Subject: [PATCH 9/9] Refactoring test methodnames --- .../ns4kafka/controller/connect/ConnectorControllerTest.java | 2 +- .../michelin/ns4kafka/integration/ConnectorIntegrationTest.java | 2 +- .../com/michelin/ns4kafka/service/ConnectorServiceTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java index d2ea36017..881f3f828 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java @@ -1137,7 +1137,7 @@ void shouldNotResetConnectorOffsetsWhenNotFound() { } @Test - void shouldResetConnectorOffsetsThroughDedicatedEndpoint() { + void shouldFullyResetConnectorOffsets() { Namespace ns = Namespace.builder() .metadata(Resource.Metadata.builder() .name("test") diff --git a/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java index a1c088124..aa2fca63a 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java @@ -699,7 +699,7 @@ void shouldStopConnector() throws InterruptedException { } @Test - void shouldResetConnectorOffsets() throws InterruptedException { + void shouldFullyResetConnectorOffsets() throws InterruptedException { Topic topic = Topic.builder() .metadata(Resource.Metadata.builder() .name("ns1-topic-reset-offsets") diff --git a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java index 2a7efe146..1dad3f7e4 100644 --- a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java @@ -1441,7 +1441,7 @@ void shouldStopConnector() { } @Test - void shouldResetConnectorOffsets() { + void shouldFullyResetConnectorOffsets() { Namespace namespace = Namespace.builder() .metadata(Resource.Metadata.builder() .name("namespace")