diff --git a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectorController.java b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectorController.java index 3e7b4c651..1d085da40 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectorController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectorController.java @@ -30,6 +30,7 @@ import com.michelin.ns4kafka.service.ConnectorService; import com.michelin.ns4kafka.service.NamespaceService; import com.michelin.ns4kafka.service.ResourceQuotaService; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.util.enumation.ApplyStatus; import com.michelin.ns4kafka.util.exception.ResourceValidationException; import io.micronaut.context.event.ApplicationEventPublisher; @@ -331,6 +332,30 @@ public Mono> changeState( .onErrorReturn(HttpResponse.ok(state)); } + /** + * Reset the offsets of a connector. + * + * @param namespace The namespace + * @param connector The connector to reset offsets for + * @return An HTTP response + */ + @Delete("/{connector}/offsets") + public Mono> resetOffsets(String namespace, String connector) { + Namespace ns = getNamespace(namespace); + + if (!connectorService.isNamespaceOwnerOfConnect(ns, connector)) { + return Mono.error(new ResourceValidationException(CONNECTOR, connector, invalidOwner(connector))); + } + + Optional optionalConnector = connectorService.findByName(ns, connector); + + if (optionalConnector.isEmpty()) { + return Mono.just(HttpResponse.notFound()); + } + + return connectorService.resetOffsets(ns, optionalConnector.get()); + } + /** * Import unsynchronized connectors. * diff --git a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java index caab61dc9..30d59d4d9 100644 --- a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java +++ b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java @@ -28,6 +28,7 @@ import com.michelin.ns4kafka.model.connect.Connector; import com.michelin.ns4kafka.repository.ConnectorRepository; import com.michelin.ns4kafka.service.client.connect.KafkaConnectClient; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorSpecs; import com.michelin.ns4kafka.service.executor.ConnectorAsyncExecutor; import com.michelin.ns4kafka.util.FormatErrorUtils; @@ -412,4 +413,28 @@ public Mono> stop(Namespace namespace, Connector connector) { return HttpResponse.accepted(); }); } + + /** + * Reset offsets for a given connector. + * + * @param namespace The namespace + * @param connector The connector + * @return An HTTP response + */ + public Mono> resetOffsets(Namespace namespace, Connector connector) { + return kafkaConnectClient + .resetOffsets( + namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), + connector.getMetadata().getName()) + .map(response -> { + log.info( + "Success resetting offsets for Connector [{}] on Namespace [{}] Connect [{}]", + connector.getMetadata().getName(), + namespace.getMetadata().getName(), + connector.getSpec().getConnectCluster()); + + return HttpResponse.status(response.getStatus()).body(response.body()); + }); + } } diff --git a/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java b/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java index 3392503c5..a0be711d3 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/connect/KafkaConnectClient.java @@ -24,6 +24,7 @@ import com.michelin.ns4kafka.repository.ConnectClusterRepository; import com.michelin.ns4kafka.service.client.connect.entities.ConfigInfos; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorInfo; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorPluginInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorSpecs; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStateInfo; @@ -358,6 +359,31 @@ public Mono> stop(String kafkaCluster, String connectCluster, return Mono.from(httpClient.exchange(request, Void.class)); } + /** + * Reset a connector offsets. + * + * @param kafkaCluster The Kafka cluster + * @param connectCluster The Kafka Connect + * @param connector The connector + * @return The reset response + */ + @Retryable( + delay = "${ns4kafka.retry.delay}", + attempts = "${ns4kafka.retry.attempt}", + multiplier = "${ns4kafka.retry.multiplier}", + includes = ReadTimeoutException.class) + public Mono> resetOffsets( + String kafkaCluster, String connectCluster, String connector) { + KafkaConnectHttpConfig config = getKafkaConnectConfig(kafkaCluster, connectCluster); + String encodedConnector = URLEncoder.encode(connector, StandardCharsets.UTF_8); + + HttpRequest request = HttpRequest.DELETE( + URI.create(StringUtils.prependUri(config.getUrl(), CONNECTORS + encodedConnector + "/offsets"))) + .basicAuth(config.getUsername(), config.getPassword()); + + return Mono.from(httpClient.exchange(request, ConnectorOffsetsResponse.class)); + } + /** * Get the Kafka Connect configuration. * diff --git a/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorOffsetsResponse.java b/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorOffsetsResponse.java new file mode 100644 index 000000000..fc1793b4a --- /dev/null +++ b/src/main/java/com/michelin/ns4kafka/service/client/connect/entities/ConnectorOffsetsResponse.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.ns4kafka.service.client.connect.entities; + +import io.micronaut.serde.annotation.Serdeable; + +/** Kafka Connect connector offsets operation response. */ +@Serdeable +public record ConnectorOffsetsResponse(String message) {} diff --git a/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java index 29b1c8493..881f3f828 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/connect/ConnectorControllerTest.java @@ -38,6 +38,7 @@ import com.michelin.ns4kafka.service.ConnectorService; import com.michelin.ns4kafka.service.NamespaceService; import com.michelin.ns4kafka.service.ResourceQuotaService; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.util.exception.ResourceValidationException; import com.michelin.ns4kafka.validation.ValidationResult; import io.micronaut.context.event.ApplicationEventPublisher; @@ -1048,4 +1049,118 @@ void shouldResumeConnector() { }) .verifyComplete(); } + + @Test + void shouldStopConnector() { + Namespace ns = Namespace.builder() + .metadata(Resource.Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + Connector connector = Connector.builder() + .metadata(Resource.Metadata.builder().name("connect1").build()) + .build(); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")).thenReturn(true); + when(connectorService.findByName(ns, "connect1")).thenReturn(Optional.of(connector)); + when(connectorService.stop(ArgumentMatchers.any(), ArgumentMatchers.any())) + .thenReturn(Mono.just(HttpResponse.accepted())); + + ChangeConnectorState changeConnectorState = ChangeConnectorState.builder() + .metadata(Resource.Metadata.builder().name("connect1").build()) + .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() + .action(ChangeConnectorState.ConnectorAction.STOP) + .build()) + .build(); + + StepVerifier.create(connectorController.changeState("test", "connect1", changeConnectorState)) + .consumeNextWith(response -> { + assertTrue(response.getBody().isPresent()); + assertTrue(response.getBody().get().getStatus().isSuccess()); + assertEquals( + HttpStatus.ACCEPTED, response.body().getStatus().getCode()); + assertEquals("connect1", response.body().getMetadata().getName()); + }) + .verifyComplete(); + } + + @Test + void shouldNotResetConnectorOffsetsWhenNotOwned() { + Namespace ns = Namespace.builder() + .metadata(Resource.Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")).thenReturn(false); + + StepVerifier.create(connectorController.resetOffsets("test", "connect1")) + .consumeErrorWith(error -> { + assertEquals(ResourceValidationException.class, error.getClass()); + assertEquals( + 1, + ((ResourceValidationException) error) + .getValidationErrors() + .size()); + assertEquals( + "Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.", + ((ResourceValidationException) error) + .getValidationErrors() + .getFirst()); + }) + .verify(); + } + + @Test + void shouldNotResetConnectorOffsetsWhenNotFound() { + Namespace ns = Namespace.builder() + .metadata(Resource.Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")).thenReturn(true); + when(connectorService.findByName(ns, "connect1")).thenReturn(Optional.empty()); + + StepVerifier.create(connectorController.resetOffsets("test", "connect1")) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(connectorService, never()).resetOffsets(any(), any()); + } + + @Test + void shouldFullyResetConnectorOffsets() { + Namespace ns = Namespace.builder() + .metadata(Resource.Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + Connector connector = Connector.builder() + .metadata(Resource.Metadata.builder().name("connect1").build()) + .build(); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")).thenReturn(true); + when(connectorService.findByName(ns, "connect1")).thenReturn(Optional.of(connector)); + when(connectorService.resetOffsets(ns, connector)) + .thenReturn(Mono.just(HttpResponse.ok(new ConnectorOffsetsResponse("reset ok")))); + + StepVerifier.create(connectorController.resetOffsets("test", "connect1")) + .consumeNextWith(response -> { + assertEquals(HttpStatus.OK, response.getStatus()); + assertTrue(response.getBody().isPresent()); + assertEquals("reset ok", response.body().message()); + }) + .verifyComplete(); + } } diff --git a/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java index 8b7df1314..aa2fca63a 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java @@ -45,6 +45,7 @@ import com.michelin.ns4kafka.model.connect.ChangeConnectorState; import com.michelin.ns4kafka.model.connect.Connector; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorInfo; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorSpecs; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStateInfo; import com.michelin.ns4kafka.service.client.connect.entities.ServerInfo; @@ -697,6 +698,86 @@ void shouldStopConnector() throws InterruptedException { assertEquals("STOPPED", actual.connector().getState()); } + @Test + void shouldFullyResetConnectorOffsets() throws InterruptedException { + Topic topic = Topic.builder() + .metadata(Resource.Metadata.builder() + .name("ns1-topic-reset-offsets") + .namespace("ns1") + .build()) + .spec(Topic.TopicSpec.builder() + .partitions(3) + .replicationFactor(1) + .configs( + Map.of("cleanup.policy", "delete", "min.insync.replicas", "1", "retention.ms", "60000")) + .build()) + .build(); + + Connector connector = Connector.builder() + .metadata(Resource.Metadata.builder() + .name("ns1-connector-reset-offsets") + .namespace("ns1") + .build()) + .spec(Connector.ConnectorSpec.builder() + .connectCluster("test-connect") + .config(Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "1", + "topics", "ns1-topic-reset-offsets")) + .build()) + .build(); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/topics") + .bearerAuth(token) + .body(topic)); + + topicAsyncExecutorList.forEach(TopicAsyncExecutor::run); + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/connectors") + .bearerAuth(token) + .body(connector)); + + forceConnectorSynchronization(); + waitForConnectorToBeInState("ns1-connector-reset-offsets", "RUNNING"); + + ChangeConnectorState stopState = ChangeConnectorState.builder() + .metadata(Resource.Metadata.builder() + .name("ns1-connector-reset-offsets") + .build()) + .spec(ChangeConnectorState.ChangeConnectorStateSpec.builder() + .action(ChangeConnectorState.ConnectorAction.STOP) + .build()) + .build(); + + HttpResponse stopResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest.create( + HttpMethod.POST, + "/api/namespaces/ns1/connectors/ns1-connector-reset-offsets/change-state") + .bearerAuth(token) + .body(stopState)); + + assertEquals(HttpStatus.OK, stopResponse.status()); + + waitForConnectorToBeInState("ns1-connector-reset-offsets", "STOPPED"); + + HttpResponse resetResponse = ns4KafkaClient + .toBlocking() + .exchange( + HttpRequest.create( + HttpMethod.DELETE, + "/api/namespaces/ns1/connectors/ns1-connector-reset-offsets/offsets") + .bearerAuth(token), + ConnectorOffsetsResponse.class); + + assertEquals(HttpStatus.OK, resetResponse.status()); + assertTrue(resetResponse.getBody().isPresent()); + assertTrue(resetResponse.body().message().contains("reset successfully")); + } + @Test void shouldNotCreateConnectorWhenStrictRegexPatternDoesNotMatchName() { Namespace namespace = Namespace.builder() diff --git a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java index 6abe57261..1dad3f7e4 100644 --- a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java @@ -42,6 +42,7 @@ import com.michelin.ns4kafka.service.client.connect.entities.ConfigInfos; import com.michelin.ns4kafka.service.client.connect.entities.ConfigKeyInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConfigValueInfo; +import com.michelin.ns4kafka.service.client.connect.entities.ConnectorOffsetsResponse; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorPluginInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStateInfo; import com.michelin.ns4kafka.service.client.connect.entities.ConnectorType; @@ -1438,4 +1439,40 @@ void shouldStopConnector() { connector.getSpec().getConnectCluster(), connector.getMetadata().getName()); } + + @Test + void shouldFullyResetConnectorOffsets() { + Namespace namespace = Namespace.builder() + .metadata(Resource.Metadata.builder() + .name("namespace") + .cluster("local") + .build()) + .build(); + + Connector connector = Connector.builder() + .metadata(Resource.Metadata.builder().name("ns-connect1").build()) + .spec(Connector.ConnectorSpec.builder() + .connectCluster("local-name") + .build()) + .build(); + + when(kafkaConnectClient.resetOffsets( + namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), + connector.getMetadata().getName())) + .thenReturn(Mono.just(HttpResponse.ok(new ConnectorOffsetsResponse("reset ok")))); + + StepVerifier.create(connectorService.resetOffsets(namespace, connector)) + .consumeNextWith(response -> { + assertEquals(HttpStatus.OK, response.getStatus()); + assertEquals("reset ok", response.body().message()); + }) + .verifyComplete(); + + verify(kafkaConnectClient) + .resetOffsets( + namespace.getMetadata().getCluster(), + connector.getSpec().getConnectCluster(), + connector.getMetadata().getName()); + } }