Skip to content

Commit 1c5c6ce

Browse files
Detect pattern sources, use OptionalInt result, inline awaitAll
- Calculator returns OptionalInt; pre-pass on Source.topicPattern() returns empty when any sub-topology subscribes via regex (was silently undercount). - Resolver also returns OptionalInt; non-positive replicas and AdminClient failures map to empty rather than each falling back independently. - Framework collapses every "cannot resolve" path through one .orElse so FALLBACK_NUM_STREAM_THREADS is the only place the literal substitution happens. - describePartitions inlines the awaitAll helper into a single deadline- driven loop. Behaviour identical (allOf + per-future get and now+timeout + per-future get(remaining) both share one 5s budget; AdminClient fires RPCs concurrently before returning futures), readability improved. - New test covers regex-source topology -> empty all the way through.
1 parent f8ae1b0 commit 1c5c6ce

6 files changed

Lines changed: 143 additions & 74 deletions

File tree

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.Map;
3838
import java.util.Optional;
39+
import java.util.OptionalInt;
3940
import java.util.Properties;
4041
import java.util.concurrent.TimeUnit;
4142
import java.util.stream.Collectors;
@@ -292,10 +293,11 @@ protected Optional<StreamThreadsCountResolver> getStreamThreadsCountResolver() {
292293
return Optional.empty();
293294
}
294295

295-
// Caller must check StreamThreadsCountResolver.isDynamic(...) before invoking. Always returns
296-
// a concrete value: either the resolver's computed count or FALLBACK_NUM_STREAM_THREADS.
297-
// The string sentinel "DYNAMIC" must never leak back to Kafka Streams config — it can't be
298-
// parsed as an integer.
296+
// Caller must check StreamThreadsCountResolver.isDynamic(...) before invoking. Always returns a
297+
// concrete int — every "cannot resolve" path (resolver-supplier throws, no resolver wired,
298+
// resolver returns empty for unsupported topologies / non-positive replicas / AdminClient
299+
// failure) collapses to FALLBACK_NUM_STREAM_THREADS so the literal "DYNAMIC" sentinel never
300+
// reaches Kafka Streams.
299301
private int resolveDynamicStreamThreads(Map<String, Object> streamsProperties) {
300302
Optional<StreamThreadsCountResolver> resolver;
301303
try {
@@ -316,7 +318,14 @@ private int resolveDynamicStreamThreads(Map<String, Object> streamsProperties) {
316318
StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS);
317319
return StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS;
318320
}
319-
return resolver.get().resolve(this.topology, streamsProperties);
321+
final OptionalInt resolved = resolver.get().resolve(this.topology, streamsProperties);
322+
if (resolved.isEmpty()) {
323+
getLogger()
324+
.warn(
325+
"Resolver could not compute dynamic num.stream.threads; falling back to {}",
326+
StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS);
327+
}
328+
return resolved.orElse(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS);
320329
}
321330

322331
public Map<String, Object> getStreamsConfig(Config jobConfig) {

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.HashMap;
77
import java.util.Map;
88
import java.util.Map.Entry;
9+
import java.util.OptionalInt;
910
import java.util.Set;
1011
import java.util.concurrent.ExecutionException;
1112
import java.util.concurrent.TimeUnit;
@@ -30,8 +31,10 @@
3031
* stream tasks. Summing across sub-topologies and dividing by the replica count yields the threads
3132
* each instance should run to keep all tasks active without idle threads.
3233
*
33-
* <p>Regex/pattern subscriptions ({@link Source#topicPattern()}) are not supported — those
34-
* sub-topologies contribute zero tasks. No current app subscribes via regex.
34+
* <p>Returns {@link OptionalInt#empty()} when the topology contains a regex/pattern subscription
35+
* ({@link Source#topicPattern()}) — those sub-topologies cannot be enumerated against the broker
36+
* up-front, so dynamic sizing would silently under-count tasks. The caller falls back to its
37+
* configured default in that case.
3538
*/
3639
public class DynamicStreamThreadsCountCalculator {
3740

@@ -47,12 +50,33 @@ private static Set<String> sourceTopicsOf(final Subtopology subtopology) {
4750
.collect(toUnmodifiableSet());
4851
}
4952

50-
public int compute(final Topology topology, final AdminClient adminClient, final int replicas) {
53+
private static boolean hasPatternSource(final Subtopology subtopology) {
54+
return subtopology.nodes().stream()
55+
.filter(node -> node instanceof Source)
56+
.map(node -> (Source) node)
57+
.anyMatch(source -> source.topicPattern() != null);
58+
}
59+
60+
public OptionalInt compute(
61+
final Topology topology, final AdminClient adminClient, final int replicas) {
5162
if (replicas <= 0) {
5263
throw new IllegalArgumentException("replicas must be positive, got " + replicas);
5364
}
5465

5566
final TopologyDescription description = topology.describe();
67+
68+
// Bail out if any sub-topology subscribes via regex — topicSet() is empty for those, so
69+
// dynamic sizing would silently under-count tasks. The caller substitutes its fallback.
70+
final boolean anyPatternSource =
71+
description.subtopologies().stream()
72+
.anyMatch(DynamicStreamThreadsCountCalculator::hasPatternSource);
73+
if (anyPatternSource) {
74+
logger.warn(
75+
"Topology contains a regex/pattern source; dynamic num.stream.threads is not supported. "
76+
+ "Caller will fall back to its configured default.");
77+
return OptionalInt.empty();
78+
}
79+
5680
final Set<String> sourceTopics =
5781
description.subtopologies().stream()
5882
.flatMap(subtopology -> sourceTopicsOf(subtopology).stream())
@@ -87,28 +111,39 @@ public int compute(final Topology topology, final AdminClient adminClient, final
87111
subtopologyCount,
88112
replicas,
89113
threads);
90-
return threads;
114+
return OptionalInt.of(threads);
91115
}
92116

117+
// Single-loop implementation: AdminClient.describeTopics() already fires all RPCs concurrently
118+
// before returning futures, so iteration here only consumes a shared deadline (now+timeout) —
119+
// total wall-clock is capped at DESCRIBE_TOPICS_TIMEOUT_MILLIS regardless of topic count.
93120
private Map<String, Integer> describePartitions(
94121
final AdminClient adminClient, final Set<String> topics) {
95122
if (topics.isEmpty()) {
96123
return Map.of();
97124
}
98125
final DescribeTopicsResult result = adminClient.describeTopics(topics);
99126
final Map<String, KafkaFuture<TopicDescription>> futures = result.topicNameValues();
100-
101-
// Single shared deadline across all topics — worst-case wait is one timeout, not N.
102-
// AdminClient retries the underlying RPC internally per its own retry config.
103-
awaitAll(futures.values(), DESCRIBE_TOPICS_TIMEOUT_MILLIS);
104-
127+
final long deadlineMillis = System.currentTimeMillis() + DESCRIBE_TOPICS_TIMEOUT_MILLIS;
105128
final Map<String, Integer> partitions = new HashMap<>();
129+
106130
for (final Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
131+
final long remainingMillis = deadlineMillis - System.currentTimeMillis();
132+
if (remainingMillis <= 0) {
133+
throw new RuntimeException(
134+
"Timed out describing topics after " + DESCRIBE_TOPICS_TIMEOUT_MILLIS + "ms");
135+
}
107136
try {
108-
// Non-blocking after awaitAll: each future is already complete (or completed
109-
// exceptionally). Per-topic handling stays so UnknownTopicOrPartitionException
110-
// doesn't fail the whole batch.
111-
partitions.put(entry.getKey(), entry.getValue().get().partitions().size());
137+
partitions.put(
138+
entry.getKey(),
139+
entry.getValue().get(remainingMillis, TimeUnit.MILLISECONDS).partitions().size());
140+
} catch (final TimeoutException timeoutException) {
141+
throw new RuntimeException(
142+
"Timed out describing topic " + entry.getKey(), timeoutException);
143+
} catch (final InterruptedException interruptedException) {
144+
Thread.currentThread().interrupt();
145+
throw new RuntimeException(
146+
"Interrupted while describing topic " + entry.getKey(), interruptedException);
112147
} catch (final ExecutionException executionException) {
113148
if (executionException.getCause() instanceof UnknownTopicOrPartitionException) {
114149
logger.warn(
@@ -119,29 +154,8 @@ private Map<String, Integer> describePartitions(
119154
throw new RuntimeException(
120155
"Failed to describe topic " + entry.getKey(), executionException);
121156
}
122-
} catch (final InterruptedException interruptedException) {
123-
Thread.currentThread().interrupt();
124-
throw new RuntimeException(
125-
"Interrupted while describing topic " + entry.getKey(), interruptedException);
126157
}
127158
}
128159
return Map.copyOf(partitions);
129160
}
130-
131-
private static void awaitAll(
132-
final java.util.Collection<KafkaFuture<TopicDescription>> futures, final long timeoutMillis) {
133-
try {
134-
KafkaFuture.allOf(futures.toArray(KafkaFuture[]::new))
135-
.get(timeoutMillis, TimeUnit.MILLISECONDS);
136-
} catch (final TimeoutException timeoutException) {
137-
throw new RuntimeException(
138-
"Timed out describing topics after " + timeoutMillis + "ms", timeoutException);
139-
} catch (final InterruptedException interruptedException) {
140-
Thread.currentThread().interrupt();
141-
throw new RuntimeException("Interrupted describing topics", interruptedException);
142-
} catch (final ExecutionException executionException) {
143-
// At least one topic future completed exceptionally; per-future handling below classifies
144-
// (UnknownTopicOrPartitionException → 0 partitions; everything else → fail).
145-
}
146-
}
147161
}

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.Map;
44
import java.util.Optional;
5+
import java.util.OptionalInt;
56
import java.util.Properties;
67
import java.util.function.Function;
78
import java.util.function.IntSupplier;
@@ -58,28 +59,25 @@ public static boolean isDynamic(final Map<String, Object> streamsProperties) {
5859
}
5960

6061
/**
61-
* Resolve a concrete thread count for the given topology. Returns {@link
62-
* #FALLBACK_NUM_STREAM_THREADS} when prerequisites are missing (non-positive replica count) or
63-
* the AdminClient/calculator call fails, so the application can still start.
62+
* Resolve a thread count for the given topology. Returns {@link OptionalInt#empty()} when
63+
* prerequisites are missing (non-positive replica count), the topology is unsupported (regex
64+
* sources), or the AdminClient/calculator call fails — the caller substitutes its fallback so the
65+
* application can still start.
6466
*/
65-
public int resolve(final Topology topology, final Map<String, Object> streamsProperties) {
67+
public OptionalInt resolve(final Topology topology, final Map<String, Object> streamsProperties) {
6668
final int replicas = replicaCountSupplier.getAsInt();
6769
if (replicas <= 0) {
68-
logger.warn(
69-
"replica.count is non-positive ({}); falling back to {} stream threads",
70-
replicas,
71-
FALLBACK_NUM_STREAM_THREADS);
72-
return FALLBACK_NUM_STREAM_THREADS;
70+
logger.warn("replica.count is non-positive ({}); skipping dynamic resolution", replicas);
71+
return OptionalInt.empty();
7372
}
7473
try (final AdminClient adminClient =
7574
adminClientFactory.apply(toProperties(streamsProperties))) {
7675
return calculator.compute(topology, adminClient, replicas);
7776
} catch (final RuntimeException runtimeException) {
7877
logger.error(
79-
"Failed to compute dynamic num.stream.threads; falling back to {}",
80-
FALLBACK_NUM_STREAM_THREADS,
78+
"Failed to compute dynamic num.stream.threads; skipping dynamic resolution",
8179
runtimeException);
82-
return FALLBACK_NUM_STREAM_THREADS;
80+
return OptionalInt.empty();
8381
}
8482
}
8583

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616
import java.util.Map;
1717
import java.util.Optional;
1818
import java.util.Properties;
19+
import java.util.regex.Pattern;
1920
import org.apache.kafka.common.serialization.Serdes;
21+
import org.apache.kafka.streams.StreamsBuilder;
2022
import org.apache.kafka.streams.TestInputTopic;
2123
import org.apache.kafka.streams.TestOutputTopic;
2224
import org.apache.kafka.streams.TopologyTestDriver;
2325
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
26+
import org.apache.kafka.streams.kstream.Consumed;
27+
import org.apache.kafka.streams.kstream.KStream;
2428
import org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetter;
2529
import org.hypertrace.core.kafkastreams.framework.threading.StreamThreadsCountResolver;
2630
import org.hypertrace.core.serviceframework.config.ConfigClientFactory;
@@ -111,6 +115,44 @@ public Map<String, Object> getStreamsConfig(Config jobConfig) {
111115
is(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS));
112116
}
113117

118+
// Pattern-source topology: calculator returns OptionalInt.empty() because regex subscriptions
119+
// can't be enumerated up-front against the broker. The framework must substitute the integer
120+
// fallback so Kafka Streams gets a parseable value (not the literal "DYNAMIC").
121+
@Test
122+
public void dynamicWithPatternSourceFallsBackToEight() {
123+
SampleApp dynamicApp =
124+
new SampleApp(ConfigClientFactory.getClient()) {
125+
@Override
126+
public Map<String, Object> getStreamsConfig(Config jobConfig) {
127+
Map<String, Object> properties = super.getStreamsConfig(jobConfig);
128+
properties.put(NUM_STREAM_THREADS_CONFIG, StreamThreadsCountResolver.DYNAMIC_SENTINEL);
129+
return properties;
130+
}
131+
132+
@Override
133+
public StreamsBuilder buildTopology(
134+
Map<String, Object> streamsConfig,
135+
StreamsBuilder streamsBuilder,
136+
Map<String, KStream<?, ?>> sourceStreams) {
137+
streamsBuilder.stream(
138+
Pattern.compile("input-.*"), Consumed.with(Serdes.String(), Serdes.String()))
139+
.foreach((key, value) -> {});
140+
return streamsBuilder;
141+
}
142+
143+
@Override
144+
protected Optional<StreamThreadsCountResolver> getStreamThreadsCountResolver() {
145+
return Optional.of(new StreamThreadsCountResolver(() -> 8));
146+
}
147+
};
148+
149+
dynamicApp.doInit();
150+
151+
assertThat(
152+
dynamicApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG),
153+
is(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS));
154+
}
155+
114156
// Verifies the fix for "exception while obtaining the resolver should not leak DYNAMIC" — when
115157
// getStreamThreadsCountResolver() throws, the framework must catch and fall back to the integer
116158
// default rather than letting the literal sentinel reach Kafka Streams.

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
import java.util.HashMap;
1111
import java.util.List;
1212
import java.util.Map;
13+
import java.util.OptionalInt;
1314
import java.util.Set;
15+
import java.util.regex.Pattern;
1416
import java.util.stream.IntStream;
1517
import org.apache.kafka.clients.admin.AdminClient;
1618
import org.apache.kafka.clients.admin.DescribeTopicsResult;
@@ -44,23 +46,23 @@ void singleSubtopologyWithThirtyPartitionsAndEightReplicasGivesFourThreads() {
4446
final Topology topology = topologyForSubtopologies(Set.of("topic-a"));
4547
stubPartitions(Map.of("topic-a", 30));
4648

47-
assertEquals(4, calculator.compute(topology, adminClient, 8));
49+
assertEquals(OptionalInt.of(4), calculator.compute(topology, adminClient, 8));
4850
}
4951

5052
@Test
5153
void twoSubtopologiesAreSummedThenDividedByReplicas() {
5254
final Topology topology = topologyForSubtopologies(Set.of("topic-a"), Set.of("topic-b"));
5355
stubPartitions(Map.of("topic-a", 10, "topic-b", 20));
5456

55-
assertEquals(4, calculator.compute(topology, adminClient, 8));
57+
assertEquals(OptionalInt.of(4), calculator.compute(topology, adminClient, 8));
5658
}
5759

5860
@Test
5961
void absentTopicCountsAsZeroPartitions() {
6062
final Topology topology = topologyForSubtopologies(Set.of("topic-a"), Set.of("topic-missing"));
6163
stubPartitions(Map.of("topic-a", 16, "topic-missing", ABSENT));
6264

63-
assertEquals(2, calculator.compute(topology, adminClient, 8));
65+
assertEquals(OptionalInt.of(2), calculator.compute(topology, adminClient, 8));
6466
}
6567

6668
@Test
@@ -78,7 +80,20 @@ void totalTasksZeroReturnsOne() {
7880
final Topology topology = topologyForSubtopologies(Set.of("topic-a"));
7981
stubPartitions(Map.of("topic-a", ABSENT));
8082

81-
assertEquals(1, calculator.compute(topology, adminClient, 8));
83+
assertEquals(OptionalInt.of(1), calculator.compute(topology, adminClient, 8));
84+
}
85+
86+
// Regex/pattern subscriptions cannot be enumerated up-front against the broker, so dynamic
87+
// sizing would silently under-count tasks. Calculator must signal "not applicable" via empty
88+
// so the caller falls back to its configured default.
89+
@Test
90+
void patternSubscriptionReturnsEmpty() {
91+
final StreamsBuilder builder = new StreamsBuilder();
92+
builder.stream(Pattern.compile("topic-.*"), Consumed.with(Serdes.String(), Serdes.String()))
93+
.foreach((key, value) -> {});
94+
final Topology topology = builder.build();
95+
96+
assertEquals(OptionalInt.empty(), calculator.compute(topology, adminClient, 8));
8297
}
8398

8499
@SafeVarargs

0 commit comments

Comments
 (0)