Skip to content

Commit 736f37d

Browse files
Address review: explicit fallback gates, single-deadline allOf
- Replace throw-and-catch in resolver with explicit if-gate: a non-positive replica count (zero, negative, or absent) logs and returns FALLBACK_NUM_STREAM_THREADS. optionalReplicaCount() now yields 0 instead of throwing. - Calculator: replace per-topic get(timeout) loop with KafkaFuture.allOf(...).get(timeout) so worst-case startup wait is one deadline instead of N. Per-future UnknownTopicOrPartitionException handling is preserved. - KafkaStreamsApp: when num.stream.threads = DYNAMIC and no resolver is provided (or the override throws), substitute FALLBACK_NUM_STREAM_THREADS so the literal "DYNAMIC" never reaches Kafka Streams config (it can't parse the string). - Promote FALLBACK_NUM_STREAM_THREADS to public so callers can reference the same constant. - Document regex/pattern subscription limitation in the calculator javadoc. - Document fallback rationale on FALLBACK_NUM_STREAM_THREADS. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent c305f24 commit 736f37d

3 files changed

Lines changed: 76 additions & 52 deletions

File tree

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import java.util.List;
3737
import java.util.Map;
3838
import java.util.Optional;
39-
import java.util.OptionalInt;
4039
import java.util.Properties;
4140
import java.util.concurrent.TimeUnit;
4241
import java.util.stream.Collectors;
@@ -110,8 +109,9 @@ protected void doInit() {
110109
streamsBuilder = buildTopology(streamsConfig, streamsBuilder, sourceStreams);
111110
this.topology = streamsBuilder.build();
112111

113-
resolveDynamicStreamThreads(streamsConfig)
114-
.ifPresent(threads -> streamsConfig.put(NUM_STREAM_THREADS_CONFIG, threads));
112+
if (StreamThreadsCountResolver.isDynamic(streamsConfig)) {
113+
streamsConfig.put(NUM_STREAM_THREADS_CONFIG, resolveDynamicStreamThreads(streamsConfig));
114+
}
115115

116116
getLogger().info("Finalized kafka streams configuration: {}", streamsConfig);
117117

@@ -292,31 +292,31 @@ protected Optional<StreamThreadsCountResolver> getStreamThreadsCountResolver() {
292292
return Optional.empty();
293293
}
294294

295-
// Defensive: any unexpected runtime failure in dynamic resolution must NOT prevent the app from
296-
// starting. If something goes wrong, log and return empty so the caller leaves the streams
297-
// config untouched. JVM Errors (OOM, LinkageError, etc.) propagate — those are not recoverable.
298-
private OptionalInt resolveDynamicStreamThreads(Map<String, Object> streamsProperties) {
295+
// Caller must check StreamThreadsCountResolver.isDynamic(...) before invoking. Always returns
296+
// a concrete value: either the resolver's computed count or FALLBACK_NUM_STREAM_THREADS.
297+
// The string sentinel "DYNAMIC" must never leak back to Kafka Streams config — it can't be
298+
// parsed as an integer.
299+
private int resolveDynamicStreamThreads(Map<String, Object> streamsProperties) {
300+
Optional<StreamThreadsCountResolver> resolver;
299301
try {
300-
if (!StreamThreadsCountResolver.isDynamic(streamsProperties)) {
301-
return OptionalInt.empty();
302-
}
303-
Optional<StreamThreadsCountResolver> resolver = getStreamThreadsCountResolver();
304-
if (resolver.isEmpty()) {
305-
getLogger()
306-
.warn(
307-
"{} is set to DYNAMIC but no StreamThreadsCountResolver is provided; leaving as-is",
308-
NUM_STREAM_THREADS_CONFIG);
309-
return OptionalInt.empty();
310-
}
311-
return OptionalInt.of(resolver.get().resolve(this.topology, streamsProperties));
302+
resolver = getStreamThreadsCountResolver();
312303
} catch (Exception exception) {
313304
getLogger()
314-
.error(
315-
"Unexpected error resolving dynamic {}; leaving config untouched",
316-
NUM_STREAM_THREADS_CONFIG,
305+
.warn(
306+
"getStreamThreadsCountResolver() threw; falling back to {} stream threads",
307+
StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS,
317308
exception);
318-
return OptionalInt.empty();
309+
return StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS;
310+
}
311+
if (resolver.isEmpty()) {
312+
getLogger()
313+
.warn(
314+
"{} is set to DYNAMIC but no StreamThreadsCountResolver is provided; falling back to {}",
315+
NUM_STREAM_THREADS_CONFIG,
316+
StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS);
317+
return StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS;
319318
}
319+
return resolver.get().resolve(this.topology, streamsProperties);
320320
}
321321

322322
public Map<String, Object> getStreamsConfig(Config jobConfig) {

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
* <p>For each sub-topology the maximum partition count across its source topics is the number of
3030
* stream tasks. Summing across sub-topologies and dividing by the replica count yields the threads
3131
* each instance should run to keep all tasks active without idle threads.
32+
*
33+
* <p>Regex/pattern subscriptions ({@link Source#topicPattern()}) are not supported — those
34+
* sub-topologies contribute zero tasks. No current app subscribes via regex.
3235
*/
3336
public class DynamicStreamThreadsCountCalculator {
3437

@@ -93,13 +96,19 @@ private Map<String, Integer> describePartitions(
9396
return Map.of();
9497
}
9598
final DescribeTopicsResult result = adminClient.describeTopics(topics);
99+
final Map<String, KafkaFuture<TopicDescription>> futures = result.topicNameValues();
100+
101+
// Single shared deadline across all topics — worst-case wait is one timeout, not N.
102+
// AdminClient retries the underlying RPC internally per its own retry config.
103+
awaitAll(futures.values(), DESCRIBE_TOPICS_TIMEOUT_MILLIS);
104+
96105
final Map<String, Integer> partitions = new HashMap<>();
97-
for (final Entry<String, KafkaFuture<TopicDescription>> entry :
98-
result.topicNameValues().entrySet()) {
106+
for (final Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
99107
try {
100-
final TopicDescription description =
101-
entry.getValue().get(DESCRIBE_TOPICS_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
102-
partitions.put(entry.getKey(), description.partitions().size());
108+
// Non-blocking after awaitAll: each future is already complete (or completed
109+
// exceptionally). Per-topic handling stays so UnknownTopicOrPartitionException
110+
// doesn't fail the whole batch.
111+
partitions.put(entry.getKey(), entry.getValue().get().partitions().size());
103112
} catch (final ExecutionException executionException) {
104113
if (executionException.getCause() instanceof UnknownTopicOrPartitionException) {
105114
logger.warn(
@@ -114,11 +123,25 @@ private Map<String, Integer> describePartitions(
114123
Thread.currentThread().interrupt();
115124
throw new RuntimeException(
116125
"Interrupted while describing topic " + entry.getKey(), interruptedException);
117-
} catch (final TimeoutException timeoutException) {
118-
throw new RuntimeException(
119-
"Timed out describing topic " + entry.getKey(), timeoutException);
120126
}
121127
}
122128
return Map.copyOf(partitions);
123129
}
130+
131+
private static void awaitAll(
132+
final java.util.Collection<KafkaFuture<TopicDescription>> futures, final long timeoutMillis) {
133+
try {
134+
KafkaFuture.allOf(futures.toArray(KafkaFuture[]::new))
135+
.get(timeoutMillis, TimeUnit.MILLISECONDS);
136+
} catch (final TimeoutException timeoutException) {
137+
throw new RuntimeException(
138+
"Timed out describing topics after " + timeoutMillis + "ms", timeoutException);
139+
} catch (final InterruptedException interruptedException) {
140+
Thread.currentThread().interrupt();
141+
throw new RuntimeException("Interrupted describing topics", interruptedException);
142+
} catch (final ExecutionException executionException) {
143+
// At least one topic future completed exceptionally; per-future handling below classifies
144+
// (UnknownTopicOrPartitionException → 0 partitions; everything else → fail).
145+
}
146+
}
124147
}

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717
* resolution fails.
1818
*
1919
* <p>The replica count is supplied externally — applications typically wire it from a {@code
20-
* REPLICA_COUNT} environment variable injected by the deployment template.
20+
* REPLICA_COUNT} environment variable injected by the deployment template. A non-positive value
21+
* (zero, negative, or unset) falls back to {@link #FALLBACK_NUM_STREAM_THREADS}.
2122
*/
2223
public class StreamThreadsCountResolver {
2324

2425
public static final String DYNAMIC_SENTINEL = "DYNAMIC";
25-
static final int FALLBACK_NUM_STREAM_THREADS = 8;
26+
27+
// Reasonable default for current Hypertrace Kafka Streams apps. Apps that need a different
28+
// value should configure a numeric num.stream.threads explicitly instead of using DYNAMIC.
29+
public static final int FALLBACK_NUM_STREAM_THREADS = 8;
2630

2731
private static final Logger logger = LoggerFactory.getLogger(StreamThreadsCountResolver.class);
2832

@@ -55,15 +59,21 @@ public static boolean isDynamic(final Map<String, Object> streamsProperties) {
5559

5660
/**
5761
* Resolve a concrete thread count for the given topology. Returns {@link
58-
* #FALLBACK_NUM_STREAM_THREADS} on any failure so the application can still start.
62+
* #FALLBACK_NUM_STREAM_THREADS} when prerequisites are missing (non-positive replica count) or
63+
* the AdminClient/calculator call fails, so the application can still start.
5964
*/
6065
public int resolve(final Topology topology, final Map<String, Object> streamsProperties) {
61-
try {
62-
final int replicas = requirePositiveReplicaCount();
63-
try (final AdminClient adminClient =
64-
adminClientFactory.apply(toProperties(streamsProperties))) {
65-
return calculator.compute(topology, adminClient, replicas);
66-
}
66+
final int replicas = replicaCountSupplier.getAsInt();
67+
if (replicas <= 0) {
68+
logger.warn(
69+
"replica.count is non-positive ({}); falling back to {} stream threads",
70+
replicas,
71+
FALLBACK_NUM_STREAM_THREADS);
72+
return FALLBACK_NUM_STREAM_THREADS;
73+
}
74+
try (final AdminClient adminClient =
75+
adminClientFactory.apply(toProperties(streamsProperties))) {
76+
return calculator.compute(topology, adminClient, replicas);
6777
} catch (final RuntimeException runtimeException) {
6878
logger.error(
6979
"Failed to compute dynamic num.stream.threads; falling back to {}",
@@ -75,12 +85,11 @@ public int resolve(final Topology topology, final Map<String, Object> streamsPro
7585

7686
/**
7787
* Convenience adapter — given an {@code Optional<Integer>} replica-count source (the common
78-
* config-loaded shape), produce a supplier that this resolver accepts.
88+
* config-loaded shape), produce a supplier that yields the value when present and {@code 0} when
89+
* absent (treated as a fallback signal by {@link #resolve}).
7990
*/
8091
public static IntSupplier optionalReplicaCount(final Optional<Integer> replicaCount) {
81-
return () ->
82-
replicaCount.orElseThrow(
83-
() -> new IllegalStateException("replica.count is not configured"));
92+
return () -> replicaCount.orElse(0);
8493
}
8594

8695
private static Properties toProperties(final Map<String, Object> streamsProperties) {
@@ -93,12 +102,4 @@ private static Properties toProperties(final Map<String, Object> streamsProperti
93102
});
94103
return properties;
95104
}
96-
97-
private int requirePositiveReplicaCount() {
98-
final int replicas = replicaCountSupplier.getAsInt();
99-
if (replicas <= 0) {
100-
throw new IllegalStateException("replica.count must be positive, got " + replicas);
101-
}
102-
return replicas;
103-
}
104105
}

0 commit comments

Comments
 (0)