Skip to content

Commit 6bead09

Browse files
Auto-size num.stream.threads from topology and replica count (#119)
1 parent 94e63d8 commit 6bead09

8 files changed

Lines changed: 696 additions & 0 deletions

File tree

kafka-streams-framework/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ dependencies {
3333
testImplementation(commonLibs.junit.jupiter)
3434
testImplementation(localLibs.junit.pioneer)
3535
testImplementation(commonLibs.mockito.core)
36+
testImplementation(commonLibs.mockito.junit)
3637
testImplementation(localLibs.hamcrest.core)
3738
testRuntimeOnly(commonLibs.log4j.slf4j2.impl)
3839
}

kafka-streams-framework/gradle.lockfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ org.junit:junit-bom:5.10.0=testCompileClasspath
119119
org.junit:junit-bom:5.11.2=testRuntimeClasspath
120120
org.latencyutils:LatencyUtils:2.0.3=runtimeClasspath,testRuntimeClasspath
121121
org.mockito:mockito-core:5.8.0=testCompileClasspath,testRuntimeClasspath
122+
org.mockito:mockito-junit-jupiter:5.8.0=testCompileClasspath,testRuntimeClasspath
122123
org.objenesis:objenesis:3.3=testRuntimeClasspath
123124
org.opentest4j:opentest4j:1.3.0=testCompileClasspath,testRuntimeClasspath
124125
org.projectlombok:lombok:1.18.30=annotationProcessor,compileClasspath,testCompileClasspath

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
1717
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
1818
import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
19+
import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG;
1920
import static org.apache.kafka.streams.StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG;
2021
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
2122
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
@@ -34,6 +35,8 @@
3435
import java.util.HashMap;
3536
import java.util.List;
3637
import java.util.Map;
38+
import java.util.Optional;
39+
import java.util.OptionalInt;
3740
import java.util.Properties;
3841
import java.util.concurrent.TimeUnit;
3942
import java.util.stream.Collectors;
@@ -49,6 +52,7 @@
4952
import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateListener;
5053
import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateRestoreListener;
5154
import org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetter;
55+
import org.hypertrace.core.kafkastreams.framework.threading.StreamThreadsCountResolver;
5256
import org.hypertrace.core.kafkastreams.framework.timestampextractors.UseWallclockTimeOnInvalidTimestamp;
5357
import org.hypertrace.core.kafkastreams.framework.topics.creator.KafkaTopicCreator;
5458
import org.hypertrace.core.kafkastreams.framework.util.ExceptionUtils;
@@ -65,6 +69,16 @@ public abstract class KafkaStreamsApp extends PlatformService {
6569
public static final String CLEANUP_LOCAL_STATE = "cleanup.local.state";
6670
public static final String PRE_CREATE_TOPICS = "precreate.topics";
6771
public static final String KAFKA_STREAMS_CONFIG_KEY = "kafka.streams.config";
72+
73+
/**
74+
* Framework-level boolean opt-in (set in the streams config map) that gates dynamic {@code
75+
* num.stream.threads} resolution. Apps must also override {@link
76+
* #getStreamThreadsCountResolver()}; this flag exists separately so deployments can roll out and
77+
* roll back per-cluster without code changes. The flag is consumed by the framework before the
78+
* config reaches Kafka Streams and is not a Kafka config key.
79+
*/
80+
public static final String DYNAMIC_NUM_STREAM_THREADS_CONFIG = "dynamic.num.stream.threads";
81+
6882
private static final String SHUTDOWN_DURATION = "shutdown.duration";
6983
private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApp.class);
7084

@@ -106,6 +120,17 @@ protected void doInit() {
106120
streamsBuilder = buildTopology(streamsConfig, streamsBuilder, sourceStreams);
107121
this.topology = streamsBuilder.build();
108122

123+
// Strip the dynamic-resolution flag before it reaches Kafka Streams (it's a framework-level
124+
// opt-in, not a Kafka config). When enabled and a resolver is wired up, replace the
125+
// configured num.stream.threads with the dynamically-computed value; otherwise the
126+
// configured value flows through unchanged.
127+
final boolean dynamicEnabled = isDynamicNumStreamThreadsEnabled(streamsConfig);
128+
streamsConfig.remove(DYNAMIC_NUM_STREAM_THREADS_CONFIG);
129+
if (dynamicEnabled) {
130+
resolveDynamicStreamThreads(streamsConfig)
131+
.ifPresent(threads -> streamsConfig.put(NUM_STREAM_THREADS_CONFIG, threads));
132+
}
133+
109134
getLogger().info("Finalized kafka streams configuration: {}", streamsConfig);
110135

111136
// pre-create input/output topics required for kstream application
@@ -257,6 +282,80 @@ public abstract StreamsBuilder buildTopology(
257282
StreamsBuilder streamsBuilder,
258283
Map<String, KStream<?, ?>> sourceStreams);
259284

285+
/**
286+
* Override in subclasses that want auto-sized {@code num.stream.threads}. Return a resolver
287+
* configured with this app's replica-count source. The framework only invokes this when {@link
288+
* #DYNAMIC_NUM_STREAM_THREADS_CONFIG} is {@code true} in the streams config; returning an empty
289+
* optional (the default) disables dynamic resolution.
290+
*
291+
* <p>Apps that don't override this are unaffected — the default returns {@code Optional.empty()}
292+
* and the framework leaves {@code num.stream.threads} exactly as configured. Apps that do
293+
* override should supply replica count from wherever the deployment exposes it (e.g. the {@code
294+
* REPLICA_COUNT} environment variable injected by the k8s template, or a HOCON config key).
295+
*
296+
* <p>Example:
297+
*
298+
* <pre>{@code
299+
* @Override
300+
* protected Optional<StreamThreadsCountResolver> getStreamThreadsCountResolver() {
301+
* final Optional<Integer> replicaCount =
302+
* ConfigUtils.optionalInteger(getAppConfig(), "replica.count");
303+
* return Optional.of(
304+
* new StreamThreadsCountResolver(
305+
* StreamThreadsCountResolver.optionalReplicaCount(replicaCount)));
306+
* }
307+
* }</pre>
308+
*
309+
* <p>The configured numeric {@code num.stream.threads} flows through unchanged when:
310+
*
311+
* <ul>
312+
* <li>{@link #DYNAMIC_NUM_STREAM_THREADS_CONFIG} is {@code false} or unset
313+
* <li>this method returns empty (no resolver wired up)
314+
* <li>this method throws
315+
* <li>the resolver returns empty: replica count is non-positive, the AdminClient/calculator
316+
* call throws, the topology contains a regex/pattern source, or no source partitions are
317+
* resolvable on the broker
318+
* </ul>
319+
*/
320+
protected Optional<StreamThreadsCountResolver> getStreamThreadsCountResolver() {
321+
return Optional.empty();
322+
}
323+
324+
// Caller must check the dynamic-enabled flag before invoking. Returns OptionalInt.empty() when
325+
// no resolver is wired or resolution cannot produce a value — caller keeps the configured
326+
// num.stream.threads as the fallback.
327+
private OptionalInt resolveDynamicStreamThreads(Map<String, Object> streamsProperties) {
328+
final Optional<StreamThreadsCountResolver> resolver;
329+
try {
330+
resolver = getStreamThreadsCountResolver();
331+
} catch (Exception exception) {
332+
getLogger()
333+
.warn(
334+
"getStreamThreadsCountResolver() threw; keeping configured num.stream.threads",
335+
exception);
336+
return OptionalInt.empty();
337+
}
338+
if (resolver.isEmpty()) {
339+
getLogger()
340+
.warn(
341+
"{} is true but no StreamThreadsCountResolver is provided; keeping configured num.stream.threads",
342+
DYNAMIC_NUM_STREAM_THREADS_CONFIG);
343+
return OptionalInt.empty();
344+
}
345+
final OptionalInt resolved = resolver.get().resolve(this.topology, streamsProperties);
346+
if (resolved.isEmpty()) {
347+
getLogger()
348+
.warn("Resolver could not compute dynamic num.stream.threads; keeping configured value");
349+
}
350+
return resolved;
351+
}
352+
353+
private static boolean isDynamicNumStreamThreadsEnabled(
354+
final Map<String, Object> streamsProperties) {
355+
final Object value = streamsProperties.get(DYNAMIC_NUM_STREAM_THREADS_CONFIG);
356+
return value != null && Boolean.parseBoolean(String.valueOf(value));
357+
}
358+
260359
public Map<String, Object> getStreamsConfig(Config jobConfig) {
261360
return new HashMap<>(ConfigUtils.getFlatMapConfig(jobConfig, getStreamsConfigKey()));
262361
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package org.hypertrace.core.kafkastreams.framework.threading;
2+
3+
import static java.util.stream.Collectors.toUnmodifiableSet;
4+
5+
import java.time.Duration;
6+
import java.util.HashMap;
7+
import java.util.Map;
8+
import java.util.Map.Entry;
9+
import java.util.OptionalInt;
10+
import java.util.Set;
11+
import java.util.concurrent.ExecutionException;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.concurrent.TimeoutException;
14+
import org.apache.kafka.clients.admin.AdminClient;
15+
import org.apache.kafka.clients.admin.DescribeTopicsResult;
16+
import org.apache.kafka.clients.admin.TopicDescription;
17+
import org.apache.kafka.common.KafkaFuture;
18+
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
19+
import org.apache.kafka.streams.Topology;
20+
import org.apache.kafka.streams.TopologyDescription;
21+
import org.apache.kafka.streams.TopologyDescription.Source;
22+
import org.apache.kafka.streams.TopologyDescription.Subtopology;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
/**
27+
* Computes a per-instance {@code num.stream.threads} value from a topology and the partition count
28+
* of every source topic.
29+
*
30+
* <p>For each sub-topology the maximum partition count across its source topics is the number of
31+
* stream tasks. Summing across sub-topologies and dividing by the replica count yields the threads
32+
* each instance should run to keep all tasks active without idle threads.
33+
*
34+
* <p>Returns {@link OptionalInt#empty()} when the topology contains a regex/pattern subscription
35+
* ({@link Source#topicPattern()}) — those sub-topologies cannot be enumerated against the broker
36+
* up-front, so dynamic sizing would silently under-count tasks. The caller falls back to its
37+
* configured default in that case.
38+
*/
39+
public class DynamicStreamThreadsCountCalculator {
40+
41+
private static final long DESCRIBE_TOPICS_TIMEOUT_MILLIS = Duration.ofSeconds(5).toMillis();
42+
private static final Logger logger =
43+
LoggerFactory.getLogger(DynamicStreamThreadsCountCalculator.class);
44+
45+
private static Set<String> sourceTopicsOf(final Subtopology subtopology) {
46+
return subtopology.nodes().stream()
47+
.filter(node -> node instanceof Source)
48+
.map(node -> (Source) node)
49+
.flatMap(source -> source.topicSet().stream())
50+
.collect(toUnmodifiableSet());
51+
}
52+
53+
private static boolean hasPatternSource(final Subtopology subtopology) {
54+
return subtopology.nodes().stream()
55+
.filter(node -> node instanceof Source)
56+
.map(node -> (Source) node)
57+
.anyMatch(source -> source.topicPattern() != null);
58+
}
59+
60+
public OptionalInt compute(
61+
final Topology topology, final AdminClient adminClient, final int replicas) {
62+
if (replicas <= 0) {
63+
throw new IllegalArgumentException("replicas must be positive, got " + replicas);
64+
}
65+
66+
final TopologyDescription description = topology.describe();
67+
68+
// Bail out if any sub-topology subscribes via regex — topicSet() is empty for those, so
69+
// dynamic sizing would silently under-count tasks. The caller substitutes its fallback.
70+
final boolean anyPatternSource =
71+
description.subtopologies().stream()
72+
.anyMatch(DynamicStreamThreadsCountCalculator::hasPatternSource);
73+
if (anyPatternSource) {
74+
logger.warn(
75+
"Topology contains a regex/pattern source; dynamic num.stream.threads is not supported. "
76+
+ "Caller will fall back to its configured default.");
77+
return OptionalInt.empty();
78+
}
79+
80+
final Set<String> sourceTopics =
81+
description.subtopologies().stream()
82+
.flatMap(subtopology -> sourceTopicsOf(subtopology).stream())
83+
.collect(toUnmodifiableSet());
84+
85+
final Map<String, Integer> partitionsByTopic = describePartitions(adminClient, sourceTopics);
86+
87+
int totalTasks = 0;
88+
int subtopologyCount = 0;
89+
for (final Subtopology subtopology : description.subtopologies()) {
90+
subtopologyCount++;
91+
final Set<String> subtopologyTopics = sourceTopicsOf(subtopology);
92+
93+
final int tasksForSubtopology =
94+
subtopologyTopics.stream()
95+
.mapToInt(topic -> partitionsByTopic.getOrDefault(topic, 0))
96+
.max()
97+
.orElse(0);
98+
99+
if (tasksForSubtopology == 0) {
100+
logger.warn(
101+
"Sub-topology has no resolvable partitions; topics={}. Pod restart will be needed once topics exist.",
102+
subtopologyTopics);
103+
}
104+
totalTasks += tasksForSubtopology;
105+
}
106+
107+
if (totalTasks == 0) {
108+
logger.warn(
109+
"No resolvable partitions across {} sub-topologies; skipping dynamic num.stream.threads.",
110+
subtopologyCount);
111+
return OptionalInt.empty();
112+
}
113+
114+
final int threads = (int) Math.ceil((double) totalTasks / replicas);
115+
logger.info(
116+
"Dynamic num.stream.threads: totalTasks={} across {} sub-topologies, replicas={}, computed={}",
117+
totalTasks,
118+
subtopologyCount,
119+
replicas,
120+
threads);
121+
return OptionalInt.of(threads);
122+
}
123+
124+
// Single-loop implementation: AdminClient.describeTopics() already fires all RPCs concurrently
125+
// before returning futures, so iteration here only consumes a shared deadline (now+timeout) —
126+
// total wall-clock is capped at DESCRIBE_TOPICS_TIMEOUT_MILLIS regardless of topic count.
127+
private Map<String, Integer> describePartitions(
128+
final AdminClient adminClient, final Set<String> topics) {
129+
if (topics.isEmpty()) {
130+
return Map.of();
131+
}
132+
final DescribeTopicsResult result = adminClient.describeTopics(topics);
133+
final Map<String, KafkaFuture<TopicDescription>> futures = result.topicNameValues();
134+
final long deadlineMillis = System.currentTimeMillis() + DESCRIBE_TOPICS_TIMEOUT_MILLIS;
135+
final Map<String, Integer> partitions = new HashMap<>();
136+
137+
for (final Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
138+
final long remainingMillis = deadlineMillis - System.currentTimeMillis();
139+
if (remainingMillis <= 0) {
140+
throw new RuntimeException(
141+
"Timed out describing topics after " + DESCRIBE_TOPICS_TIMEOUT_MILLIS + "ms");
142+
}
143+
try {
144+
partitions.put(
145+
entry.getKey(),
146+
entry.getValue().get(remainingMillis, TimeUnit.MILLISECONDS).partitions().size());
147+
} catch (final TimeoutException timeoutException) {
148+
throw new RuntimeException(
149+
"Timed out describing topic " + entry.getKey(), timeoutException);
150+
} catch (final InterruptedException interruptedException) {
151+
Thread.currentThread().interrupt();
152+
throw new RuntimeException(
153+
"Interrupted while describing topic " + entry.getKey(), interruptedException);
154+
} catch (final ExecutionException executionException) {
155+
if (executionException.getCause() instanceof UnknownTopicOrPartitionException) {
156+
logger.warn(
157+
"Topic absent on broker: {}. Treating as 0 partitions; restart needed once created.",
158+
entry.getKey());
159+
partitions.put(entry.getKey(), 0);
160+
} else {
161+
throw new RuntimeException(
162+
"Failed to describe topic " + entry.getKey(), executionException);
163+
}
164+
}
165+
}
166+
return Map.copyOf(partitions);
167+
}
168+
}

0 commit comments

Comments
 (0)