Skip to content

Commit c305f24

Browse files
Address review: inject AdminClient factory, narrow exception scope
- Invert resolver dep: take AdminClient factory in constructor instead of building inside resolve(). Removes the package-private test-only constructor and lets the test mock the actual external (AdminClient) while exercising the real DynamicStreamThreadsCountCalculator. - Drop @mock on the calculator in the resolver test; route stubs through AdminClient.describeTopics(...) so the test asserts on the real partition→threads math instead of stubbed return values. - Narrow catch (Throwable) to catch (Exception) in resolveDynamicStreamThreads so JVM Errors (OOM, LinkageError) keep surfacing instead of being silently swallowed at startup. - Return OptionalInt from resolveDynamicStreamThreads and apply the override at the doInit() call site, so the helper no longer mutates the caller's map under a query-shaped name. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 6299517 commit c305f24

3 files changed

Lines changed: 75 additions & 36 deletions

File tree

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.Map;
3838
import java.util.Optional;
39+
import java.util.OptionalInt;
3940
import java.util.Properties;
4041
import java.util.concurrent.TimeUnit;
4142
import java.util.stream.Collectors;
@@ -109,7 +110,8 @@ protected void doInit() {
109110
streamsBuilder = buildTopology(streamsConfig, streamsBuilder, sourceStreams);
110111
this.topology = streamsBuilder.build();
111112

112-
resolveDynamicStreamThreads(streamsConfig);
113+
resolveDynamicStreamThreads(streamsConfig)
114+
.ifPresent(threads -> streamsConfig.put(NUM_STREAM_THREADS_CONFIG, threads));
113115

114116
getLogger().info("Finalized kafka streams configuration: {}", streamsConfig);
115117

@@ -290,30 +292,30 @@ protected Optional<StreamThreadsCountResolver> getStreamThreadsCountResolver() {
290292
return Optional.empty();
291293
}
292294

293-
// Defensive: any unexpected failure in dynamic resolution must NOT prevent the app from starting.
294-
// If something goes wrong, log and leave streamsProperties untouched so Kafka Streams uses
295-
// whatever value was originally configured (or its own default).
296-
private void resolveDynamicStreamThreads(Map<String, Object> streamsProperties) {
295+
// Defensive: any unexpected runtime failure in dynamic resolution must NOT prevent the app from
296+
// starting. If something goes wrong, log and return empty so the caller leaves the streams
297+
// config untouched. JVM Errors (OOM, LinkageError, etc.) propagate — those are not recoverable.
298+
private OptionalInt resolveDynamicStreamThreads(Map<String, Object> streamsProperties) {
297299
try {
298300
if (!StreamThreadsCountResolver.isDynamic(streamsProperties)) {
299-
return;
301+
return OptionalInt.empty();
300302
}
301303
Optional<StreamThreadsCountResolver> resolver = getStreamThreadsCountResolver();
302304
if (resolver.isEmpty()) {
303305
getLogger()
304306
.warn(
305307
"{} is set to DYNAMIC but no StreamThreadsCountResolver is provided; leaving as-is",
306308
NUM_STREAM_THREADS_CONFIG);
307-
return;
309+
return OptionalInt.empty();
308310
}
309-
int resolved = resolver.get().resolve(this.topology, streamsProperties);
310-
streamsProperties.put(NUM_STREAM_THREADS_CONFIG, resolved);
311-
} catch (Throwable throwable) {
311+
return OptionalInt.of(resolver.get().resolve(this.topology, streamsProperties));
312+
} catch (Exception exception) {
312313
getLogger()
313314
.error(
314315
"Unexpected error resolving dynamic {}; leaving config untouched",
315316
NUM_STREAM_THREADS_CONFIG,
316-
throwable);
317+
exception);
318+
return OptionalInt.empty();
317319
}
318320
}
319321

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.Map;
44
import java.util.Optional;
55
import java.util.Properties;
6+
import java.util.function.Function;
67
import java.util.function.IntSupplier;
78
import org.apache.kafka.clients.admin.AdminClient;
89
import org.apache.kafka.streams.Topology;
@@ -27,16 +28,19 @@ public class StreamThreadsCountResolver {
2728

2829
private final DynamicStreamThreadsCountCalculator calculator;
2930
private final IntSupplier replicaCountSupplier;
31+
private final Function<Properties, AdminClient> adminClientFactory;
3032

3133
public StreamThreadsCountResolver(final IntSupplier replicaCountSupplier) {
32-
this(new DynamicStreamThreadsCountCalculator(), replicaCountSupplier);
34+
this(new DynamicStreamThreadsCountCalculator(), replicaCountSupplier, AdminClient::create);
3335
}
3436

35-
StreamThreadsCountResolver(
37+
public StreamThreadsCountResolver(
3638
final DynamicStreamThreadsCountCalculator calculator,
37-
final IntSupplier replicaCountSupplier) {
39+
final IntSupplier replicaCountSupplier,
40+
final Function<Properties, AdminClient> adminClientFactory) {
3841
this.calculator = calculator;
3942
this.replicaCountSupplier = replicaCountSupplier;
43+
this.adminClientFactory = adminClientFactory;
4044
}
4145

4246
/**
@@ -56,7 +60,8 @@ public static boolean isDynamic(final Map<String, Object> streamsProperties) {
5660
public int resolve(final Topology topology, final Map<String, Object> streamsProperties) {
5761
try {
5862
final int replicas = requirePositiveReplicaCount();
59-
try (final AdminClient adminClient = AdminClient.create(toProperties(streamsProperties))) {
63+
try (final AdminClient adminClient =
64+
adminClientFactory.apply(toProperties(streamsProperties))) {
6065
return calculator.compute(topology, adminClient, replicas);
6166
}
6267
} catch (final RuntimeException runtimeException) {
Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,51 @@
11
package org.hypertrace.core.kafkastreams.framework.threading;
22

3+
import static java.util.stream.Collectors.toUnmodifiableList;
34
import static org.junit.jupiter.api.Assertions.assertEquals;
4-
import static org.mockito.ArgumentMatchers.any;
5-
import static org.mockito.ArgumentMatchers.eq;
5+
import static org.mockito.ArgumentMatchers.anySet;
6+
import static org.mockito.Mockito.mock;
67
import static org.mockito.Mockito.when;
78

9+
import java.util.HashMap;
10+
import java.util.List;
811
import java.util.Map;
912
import java.util.Optional;
13+
import java.util.Properties;
14+
import java.util.function.Function;
1015
import java.util.function.IntSupplier;
16+
import java.util.stream.IntStream;
1117
import org.apache.kafka.clients.CommonClientConfigs;
18+
import org.apache.kafka.clients.admin.AdminClient;
19+
import org.apache.kafka.clients.admin.DescribeTopicsResult;
20+
import org.apache.kafka.clients.admin.TopicDescription;
21+
import org.apache.kafka.common.KafkaFuture;
22+
import org.apache.kafka.common.Node;
23+
import org.apache.kafka.common.TopicPartitionInfo;
1224
import org.apache.kafka.common.serialization.Serdes;
1325
import org.apache.kafka.streams.StreamsBuilder;
1426
import org.apache.kafka.streams.StreamsConfig;
1527
import org.apache.kafka.streams.Topology;
1628
import org.apache.kafka.streams.kstream.Consumed;
1729
import org.junit.jupiter.api.Test;
18-
import org.junit.jupiter.api.extension.ExtendWith;
19-
import org.mockito.Mock;
20-
import org.mockito.junit.jupiter.MockitoExtension;
2130

22-
@ExtendWith(MockitoExtension.class)
2331
class StreamThreadsCountResolverTest {
2432

25-
@Mock private DynamicStreamThreadsCountCalculator calculator;
26-
2733
@Test
2834
void calculatorThrowFallsBackToEight() {
29-
final Topology topology = simpleTopology();
30-
when(calculator.compute(eq(topology), any(), eq(8)))
35+
final AdminClient adminClient = mock(AdminClient.class);
36+
when(adminClient.describeTopics(anySet()))
3137
.thenThrow(new RuntimeException("simulated broker outage"));
32-
final StreamThreadsCountResolver resolver = new StreamThreadsCountResolver(calculator, () -> 8);
38+
final StreamThreadsCountResolver resolver = resolverWith(() -> 8, adminClient);
3339

34-
final int resolved = resolver.resolve(topology, bootstrapProperties());
40+
final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties());
3541

3642
assertEquals(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS, resolved);
3743
}
3844

3945
@Test
4046
void missingReplicaCountFallsBack() {
4147
final IntSupplier missing = StreamThreadsCountResolver.optionalReplicaCount(Optional.empty());
42-
final StreamThreadsCountResolver resolver = new StreamThreadsCountResolver(calculator, missing);
48+
final StreamThreadsCountResolver resolver = resolverWith(missing, mock(AdminClient.class));
4349

4450
final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties());
4551

@@ -48,7 +54,7 @@ void missingReplicaCountFallsBack() {
4854

4955
@Test
5056
void zeroReplicaCountFallsBack() {
51-
final StreamThreadsCountResolver resolver = new StreamThreadsCountResolver(calculator, () -> 0);
57+
final StreamThreadsCountResolver resolver = resolverWith(() -> 0, mock(AdminClient.class));
5258

5359
final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties());
5460

@@ -57,8 +63,7 @@ void zeroReplicaCountFallsBack() {
5763

5864
@Test
5965
void negativeReplicaCountFallsBack() {
60-
final StreamThreadsCountResolver resolver =
61-
new StreamThreadsCountResolver(calculator, () -> -1);
66+
final StreamThreadsCountResolver resolver = resolverWith(() -> -1, mock(AdminClient.class));
6267

6368
final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties());
6469

@@ -67,13 +72,15 @@ void negativeReplicaCountFallsBack() {
6772

6873
@Test
6974
void delegatesToCalculatorWithConfiguredReplicas() {
70-
final Topology topology = simpleTopology();
71-
when(calculator.compute(eq(topology), any(), eq(8))).thenReturn(7);
72-
final StreamThreadsCountResolver resolver = new StreamThreadsCountResolver(calculator, () -> 8);
75+
// 30-partition source / 8 replicas -> ceil(30/8) = 4 threads. Real calculator computes this;
76+
// only AdminClient (the external) is stubbed.
77+
final AdminClient adminClient = mock(AdminClient.class);
78+
stubPartitions(adminClient, Map.of("topic-a", 30));
79+
final StreamThreadsCountResolver resolver = resolverWith(() -> 8, adminClient);
7380

74-
final int resolved = resolver.resolve(topology, bootstrapProperties());
81+
final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties());
7582

76-
assertEquals(7, resolved);
83+
assertEquals(4, resolved);
7784
}
7885

7986
@Test
@@ -94,6 +101,13 @@ void isDynamicFalseForNumericValueOrAbsent() {
94101
assertEquals(false, StreamThreadsCountResolver.isDynamic(Map.of()));
95102
}
96103

104+
private static StreamThreadsCountResolver resolverWith(
105+
final IntSupplier replicaCountSupplier, final AdminClient adminClient) {
106+
final Function<Properties, AdminClient> factory = properties -> adminClient;
107+
return new StreamThreadsCountResolver(
108+
new DynamicStreamThreadsCountCalculator(), replicaCountSupplier, factory);
109+
}
110+
97111
private static Topology simpleTopology() {
98112
final StreamsBuilder builder = new StreamsBuilder();
99113
builder.stream("topic-a", Consumed.with(Serdes.String(), Serdes.String()))
@@ -104,4 +118,22 @@ private static Topology simpleTopology() {
104118
private static Map<String, Object> bootstrapProperties() {
105119
return Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
106120
}
121+
122+
private static void stubPartitions(
123+
final AdminClient adminClient, final Map<String, Integer> partitionsByTopic) {
124+
final DescribeTopicsResult result = mock(DescribeTopicsResult.class);
125+
final Map<String, KafkaFuture<TopicDescription>> futures = new HashMap<>();
126+
partitionsByTopic.forEach(
127+
(topic, partitionCount) -> futures.put(topic, futureFor(topic, partitionCount)));
128+
when(result.topicNameValues()).thenReturn(futures);
129+
when(adminClient.describeTopics(anySet())).thenReturn(result);
130+
}
131+
132+
private static KafkaFuture<TopicDescription> futureFor(final String topic, final int partitions) {
133+
final List<TopicPartitionInfo> partitionInfos =
134+
IntStream.range(0, partitions)
135+
.mapToObj(index -> new TopicPartitionInfo(index, Node.noNode(), List.of(), List.of()))
136+
.collect(toUnmodifiableList());
137+
return KafkaFuture.completedFuture(new TopicDescription(topic, false, partitionInfos));
138+
}
107139
}

0 commit comments

Comments
 (0)