From 915c91715dab266eb69e6de65076fa8c0619c015 Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Mon, 1 Jun 2026 10:25:30 +0530 Subject: [PATCH 1/8] Add dynamic num.stream.threads sizing for Kafka Streams apps Introduce StreamThreadsCountResolver and DynamicStreamThreadsCountCalculator under a new `threading` package, and wire them into KafkaStreamsApp via an overridable getStreamThreadsCountResolver() hook. When num.stream.threads is set to the sentinel "DYNAMIC", the framework takes the maximum partition count among each sub-topology's source topics, sums those maxima across all sub-topologies, and divides the total by the configured replica count (rounding up), producing a per-instance thread count that keeps every task active without idle threads. Apps that don't opt in are unaffected: isDynamic() short-circuits unless the value is literally "DYNAMIC", and the resolution path is wrapped in try/catch so any unexpected failure logs and leaves the streams config untouched. Failed broker calls or a missing/non-positive replica count fall back to a safe default of 8 threads instead of preventing startup; topics that are absent on the broker are counted as zero partitions. Also fixes a typo'd `import Optional;` left from an earlier refactor. Tests cover the partition math, multi-subtopology summation, absent-topic handling, replica-count edge cases, sentinel detection, and the resolver's fallback behavior. 
Co-Authored-By: Claude Opus 4.7 --- kafka-streams-framework/build.gradle.kts | 1 + kafka-streams-framework/gradle.lockfile | 1 + .../framework/KafkaStreamsApp.java | 42 ++++++ .../DynamicStreamThreadsCountCalculator.java | 124 ++++++++++++++++++ .../threading/StreamThreadsCountResolver.java | 99 ++++++++++++++ ...namicStreamThreadsCountCalculatorTest.java | 117 +++++++++++++++++ .../StreamThreadsCountResolverTest.java | 107 +++++++++++++++ 7 files changed, 491 insertions(+) create mode 100644 kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java create mode 100644 kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java create mode 100644 kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java create mode 100644 kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java diff --git a/kafka-streams-framework/build.gradle.kts b/kafka-streams-framework/build.gradle.kts index dcdbf36..c65e5b8 100644 --- a/kafka-streams-framework/build.gradle.kts +++ b/kafka-streams-framework/build.gradle.kts @@ -33,6 +33,7 @@ dependencies { testImplementation(commonLibs.junit.jupiter) testImplementation(localLibs.junit.pioneer) testImplementation(commonLibs.mockito.core) + testImplementation(commonLibs.mockito.junit) testImplementation(localLibs.hamcrest.core) testRuntimeOnly(commonLibs.log4j.slf4j2.impl) } diff --git a/kafka-streams-framework/gradle.lockfile b/kafka-streams-framework/gradle.lockfile index e1efd02..e5d2e7e 100644 --- a/kafka-streams-framework/gradle.lockfile +++ b/kafka-streams-framework/gradle.lockfile @@ -119,6 +119,7 @@ org.junit:junit-bom:5.10.0=testCompileClasspath org.junit:junit-bom:5.11.2=testRuntimeClasspath 
org.latencyutils:LatencyUtils:2.0.3=runtimeClasspath,testRuntimeClasspath org.mockito:mockito-core:5.8.0=testCompileClasspath,testRuntimeClasspath +org.mockito:mockito-junit-jupiter:5.8.0=testCompileClasspath,testRuntimeClasspath org.objenesis:objenesis:3.3=testRuntimeClasspath org.opentest4j:opentest4j:1.3.0=testCompileClasspath,testRuntimeClasspath org.projectlombok:lombok:1.18.30=annotationProcessor,compileClasspath,testCompileClasspath diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java index 2e95b3e..44612b4 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java @@ -16,6 +16,7 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION; import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; @@ -34,6 +35,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -49,6 +51,7 @@ import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateListener; import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateRestoreListener; import 
org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetter; +import org.hypertrace.core.kafkastreams.framework.threading.StreamThreadsCountResolver; import org.hypertrace.core.kafkastreams.framework.timestampextractors.UseWallclockTimeOnInvalidTimestamp; import org.hypertrace.core.kafkastreams.framework.topics.creator.KafkaTopicCreator; import org.hypertrace.core.kafkastreams.framework.util.ExceptionUtils; @@ -106,6 +109,8 @@ protected void doInit() { streamsBuilder = buildTopology(streamsConfig, streamsBuilder, sourceStreams); this.topology = streamsBuilder.build(); + resolveDynamicStreamThreads(streamsConfig); + getLogger().info("Finalized kafka streams configuration: {}", streamsConfig); // pre-create input/output topics required for kstream application @@ -257,6 +262,43 @@ public abstract StreamsBuilder buildTopology( StreamsBuilder streamsBuilder, Map> sourceStreams); + /** + * Override in subclasses that want auto-sized {@code num.stream.threads}. Return a resolver + * configured with this app's replica count source. The framework only invokes this when {@code + * num.stream.threads} is set to {@link StreamThreadsCountResolver#DYNAMIC_SENTINEL}; returning an + * empty optional disables dynamic resolution and the app keeps whatever value was configured. + */ + protected Optional getStreamThreadsCountResolver() { + return Optional.empty(); + } + + // Defensive: any unexpected failure in dynamic resolution must NOT prevent the app from starting. + // If something goes wrong, log and leave streamsProperties untouched so Kafka Streams uses + // whatever value was originally configured (or its own default). 
+ private void resolveDynamicStreamThreads(Map streamsProperties) { + try { + if (!StreamThreadsCountResolver.isDynamic(streamsProperties)) { + return; + } + Optional resolver = getStreamThreadsCountResolver(); + if (resolver.isEmpty()) { + getLogger() + .warn( + "{} is set to DYNAMIC but no StreamThreadsCountResolver is provided; leaving as-is", + NUM_STREAM_THREADS_CONFIG); + return; + } + int resolved = resolver.get().resolve(this.topology, streamsProperties); + streamsProperties.put(NUM_STREAM_THREADS_CONFIG, resolved); + } catch (Throwable throwable) { + getLogger() + .error( + "Unexpected error resolving dynamic {}; leaving config untouched", + NUM_STREAM_THREADS_CONFIG, + throwable); + } + } + public Map getStreamsConfig(Config jobConfig) { return new HashMap<>(ConfigUtils.getFlatMapConfig(jobConfig, getStreamsConfigKey())); } diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java new file mode 100644 index 0000000..23aa101 --- /dev/null +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java @@ -0,0 +1,124 @@ +package org.hypertrace.core.kafkastreams.framework.threading; + +import static java.util.stream.Collectors.toUnmodifiableSet; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyDescription; +import org.apache.kafka.streams.TopologyDescription.Source; +import org.apache.kafka.streams.TopologyDescription.Subtopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Computes a per-instance {@code num.stream.threads} value from a topology and the partition count + * of every source topic. + * + *

For each sub-topology the maximum partition count across its source topics is the number of + * stream tasks. Summing across sub-topologies and dividing by the replica count yields the threads + * each instance should run to keep all tasks active without idle threads. + */ +public class DynamicStreamThreadsCountCalculator { + + private static final long DESCRIBE_TOPICS_TIMEOUT_MILLIS = Duration.ofSeconds(5).toMillis(); + private static final Logger logger = + LoggerFactory.getLogger(DynamicStreamThreadsCountCalculator.class); + + private static Set sourceTopicsOf(final Subtopology subtopology) { + return subtopology.nodes().stream() + .filter(node -> node instanceof Source) + .map(node -> (Source) node) + .flatMap(source -> source.topicSet().stream()) + .collect(toUnmodifiableSet()); + } + + public int compute(final Topology topology, final AdminClient adminClient, final int replicas) { + if (replicas <= 0) { + throw new IllegalArgumentException("replicas must be positive, got " + replicas); + } + + final TopologyDescription description = topology.describe(); + final Set sourceTopics = + description.subtopologies().stream() + .flatMap(subtopology -> sourceTopicsOf(subtopology).stream()) + .collect(toUnmodifiableSet()); + + final Map partitionsByTopic = describePartitions(adminClient, sourceTopics); + + int totalTasks = 0; + int subtopologyCount = 0; + for (final Subtopology subtopology : description.subtopologies()) { + subtopologyCount++; + final Set subtopologyTopics = sourceTopicsOf(subtopology); + + final int tasksForSubtopology = + subtopologyTopics.stream() + .mapToInt(topic -> partitionsByTopic.getOrDefault(topic, 0)) + .max() + .orElse(0); + + if (tasksForSubtopology == 0) { + logger.warn( + "Sub-topology has no resolvable partitions; topics={}. Pod restart will be needed once topics exist.", + subtopologyTopics); + } + totalTasks += tasksForSubtopology; + } + + final int threads = totalTasks == 0 ? 
1 : (int) Math.ceil((double) totalTasks / replicas); + logger.info( + "Dynamic num.stream.threads: totalTasks={} across {} sub-topologies, replicas={}, computed={}", + totalTasks, + subtopologyCount, + replicas, + threads); + return threads; + } + + private Map describePartitions( + final AdminClient adminClient, final Set topics) { + if (topics.isEmpty()) { + return Map.of(); + } + final DescribeTopicsResult result = adminClient.describeTopics(topics); + final Map partitions = new HashMap<>(); + for (final Entry> entry : + result.topicNameValues().entrySet()) { + try { + final TopicDescription description = + entry.getValue().get(DESCRIBE_TOPICS_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + partitions.put(entry.getKey(), description.partitions().size()); + } catch (final ExecutionException executionException) { + if (executionException.getCause() instanceof UnknownTopicOrPartitionException) { + logger.warn( + "Topic absent on broker: {}. Treating as 0 partitions; restart needed once created.", + entry.getKey()); + partitions.put(entry.getKey(), 0); + } else { + throw new RuntimeException( + "Failed to describe topic " + entry.getKey(), executionException); + } + } catch (final InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted while describing topic " + entry.getKey(), interruptedException); + } catch (final TimeoutException timeoutException) { + throw new RuntimeException( + "Timed out describing topic " + entry.getKey(), timeoutException); + } + } + return Map.copyOf(partitions); + } +} diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java new file mode 100644 index 0000000..e3ae595 --- /dev/null +++ 
b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java @@ -0,0 +1,99 @@ +package org.hypertrace.core.kafkastreams.framework.threading; + +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.function.IntSupplier; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.streams.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Resolves a concrete {@code num.stream.threads} value when the application has configured the + * {@link #DYNAMIC_SENTINEL} sentinel. Bridges the topology + AdminClient to the {@link + * DynamicStreamThreadsCountCalculator} and falls back to {@link #FALLBACK_NUM_STREAM_THREADS} when + * resolution fails. + * + *

The replica count is supplied externally — applications typically wire it from a {@code + * REPLICA_COUNT} environment variable injected by the deployment template. + */ +public class StreamThreadsCountResolver { + + public static final String DYNAMIC_SENTINEL = "DYNAMIC"; + static final int FALLBACK_NUM_STREAM_THREADS = 8; + + private static final Logger logger = LoggerFactory.getLogger(StreamThreadsCountResolver.class); + + private final DynamicStreamThreadsCountCalculator calculator; + private final IntSupplier replicaCountSupplier; + + public StreamThreadsCountResolver(final IntSupplier replicaCountSupplier) { + this(new DynamicStreamThreadsCountCalculator(), replicaCountSupplier); + } + + StreamThreadsCountResolver( + final DynamicStreamThreadsCountCalculator calculator, + final IntSupplier replicaCountSupplier) { + this.calculator = calculator; + this.replicaCountSupplier = replicaCountSupplier; + } + + /** + * Returns true if the {@code num.stream.threads} value in the given streams properties is the + * dynamic sentinel and should be resolved by this class. + */ + public static boolean isDynamic(final Map streamsProperties) { + final Object value = + streamsProperties.get(org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG); + return value != null && DYNAMIC_SENTINEL.equals(String.valueOf(value)); + } + + /** + * Resolve a concrete thread count for the given topology. Returns {@link + * #FALLBACK_NUM_STREAM_THREADS} on any failure so the application can still start. 
+ */ + public int resolve(final Topology topology, final Map streamsProperties) { + try { + final int replicas = requirePositiveReplicaCount(); + try (final AdminClient adminClient = AdminClient.create(toProperties(streamsProperties))) { + return calculator.compute(topology, adminClient, replicas); + } + } catch (final RuntimeException runtimeException) { + logger.error( + "Failed to compute dynamic num.stream.threads; falling back to {}", + FALLBACK_NUM_STREAM_THREADS, + runtimeException); + return FALLBACK_NUM_STREAM_THREADS; + } + } + + /** + * Convenience adapter — given an {@code Optional} replica-count source (the common + * config-loaded shape), produce a supplier that this resolver accepts. + */ + public static IntSupplier optionalReplicaCount(final Optional replicaCount) { + return () -> + replicaCount.orElseThrow( + () -> new IllegalStateException("replica.count is not configured")); + } + + private static Properties toProperties(final Map streamsProperties) { + final Properties properties = new Properties(); + streamsProperties.forEach( + (key, value) -> { + if (value != null) { + properties.put(key, value); + } + }); + return properties; + } + + private int requirePositiveReplicaCount() { + final int replicas = replicaCountSupplier.getAsInt(); + if (replicas <= 0) { + throw new IllegalStateException("replica.count must be positive, got " + replicas); + } + return replicas; + } +} diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java new file mode 100644 index 0000000..f00aa52 --- /dev/null +++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java @@ -0,0 +1,117 @@ +package org.hypertrace.core.kafkastreams.framework.threading; + +import static 
java.util.stream.Collectors.toUnmodifiableList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.IntStream; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DynamicStreamThreadsCountCalculatorTest { + + private static final int ABSENT = -1; + + private final DynamicStreamThreadsCountCalculator calculator = + new DynamicStreamThreadsCountCalculator(); + + @Mock private AdminClient adminClient; + + @Test + void singleSubtopologyWithThirtyPartitionsAndEightReplicasGivesFourThreads() { + final Topology topology = topologyForSubtopologies(Set.of("topic-a")); + stubPartitions(Map.of("topic-a", 30)); + + assertEquals(4, calculator.compute(topology, adminClient, 8)); + } + + @Test + void twoSubtopologiesAreSummedThenDividedByReplicas() { + final Topology topology = topologyForSubtopologies(Set.of("topic-a"), Set.of("topic-b")); + stubPartitions(Map.of("topic-a", 
10, "topic-b", 20)); + + assertEquals(4, calculator.compute(topology, adminClient, 8)); + } + + @Test + void absentTopicCountsAsZeroPartitions() { + final Topology topology = topologyForSubtopologies(Set.of("topic-a"), Set.of("topic-missing")); + stubPartitions(Map.of("topic-a", 16, "topic-missing", ABSENT)); + + assertEquals(2, calculator.compute(topology, adminClient, 8)); + } + + @Test + void zeroOrNegativeReplicasThrows() { + final Topology topology = topologyForSubtopologies(Set.of("topic-a")); + + assertThrows( + IllegalArgumentException.class, () -> calculator.compute(topology, adminClient, 0)); + assertThrows( + IllegalArgumentException.class, () -> calculator.compute(topology, adminClient, -1)); + } + + @Test + void totalTasksZeroReturnsOne() { + final Topology topology = topologyForSubtopologies(Set.of("topic-a")); + stubPartitions(Map.of("topic-a", ABSENT)); + + assertEquals(1, calculator.compute(topology, adminClient, 8)); + } + + @SafeVarargs + private static Topology topologyForSubtopologies(final Set... 
topicsBySubtopology) { + final StreamsBuilder builder = new StreamsBuilder(); + for (final Set topics : topicsBySubtopology) { + for (final String topic : topics) { + builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + .foreach((key, value) -> {}); + } + } + return builder.build(); + } + + private void stubPartitions(final Map partitionsByTopic) { + final DescribeTopicsResult result = mock(DescribeTopicsResult.class); + final Map> futures = new HashMap<>(); + partitionsByTopic.forEach( + (topic, partitionCount) -> futures.put(topic, futureFor(topic, partitionCount))); + when(result.topicNameValues()).thenReturn(futures); + when(adminClient.describeTopics(anySet())).thenReturn(result); + } + + private static KafkaFuture futureFor(final String topic, final int partitions) { + if (partitions == ABSENT) { + final KafkaFutureImpl failed = new KafkaFutureImpl<>(); + failed.completeExceptionally(new UnknownTopicOrPartitionException(topic)); + return failed; + } + final List partitionInfos = + IntStream.range(0, partitions) + .mapToObj(index -> new TopicPartitionInfo(index, Node.noNode(), List.of(), List.of())) + .collect(toUnmodifiableList()); + return KafkaFuture.completedFuture(new TopicDescription(topic, false, partitionInfos)); + } +} diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java new file mode 100644 index 0000000..e500288 --- /dev/null +++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java @@ -0,0 +1,107 @@ +package org.hypertrace.core.kafkastreams.framework.threading; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static 
org.mockito.Mockito.when; + +import java.util.Map; +import java.util.Optional; +import java.util.function.IntSupplier; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class StreamThreadsCountResolverTest { + + @Mock private DynamicStreamThreadsCountCalculator calculator; + + @Test + void calculatorThrowFallsBackToEight() { + final Topology topology = simpleTopology(); + when(calculator.compute(eq(topology), any(), eq(8))) + .thenThrow(new RuntimeException("simulated broker outage")); + final StreamThreadsCountResolver resolver = new StreamThreadsCountResolver(calculator, () -> 8); + + final int resolved = resolver.resolve(topology, bootstrapProperties()); + + assertEquals(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS, resolved); + } + + @Test + void missingReplicaCountFallsBack() { + final IntSupplier missing = StreamThreadsCountResolver.optionalReplicaCount(Optional.empty()); + final StreamThreadsCountResolver resolver = new StreamThreadsCountResolver(calculator, missing); + + final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); + + assertEquals(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS, resolved); + } + + @Test + void zeroReplicaCountFallsBack() { + final StreamThreadsCountResolver resolver = new StreamThreadsCountResolver(calculator, () -> 0); + + final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); + + assertEquals(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS, resolved); + } + + @Test + void negativeReplicaCountFallsBack() { + 
final StreamThreadsCountResolver resolver = + new StreamThreadsCountResolver(calculator, () -> -1); + + final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); + + assertEquals(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS, resolved); + } + + @Test + void delegatesToCalculatorWithConfiguredReplicas() { + final Topology topology = simpleTopology(); + when(calculator.compute(eq(topology), any(), eq(8))).thenReturn(7); + final StreamThreadsCountResolver resolver = new StreamThreadsCountResolver(calculator, () -> 8); + + final int resolved = resolver.resolve(topology, bootstrapProperties()); + + assertEquals(7, resolved); + } + + @Test + void isDynamicTrueWhenSentinelSet() { + assertEquals( + true, + StreamThreadsCountResolver.isDynamic( + Map.of( + StreamsConfig.NUM_STREAM_THREADS_CONFIG, + StreamThreadsCountResolver.DYNAMIC_SENTINEL))); + } + + @Test + void isDynamicFalseForNumericValueOrAbsent() { + assertEquals( + false, + StreamThreadsCountResolver.isDynamic(Map.of(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4))); + assertEquals(false, StreamThreadsCountResolver.isDynamic(Map.of())); + } + + private static Topology simpleTopology() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("topic-a", Consumed.with(Serdes.String(), Serdes.String())) + .foreach((key, value) -> {}); + return builder.build(); + } + + private static Map bootstrapProperties() { + return Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + } +} From 62995170efd4c8afdae4c8e4a7f6e1a834fc90c1 Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Mon, 1 Jun 2026 12:09:39 +0530 Subject: [PATCH 2/8] Expand getStreamThreadsCountResolver() javadoc with override example Adds a concrete code snippet showing how a consumer app wires its own replica-count source through StreamThreadsCountResolver, so adopters don't have to read the calculator/resolver internals to integrate. 
Also clarifies that apps not overriding the hook are unaffected. Co-Authored-By: Claude Opus 4.7 --- .../framework/KafkaStreamsApp.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java index 44612b4..5fc1c2b 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java @@ -264,9 +264,27 @@ public abstract StreamsBuilder buildTopology( /** * Override in subclasses that want auto-sized {@code num.stream.threads}. Return a resolver - * configured with this app's replica count source. The framework only invokes this when {@code + * configured with this app's replica-count source. The framework only invokes this when {@code * num.stream.threads} is set to {@link StreamThreadsCountResolver#DYNAMIC_SENTINEL}; returning an * empty optional disables dynamic resolution and the app keeps whatever value was configured. + * + *

Apps that don't override this are unaffected — the default returns {@code Optional.empty()} + * and the framework leaves {@code num.stream.threads} exactly as configured. Apps that do + * override should supply replica count from wherever the deployment exposes it (e.g. the {@code + * REPLICA_COUNT} environment variable injected by the k8s template, or a HOCON config key). + * + *

Example: + * + *

<pre>{@code
+   * @Override
+   * protected Optional<StreamThreadsCountResolver> getStreamThreadsCountResolver() {
+   *   final Optional<Integer> replicaCount =
+   *       ConfigUtils.optionalInteger(getAppConfig(), "replica.count");
+   *   return Optional.of(
+   *       new StreamThreadsCountResolver(
+   *           StreamThreadsCountResolver.optionalReplicaCount(replicaCount)));
+   * }
+   * }</pre>
*/ protected Optional getStreamThreadsCountResolver() { return Optional.empty(); From c305f2473ca30abf9e389f4836d7eaf539d0c7ce Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Tue, 2 Jun 2026 13:41:20 +0530 Subject: [PATCH 3/8] Address review: inject AdminClient factory, narrow exception scope MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Invert resolver dep: take AdminClient factory in constructor instead of building inside resolve(). Removes the package-private test-only constructor and lets the test mock the actual external (AdminClient) while exercising the real DynamicStreamThreadsCountCalculator. - Drop @Mock on the calculator in the resolver test; route stubs through AdminClient.describeTopics(...) so the test asserts on the real partition→threads math instead of stubbed return values. - Narrow catch (Throwable) to catch (Exception) in resolveDynamicStreamThreads so JVM Errors (OOM, LinkageError) keep surfacing instead of being silently swallowed at startup. - Return OptionalInt from resolveDynamicStreamThreads and apply the override at the doInit() call site, so the helper no longer mutates the caller's map under a query-shaped name. 
Co-Authored-By: Claude Opus 4.7 --- .../framework/KafkaStreamsApp.java | 24 +++--- .../threading/StreamThreadsCountResolver.java | 13 +++- .../StreamThreadsCountResolverTest.java | 74 +++++++++++++------ 3 files changed, 75 insertions(+), 36 deletions(-) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java index 5fc1c2b..4445bf2 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -109,7 +110,8 @@ protected void doInit() { streamsBuilder = buildTopology(streamsConfig, streamsBuilder, sourceStreams); this.topology = streamsBuilder.build(); - resolveDynamicStreamThreads(streamsConfig); + resolveDynamicStreamThreads(streamsConfig) + .ifPresent(threads -> streamsConfig.put(NUM_STREAM_THREADS_CONFIG, threads)); getLogger().info("Finalized kafka streams configuration: {}", streamsConfig); @@ -290,13 +292,13 @@ protected Optional getStreamThreadsCountResolver() { return Optional.empty(); } - // Defensive: any unexpected failure in dynamic resolution must NOT prevent the app from starting. - // If something goes wrong, log and leave streamsProperties untouched so Kafka Streams uses - // whatever value was originally configured (or its own default). - private void resolveDynamicStreamThreads(Map streamsProperties) { + // Defensive: any unexpected runtime failure in dynamic resolution must NOT prevent the app from + // starting. 
If something goes wrong, log and return empty so the caller leaves the streams + // config untouched. JVM Errors (OOM, LinkageError, etc.) propagate — those are not recoverable. + private OptionalInt resolveDynamicStreamThreads(Map streamsProperties) { try { if (!StreamThreadsCountResolver.isDynamic(streamsProperties)) { - return; + return OptionalInt.empty(); } Optional resolver = getStreamThreadsCountResolver(); if (resolver.isEmpty()) { @@ -304,16 +306,16 @@ private void resolveDynamicStreamThreads(Map streamsProperties) .warn( "{} is set to DYNAMIC but no StreamThreadsCountResolver is provided; leaving as-is", NUM_STREAM_THREADS_CONFIG); - return; + return OptionalInt.empty(); } - int resolved = resolver.get().resolve(this.topology, streamsProperties); - streamsProperties.put(NUM_STREAM_THREADS_CONFIG, resolved); - } catch (Throwable throwable) { + return OptionalInt.of(resolver.get().resolve(this.topology, streamsProperties)); + } catch (Exception exception) { getLogger() .error( "Unexpected error resolving dynamic {}; leaving config untouched", NUM_STREAM_THREADS_CONFIG, - throwable); + exception); + return OptionalInt.empty(); } } diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java index e3ae595..cca70d1 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java @@ -3,6 +3,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.function.Function; import java.util.function.IntSupplier; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.streams.Topology; @@ -27,16 +28,19 @@ public 
class StreamThreadsCountResolver { private final DynamicStreamThreadsCountCalculator calculator; private final IntSupplier replicaCountSupplier; + private final Function adminClientFactory; public StreamThreadsCountResolver(final IntSupplier replicaCountSupplier) { - this(new DynamicStreamThreadsCountCalculator(), replicaCountSupplier); + this(new DynamicStreamThreadsCountCalculator(), replicaCountSupplier, AdminClient::create); } - StreamThreadsCountResolver( + public StreamThreadsCountResolver( final DynamicStreamThreadsCountCalculator calculator, - final IntSupplier replicaCountSupplier) { + final IntSupplier replicaCountSupplier, + final Function adminClientFactory) { this.calculator = calculator; this.replicaCountSupplier = replicaCountSupplier; + this.adminClientFactory = adminClientFactory; } /** @@ -56,7 +60,8 @@ public static boolean isDynamic(final Map streamsProperties) { public int resolve(final Topology topology, final Map streamsProperties) { try { final int replicas = requirePositiveReplicaCount(); - try (final AdminClient adminClient = AdminClient.create(toProperties(streamsProperties))) { + try (final AdminClient adminClient = + adminClientFactory.apply(toProperties(streamsProperties))) { return calculator.compute(topology, adminClient, replicas); } } catch (final RuntimeException runtimeException) { diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java index e500288..71cd7d5 100644 --- a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java +++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java @@ -1,37 +1,43 @@ package org.hypertrace.core.kafkastreams.framework.threading; +import static 
java.util.stream.Collectors.toUnmodifiableList; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; +import java.util.function.Function; import java.util.function.IntSupplier; +import java.util.stream.IntStream; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -@ExtendWith(MockitoExtension.class) class StreamThreadsCountResolverTest { - @Mock private DynamicStreamThreadsCountCalculator calculator; - @Test void calculatorThrowFallsBackToEight() { - final Topology topology = simpleTopology(); - when(calculator.compute(eq(topology), any(), eq(8))) + final AdminClient adminClient = mock(AdminClient.class); + when(adminClient.describeTopics(anySet())) .thenThrow(new RuntimeException("simulated broker outage")); - final StreamThreadsCountResolver resolver = new StreamThreadsCountResolver(calculator, () -> 8); + final StreamThreadsCountResolver resolver = resolverWith(() -> 8, adminClient); - final int 
resolved = resolver.resolve(topology, bootstrapProperties()); + final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); assertEquals(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS, resolved); } @@ -39,7 +45,7 @@ void calculatorThrowFallsBackToEight() { @Test void missingReplicaCountFallsBack() { final IntSupplier missing = StreamThreadsCountResolver.optionalReplicaCount(Optional.empty()); - final StreamThreadsCountResolver resolver = new StreamThreadsCountResolver(calculator, missing); + final StreamThreadsCountResolver resolver = resolverWith(missing, mock(AdminClient.class)); final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); @@ -48,7 +54,7 @@ void missingReplicaCountFallsBack() { @Test void zeroReplicaCountFallsBack() { - final StreamThreadsCountResolver resolver = new StreamThreadsCountResolver(calculator, () -> 0); + final StreamThreadsCountResolver resolver = resolverWith(() -> 0, mock(AdminClient.class)); final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); @@ -57,8 +63,7 @@ void zeroReplicaCountFallsBack() { @Test void negativeReplicaCountFallsBack() { - final StreamThreadsCountResolver resolver = - new StreamThreadsCountResolver(calculator, () -> -1); + final StreamThreadsCountResolver resolver = resolverWith(() -> -1, mock(AdminClient.class)); final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); @@ -67,13 +72,15 @@ void negativeReplicaCountFallsBack() { @Test void delegatesToCalculatorWithConfiguredReplicas() { - final Topology topology = simpleTopology(); - when(calculator.compute(eq(topology), any(), eq(8))).thenReturn(7); - final StreamThreadsCountResolver resolver = new StreamThreadsCountResolver(calculator, () -> 8); + // 30-partition source / 8 replicas -> ceil(30/8) = 4 threads. Real calculator computes this; + // only AdminClient (the external) is stubbed. 
+ final AdminClient adminClient = mock(AdminClient.class); + stubPartitions(adminClient, Map.of("topic-a", 30)); + final StreamThreadsCountResolver resolver = resolverWith(() -> 8, adminClient); - final int resolved = resolver.resolve(topology, bootstrapProperties()); + final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); - assertEquals(7, resolved); + assertEquals(4, resolved); } @Test @@ -94,6 +101,13 @@ void isDynamicFalseForNumericValueOrAbsent() { assertEquals(false, StreamThreadsCountResolver.isDynamic(Map.of())); } + private static StreamThreadsCountResolver resolverWith( + final IntSupplier replicaCountSupplier, final AdminClient adminClient) { + final Function factory = properties -> adminClient; + return new StreamThreadsCountResolver( + new DynamicStreamThreadsCountCalculator(), replicaCountSupplier, factory); + } + private static Topology simpleTopology() { final StreamsBuilder builder = new StreamsBuilder(); builder.stream("topic-a", Consumed.with(Serdes.String(), Serdes.String())) @@ -104,4 +118,22 @@ private static Topology simpleTopology() { private static Map bootstrapProperties() { return Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); } + + private static void stubPartitions( + final AdminClient adminClient, final Map partitionsByTopic) { + final DescribeTopicsResult result = mock(DescribeTopicsResult.class); + final Map> futures = new HashMap<>(); + partitionsByTopic.forEach( + (topic, partitionCount) -> futures.put(topic, futureFor(topic, partitionCount))); + when(result.topicNameValues()).thenReturn(futures); + when(adminClient.describeTopics(anySet())).thenReturn(result); + } + + private static KafkaFuture futureFor(final String topic, final int partitions) { + final List partitionInfos = + IntStream.range(0, partitions) + .mapToObj(index -> new TopicPartitionInfo(index, Node.noNode(), List.of(), List.of())) + .collect(toUnmodifiableList()); + return KafkaFuture.completedFuture(new 
TopicDescription(topic, false, partitionInfos)); + } } From 736f37d4c2ac2db47a16532b1a92195139379235 Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Wed, 3 Jun 2026 13:48:50 +0530 Subject: [PATCH 4/8] Address review: explicit fallback gates, single-deadline allOf - Replace throw-and-catch in resolver with explicit if-gate: a non-positive replica count (zero, negative, or absent) logs and returns FALLBACK_NUM_STREAM_THREADS. optionalReplicaCount() now yields 0 instead of throwing. - Calculator: replace per-topic get(timeout) loop with KafkaFuture.allOf(...).get(timeout) so worst-case startup wait is one deadline instead of N. Per-future UnknownTopicOrPartitionException handling is preserved. - KafkaStreamsApp: when num.stream.threads = DYNAMIC and no resolver is provided (or the override throws), substitute FALLBACK_NUM_STREAM_THREADS so the literal "DYNAMIC" never reaches Kafka Streams config (it can't parse the string). - Promote FALLBACK_NUM_STREAM_THREADS to public so callers can reference the same constant. - Document regex/pattern subscription limitation in the calculator javadoc. - Document fallback rationale on FALLBACK_NUM_STREAM_THREADS. 
Co-Authored-By: Claude Opus 4.7 --- .../framework/KafkaStreamsApp.java | 46 +++++++++---------- .../DynamicStreamThreadsCountCalculator.java | 39 ++++++++++++---- .../threading/StreamThreadsCountResolver.java | 43 ++++++++--------- 3 files changed, 76 insertions(+), 52 deletions(-) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java index 4445bf2..f478072 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalInt; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -110,8 +109,9 @@ protected void doInit() { streamsBuilder = buildTopology(streamsConfig, streamsBuilder, sourceStreams); this.topology = streamsBuilder.build(); - resolveDynamicStreamThreads(streamsConfig) - .ifPresent(threads -> streamsConfig.put(NUM_STREAM_THREADS_CONFIG, threads)); + if (StreamThreadsCountResolver.isDynamic(streamsConfig)) { + streamsConfig.put(NUM_STREAM_THREADS_CONFIG, resolveDynamicStreamThreads(streamsConfig)); + } getLogger().info("Finalized kafka streams configuration: {}", streamsConfig); @@ -292,31 +292,31 @@ protected Optional getStreamThreadsCountResolver() { return Optional.empty(); } - // Defensive: any unexpected runtime failure in dynamic resolution must NOT prevent the app from - // starting. If something goes wrong, log and return empty so the caller leaves the streams - // config untouched. JVM Errors (OOM, LinkageError, etc.) propagate — those are not recoverable. 
- private OptionalInt resolveDynamicStreamThreads(Map streamsProperties) { + // Caller must check StreamThreadsCountResolver.isDynamic(...) before invoking. Always returns + // a concrete value: either the resolver's computed count or FALLBACK_NUM_STREAM_THREADS. + // The string sentinel "DYNAMIC" must never leak back to Kafka Streams config — it can't be + // parsed as an integer. + private int resolveDynamicStreamThreads(Map streamsProperties) { + Optional resolver; try { - if (!StreamThreadsCountResolver.isDynamic(streamsProperties)) { - return OptionalInt.empty(); - } - Optional resolver = getStreamThreadsCountResolver(); - if (resolver.isEmpty()) { - getLogger() - .warn( - "{} is set to DYNAMIC but no StreamThreadsCountResolver is provided; leaving as-is", - NUM_STREAM_THREADS_CONFIG); - return OptionalInt.empty(); - } - return OptionalInt.of(resolver.get().resolve(this.topology, streamsProperties)); + resolver = getStreamThreadsCountResolver(); } catch (Exception exception) { getLogger() - .error( - "Unexpected error resolving dynamic {}; leaving config untouched", - NUM_STREAM_THREADS_CONFIG, + .warn( + "getStreamThreadsCountResolver() threw; falling back to {} stream threads", + StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS, exception); - return OptionalInt.empty(); + return StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS; + } + if (resolver.isEmpty()) { + getLogger() + .warn( + "{} is set to DYNAMIC but no StreamThreadsCountResolver is provided; falling back to {}", + NUM_STREAM_THREADS_CONFIG, + StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS); + return StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS; } + return resolver.get().resolve(this.topology, streamsProperties); } public Map getStreamsConfig(Config jobConfig) { diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java 
b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java index 23aa101..73888f2 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java @@ -29,6 +29,9 @@ *

<p>For each sub-topology the maximum partition count across its source topics is the number of * stream tasks. Summing across sub-topologies and dividing by the replica count yields the threads * each instance should run to keep all tasks active without idle threads. + * + *

Regex/pattern subscriptions ({@link Source#topicPattern()}) are not supported — those + * sub-topologies contribute zero tasks. No current app subscribes via regex. */ public class DynamicStreamThreadsCountCalculator { @@ -93,13 +96,19 @@ private Map describePartitions( return Map.of(); } final DescribeTopicsResult result = adminClient.describeTopics(topics); + final Map> futures = result.topicNameValues(); + + // Single shared deadline across all topics — worst-case wait is one timeout, not N. + // AdminClient retries the underlying RPC internally per its own retry config. + awaitAll(futures.values(), DESCRIBE_TOPICS_TIMEOUT_MILLIS); + final Map partitions = new HashMap<>(); - for (final Entry> entry : - result.topicNameValues().entrySet()) { + for (final Entry> entry : futures.entrySet()) { try { - final TopicDescription description = - entry.getValue().get(DESCRIBE_TOPICS_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - partitions.put(entry.getKey(), description.partitions().size()); + // Non-blocking after awaitAll: each future is already complete (or completed + // exceptionally). Per-topic handling stays so UnknownTopicOrPartitionException + // doesn't fail the whole batch. 
+ partitions.put(entry.getKey(), entry.getValue().get().partitions().size()); } catch (final ExecutionException executionException) { if (executionException.getCause() instanceof UnknownTopicOrPartitionException) { logger.warn( @@ -114,11 +123,25 @@ private Map describePartitions( Thread.currentThread().interrupt(); throw new RuntimeException( "Interrupted while describing topic " + entry.getKey(), interruptedException); - } catch (final TimeoutException timeoutException) { - throw new RuntimeException( - "Timed out describing topic " + entry.getKey(), timeoutException); } } return Map.copyOf(partitions); } + + private static void awaitAll( + final java.util.Collection> futures, final long timeoutMillis) { + try { + KafkaFuture.allOf(futures.toArray(KafkaFuture[]::new)) + .get(timeoutMillis, TimeUnit.MILLISECONDS); + } catch (final TimeoutException timeoutException) { + throw new RuntimeException( + "Timed out describing topics after " + timeoutMillis + "ms", timeoutException); + } catch (final InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted describing topics", interruptedException); + } catch (final ExecutionException executionException) { + // At least one topic future completed exceptionally; per-future handling below classifies + // (UnknownTopicOrPartitionException → 0 partitions; everything else → fail). 
+ } + } } diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java index cca70d1..2a451df 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java @@ -17,12 +17,16 @@ * resolution fails. * *

The replica count is supplied externally — applications typically wire it from a {@code - * REPLICA_COUNT} environment variable injected by the deployment template. + * REPLICA_COUNT} environment variable injected by the deployment template. A non-positive value + * (zero, negative, or unset) falls back to {@link #FALLBACK_NUM_STREAM_THREADS}. */ public class StreamThreadsCountResolver { public static final String DYNAMIC_SENTINEL = "DYNAMIC"; - static final int FALLBACK_NUM_STREAM_THREADS = 8; + + // Reasonable default for current Hypertrace Kafka Streams apps. Apps that need a different + // value should configure a numeric num.stream.threads explicitly instead of using DYNAMIC. + public static final int FALLBACK_NUM_STREAM_THREADS = 8; private static final Logger logger = LoggerFactory.getLogger(StreamThreadsCountResolver.class); @@ -55,15 +59,21 @@ public static boolean isDynamic(final Map streamsProperties) { /** * Resolve a concrete thread count for the given topology. Returns {@link - * #FALLBACK_NUM_STREAM_THREADS} on any failure so the application can still start. + * #FALLBACK_NUM_STREAM_THREADS} when prerequisites are missing (non-positive replica count) or + * the AdminClient/calculator call fails, so the application can still start. 
*/ public int resolve(final Topology topology, final Map streamsProperties) { - try { - final int replicas = requirePositiveReplicaCount(); - try (final AdminClient adminClient = - adminClientFactory.apply(toProperties(streamsProperties))) { - return calculator.compute(topology, adminClient, replicas); - } + final int replicas = replicaCountSupplier.getAsInt(); + if (replicas <= 0) { + logger.warn( + "replica.count is non-positive ({}); falling back to {} stream threads", + replicas, + FALLBACK_NUM_STREAM_THREADS); + return FALLBACK_NUM_STREAM_THREADS; + } + try (final AdminClient adminClient = + adminClientFactory.apply(toProperties(streamsProperties))) { + return calculator.compute(topology, adminClient, replicas); } catch (final RuntimeException runtimeException) { logger.error( "Failed to compute dynamic num.stream.threads; falling back to {}", @@ -75,12 +85,11 @@ public int resolve(final Topology topology, final Map streamsPro /** * Convenience adapter — given an {@code Optional} replica-count source (the common - * config-loaded shape), produce a supplier that this resolver accepts. + * config-loaded shape), produce a supplier that yields the value when present and {@code 0} when + * absent (treated as a fallback signal by {@link #resolve}). 
*/ public static IntSupplier optionalReplicaCount(final Optional replicaCount) { - return () -> - replicaCount.orElseThrow( - () -> new IllegalStateException("replica.count is not configured")); + return () -> replicaCount.orElse(0); } private static Properties toProperties(final Map streamsProperties) { @@ -93,12 +102,4 @@ private static Properties toProperties(final Map streamsProperti }); return properties; } - - private int requirePositiveReplicaCount() { - final int replicas = replicaCountSupplier.getAsInt(); - if (replicas <= 0) { - throw new IllegalStateException("replica.count must be positive, got " + replicas); - } - return replicas; - } } From f8ae1b083c9948959f3ddded7cf2f05bed7918ca Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Wed, 3 Jun 2026 14:31:43 +0530 Subject: [PATCH 5/8] Add tests for DYNAMIC sentinel fallback paths Cover the two regression cases surfaced in review: when num.stream.threads is set to DYNAMIC but the app has no resolver wired up, and when the resolver-supplier throws. In both cases the framework must replace the literal sentinel with FALLBACK_NUM_STREAM_THREADS so Kafka Streams gets a parseable integer. 
--- .../kafkastreams/framework/SampleAppTest.java | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java index fc4920e..4a60260 100644 --- a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java +++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java @@ -4,14 +4,17 @@ import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import com.typesafe.config.Config; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import java.util.Map; +import java.util.Optional; import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.TestInputTopic; @@ -19,6 +22,7 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetter; +import org.hypertrace.core.kafkastreams.framework.threading.StreamThreadsCountResolver; import org.hypertrace.core.serviceframework.config.ConfigClientFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; 
@@ -84,4 +88,53 @@ public void baseStreamsConfigTest() { is(LogAndContinueExceptionHandler.class)); assertThat(baseStreamsConfig.get(producerPrefix(ACKS_CONFIG)), is("all")); } + + // Verifies the fix for "DYNAMIC sentinel could leak to Kafka Streams config when no resolver is + // wired up" — without resolver override, the framework must replace DYNAMIC with the integer + // fallback so Kafka Streams receives a parseable value. + @Test + public void dynamicWithoutResolverFallsBackToEight() { + SampleApp dynamicApp = + new SampleApp(ConfigClientFactory.getClient()) { + @Override + public Map getStreamsConfig(Config jobConfig) { + Map properties = super.getStreamsConfig(jobConfig); + properties.put(NUM_STREAM_THREADS_CONFIG, StreamThreadsCountResolver.DYNAMIC_SENTINEL); + return properties; + } + }; + + dynamicApp.doInit(); + + assertThat( + dynamicApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG), + is(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS)); + } + + // Verifies the fix for "exception while obtaining the resolver should not leak DYNAMIC" — when + // getStreamThreadsCountResolver() throws, the framework must catch and fall back to the integer + // default rather than letting the literal sentinel reach Kafka Streams. 
+ @Test + public void dynamicWithThrowingResolverFallsBackToEight() { + SampleApp dynamicApp = + new SampleApp(ConfigClientFactory.getClient()) { + @Override + public Map getStreamsConfig(Config jobConfig) { + Map properties = super.getStreamsConfig(jobConfig); + properties.put(NUM_STREAM_THREADS_CONFIG, StreamThreadsCountResolver.DYNAMIC_SENTINEL); + return properties; + } + + @Override + protected Optional getStreamThreadsCountResolver() { + throw new RuntimeException("simulated wiring failure"); + } + }; + + dynamicApp.doInit(); + + assertThat( + dynamicApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG), + is(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS)); + } } From 1c5c6ce20b521befc6182e757328014c8a07823a Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Thu, 4 Jun 2026 16:25:30 +0530 Subject: [PATCH 6/8] Detect pattern sources, use OptionalInt result, inline awaitAll - Calculator returns OptionalInt; pre-pass on Source.topicPattern() returns empty when any sub-topology subscribes via regex (was silently undercount). - Resolver also returns OptionalInt; non-positive replicas and AdminClient failures map to empty rather than each falling back independently. - Framework collapses every "cannot resolve" path through one .orElse so FALLBACK_NUM_STREAM_THREADS is the only place the literal substitution happens. - describePartitions inlines the awaitAll helper into a single deadline- driven loop. Behaviour identical (allOf + per-future get and now+timeout + per-future get(remaining) both share one 5s budget; AdminClient fires RPCs concurrently before returning futures), readability improved. - New test covers regex-source topology -> empty all the way through. 
--- .../framework/KafkaStreamsApp.java | 19 +++-- .../DynamicStreamThreadsCountCalculator.java | 82 +++++++++++-------- .../threading/StreamThreadsCountResolver.java | 22 +++-- .../kafkastreams/framework/SampleAppTest.java | 42 ++++++++++ ...namicStreamThreadsCountCalculatorTest.java | 23 +++++- .../StreamThreadsCountResolverTest.java | 29 +++---- 6 files changed, 143 insertions(+), 74 deletions(-) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java index f478072..8f012b2 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -292,10 +293,11 @@ protected Optional getStreamThreadsCountResolver() { return Optional.empty(); } - // Caller must check StreamThreadsCountResolver.isDynamic(...) before invoking. Always returns - // a concrete value: either the resolver's computed count or FALLBACK_NUM_STREAM_THREADS. - // The string sentinel "DYNAMIC" must never leak back to Kafka Streams config — it can't be - // parsed as an integer. + // Caller must check StreamThreadsCountResolver.isDynamic(...) before invoking. Always returns a + // concrete int — every "cannot resolve" path (resolver-supplier throws, no resolver wired, + // resolver returns empty for unsupported topologies / non-positive replicas / AdminClient + // failure) collapses to FALLBACK_NUM_STREAM_THREADS so the literal "DYNAMIC" sentinel never + // reaches Kafka Streams. 
private int resolveDynamicStreamThreads(Map streamsProperties) { Optional resolver; try { @@ -316,7 +318,14 @@ private int resolveDynamicStreamThreads(Map streamsProperties) { StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS); return StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS; } - return resolver.get().resolve(this.topology, streamsProperties); + final OptionalInt resolved = resolver.get().resolve(this.topology, streamsProperties); + if (resolved.isEmpty()) { + getLogger() + .warn( + "Resolver could not compute dynamic num.stream.threads; falling back to {}", + StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS); + } + return resolved.orElse(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS); } public Map getStreamsConfig(Config jobConfig) { diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java index 73888f2..ce06da3 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java @@ -6,6 +6,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -30,8 +31,10 @@ * stream tasks. Summing across sub-topologies and dividing by the replica count yields the threads * each instance should run to keep all tasks active without idle threads. * - *

Regex/pattern subscriptions ({@link Source#topicPattern()}) are not supported — those - * sub-topologies contribute zero tasks. No current app subscribes via regex. + *

Returns {@link OptionalInt#empty()} when the topology contains a regex/pattern subscription + * ({@link Source#topicPattern()}) — those sub-topologies cannot be enumerated against the broker + * up-front, so dynamic sizing would silently under-count tasks. The caller falls back to its + * configured default in that case. */ public class DynamicStreamThreadsCountCalculator { @@ -47,12 +50,33 @@ private static Set sourceTopicsOf(final Subtopology subtopology) { .collect(toUnmodifiableSet()); } - public int compute(final Topology topology, final AdminClient adminClient, final int replicas) { + private static boolean hasPatternSource(final Subtopology subtopology) { + return subtopology.nodes().stream() + .filter(node -> node instanceof Source) + .map(node -> (Source) node) + .anyMatch(source -> source.topicPattern() != null); + } + + public OptionalInt compute( + final Topology topology, final AdminClient adminClient, final int replicas) { if (replicas <= 0) { throw new IllegalArgumentException("replicas must be positive, got " + replicas); } final TopologyDescription description = topology.describe(); + + // Bail out if any sub-topology subscribes via regex — topicSet() is empty for those, so + // dynamic sizing would silently under-count tasks. The caller substitutes its fallback. + final boolean anyPatternSource = + description.subtopologies().stream() + .anyMatch(DynamicStreamThreadsCountCalculator::hasPatternSource); + if (anyPatternSource) { + logger.warn( + "Topology contains a regex/pattern source; dynamic num.stream.threads is not supported. 
" + + "Caller will fall back to its configured default."); + return OptionalInt.empty(); + } + final Set sourceTopics = description.subtopologies().stream() .flatMap(subtopology -> sourceTopicsOf(subtopology).stream()) @@ -87,9 +111,12 @@ public int compute(final Topology topology, final AdminClient adminClient, final subtopologyCount, replicas, threads); - return threads; + return OptionalInt.of(threads); } + // Single-loop implementation: AdminClient.describeTopics() already fires all RPCs concurrently + // before returning futures, so iteration here only consumes a shared deadline (now+timeout) — + // total wall-clock is capped at DESCRIBE_TOPICS_TIMEOUT_MILLIS regardless of topic count. private Map describePartitions( final AdminClient adminClient, final Set topics) { if (topics.isEmpty()) { @@ -97,18 +124,26 @@ private Map describePartitions( } final DescribeTopicsResult result = adminClient.describeTopics(topics); final Map> futures = result.topicNameValues(); - - // Single shared deadline across all topics — worst-case wait is one timeout, not N. - // AdminClient retries the underlying RPC internally per its own retry config. - awaitAll(futures.values(), DESCRIBE_TOPICS_TIMEOUT_MILLIS); - + final long deadlineMillis = System.currentTimeMillis() + DESCRIBE_TOPICS_TIMEOUT_MILLIS; final Map partitions = new HashMap<>(); + for (final Entry> entry : futures.entrySet()) { + final long remainingMillis = deadlineMillis - System.currentTimeMillis(); + if (remainingMillis <= 0) { + throw new RuntimeException( + "Timed out describing topics after " + DESCRIBE_TOPICS_TIMEOUT_MILLIS + "ms"); + } try { - // Non-blocking after awaitAll: each future is already complete (or completed - // exceptionally). Per-topic handling stays so UnknownTopicOrPartitionException - // doesn't fail the whole batch. 
- partitions.put(entry.getKey(), entry.getValue().get().partitions().size()); + partitions.put( + entry.getKey(), + entry.getValue().get(remainingMillis, TimeUnit.MILLISECONDS).partitions().size()); + } catch (final TimeoutException timeoutException) { + throw new RuntimeException( + "Timed out describing topic " + entry.getKey(), timeoutException); + } catch (final InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted while describing topic " + entry.getKey(), interruptedException); } catch (final ExecutionException executionException) { if (executionException.getCause() instanceof UnknownTopicOrPartitionException) { logger.warn( @@ -119,29 +154,8 @@ private Map describePartitions( throw new RuntimeException( "Failed to describe topic " + entry.getKey(), executionException); } - } catch (final InterruptedException interruptedException) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - "Interrupted while describing topic " + entry.getKey(), interruptedException); } } return Map.copyOf(partitions); } - - private static void awaitAll( - final java.util.Collection> futures, final long timeoutMillis) { - try { - KafkaFuture.allOf(futures.toArray(KafkaFuture[]::new)) - .get(timeoutMillis, TimeUnit.MILLISECONDS); - } catch (final TimeoutException timeoutException) { - throw new RuntimeException( - "Timed out describing topics after " + timeoutMillis + "ms", timeoutException); - } catch (final InterruptedException interruptedException) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted describing topics", interruptedException); - } catch (final ExecutionException executionException) { - // At least one topic future completed exceptionally; per-future handling below classifies - // (UnknownTopicOrPartitionException → 0 partitions; everything else → fail). 
- } - } } diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java index 2a451df..4c564be 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java @@ -2,6 +2,7 @@ import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.Properties; import java.util.function.Function; import java.util.function.IntSupplier; @@ -58,28 +59,25 @@ public static boolean isDynamic(final Map streamsProperties) { } /** - * Resolve a concrete thread count for the given topology. Returns {@link - * #FALLBACK_NUM_STREAM_THREADS} when prerequisites are missing (non-positive replica count) or - * the AdminClient/calculator call fails, so the application can still start. + * Resolve a thread count for the given topology. Returns {@link OptionalInt#empty()} when + * prerequisites are missing (non-positive replica count), the topology is unsupported (regex + * sources), or the AdminClient/calculator call fails — the caller substitutes its fallback so the + * application can still start. 
*/ - public int resolve(final Topology topology, final Map streamsProperties) { + public OptionalInt resolve(final Topology topology, final Map streamsProperties) { final int replicas = replicaCountSupplier.getAsInt(); if (replicas <= 0) { - logger.warn( - "replica.count is non-positive ({}); falling back to {} stream threads", - replicas, - FALLBACK_NUM_STREAM_THREADS); - return FALLBACK_NUM_STREAM_THREADS; + logger.warn("replica.count is non-positive ({}); skipping dynamic resolution", replicas); + return OptionalInt.empty(); } try (final AdminClient adminClient = adminClientFactory.apply(toProperties(streamsProperties))) { return calculator.compute(topology, adminClient, replicas); } catch (final RuntimeException runtimeException) { logger.error( - "Failed to compute dynamic num.stream.threads; falling back to {}", - FALLBACK_NUM_STREAM_THREADS, + "Failed to compute dynamic num.stream.threads; skipping dynamic resolution", runtimeException); - return FALLBACK_NUM_STREAM_THREADS; + return OptionalInt.empty(); } } diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java index 4a60260..9d009e0 100644 --- a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java +++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java @@ -16,11 +16,15 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.regex.Pattern; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; +import org.apache.kafka.streams.kstream.Consumed; +import 
org.apache.kafka.streams.kstream.KStream; import org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetter; import org.hypertrace.core.kafkastreams.framework.threading.StreamThreadsCountResolver; import org.hypertrace.core.serviceframework.config.ConfigClientFactory; @@ -111,6 +115,44 @@ public Map getStreamsConfig(Config jobConfig) { is(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS)); } + // Pattern-source topology: calculator returns OptionalInt.empty() because regex subscriptions + // can't be enumerated up-front against the broker. The framework must substitute the integer + // fallback so Kafka Streams gets a parseable value (not the literal "DYNAMIC"). + @Test + public void dynamicWithPatternSourceFallsBackToEight() { + SampleApp dynamicApp = + new SampleApp(ConfigClientFactory.getClient()) { + @Override + public Map getStreamsConfig(Config jobConfig) { + Map properties = super.getStreamsConfig(jobConfig); + properties.put(NUM_STREAM_THREADS_CONFIG, StreamThreadsCountResolver.DYNAMIC_SENTINEL); + return properties; + } + + @Override + public StreamsBuilder buildTopology( + Map streamsConfig, + StreamsBuilder streamsBuilder, + Map> sourceStreams) { + streamsBuilder.stream( + Pattern.compile("input-.*"), Consumed.with(Serdes.String(), Serdes.String())) + .foreach((key, value) -> {}); + return streamsBuilder; + } + + @Override + protected Optional getStreamThreadsCountResolver() { + return Optional.of(new StreamThreadsCountResolver(() -> 8)); + } + }; + + dynamicApp.doInit(); + + assertThat( + dynamicApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG), + is(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS)); + } + // Verifies the fix for "exception while obtaining the resolver should not leak DYNAMIC" — when // getStreamThreadsCountResolver() throws, the framework must catch and fall back to the integer // default rather than letting the literal sentinel reach Kafka Streams. 
diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java index f00aa52..b36ccc4 100644 --- a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java +++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java @@ -10,7 +10,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.OptionalInt; import java.util.Set; +import java.util.regex.Pattern; import java.util.stream.IntStream; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; @@ -44,7 +46,7 @@ void singleSubtopologyWithThirtyPartitionsAndEightReplicasGivesFourThreads() { final Topology topology = topologyForSubtopologies(Set.of("topic-a")); stubPartitions(Map.of("topic-a", 30)); - assertEquals(4, calculator.compute(topology, adminClient, 8)); + assertEquals(OptionalInt.of(4), calculator.compute(topology, adminClient, 8)); } @Test @@ -52,7 +54,7 @@ void twoSubtopologiesAreSummedThenDividedByReplicas() { final Topology topology = topologyForSubtopologies(Set.of("topic-a"), Set.of("topic-b")); stubPartitions(Map.of("topic-a", 10, "topic-b", 20)); - assertEquals(4, calculator.compute(topology, adminClient, 8)); + assertEquals(OptionalInt.of(4), calculator.compute(topology, adminClient, 8)); } @Test @@ -60,7 +62,7 @@ void absentTopicCountsAsZeroPartitions() { final Topology topology = topologyForSubtopologies(Set.of("topic-a"), Set.of("topic-missing")); stubPartitions(Map.of("topic-a", 16, "topic-missing", ABSENT)); - assertEquals(2, calculator.compute(topology, adminClient, 8)); + assertEquals(OptionalInt.of(2), 
calculator.compute(topology, adminClient, 8)); } @Test @@ -78,7 +80,20 @@ void totalTasksZeroReturnsOne() { final Topology topology = topologyForSubtopologies(Set.of("topic-a")); stubPartitions(Map.of("topic-a", ABSENT)); - assertEquals(1, calculator.compute(topology, adminClient, 8)); + assertEquals(OptionalInt.of(1), calculator.compute(topology, adminClient, 8)); + } + + // Regex/pattern subscriptions cannot be enumerated up-front against the broker, so dynamic + // sizing would silently under-count tasks. Calculator must signal "not applicable" via empty + // so the caller falls back to its configured default. + @Test + void patternSubscriptionReturnsEmpty() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream(Pattern.compile("topic-.*"), Consumed.with(Serdes.String(), Serdes.String())) + .foreach((key, value) -> {}); + final Topology topology = builder.build(); + + assertEquals(OptionalInt.empty(), calculator.compute(topology, adminClient, 8)); } @SafeVarargs diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java index 71cd7d5..7d03c08 100644 --- a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java +++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java @@ -10,6 +10,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.Properties; import java.util.function.Function; import java.util.function.IntSupplier; @@ -31,43 +32,35 @@ class StreamThreadsCountResolverTest { @Test - void calculatorThrowFallsBackToEight() { + void calculatorThrowReturnsEmpty() { final AdminClient adminClient = mock(AdminClient.class); 
when(adminClient.describeTopics(anySet())) .thenThrow(new RuntimeException("simulated broker outage")); final StreamThreadsCountResolver resolver = resolverWith(() -> 8, adminClient); - final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); - - assertEquals(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS, resolved); + assertEquals(OptionalInt.empty(), resolver.resolve(simpleTopology(), bootstrapProperties())); } @Test - void missingReplicaCountFallsBack() { + void missingReplicaCountReturnsEmpty() { final IntSupplier missing = StreamThreadsCountResolver.optionalReplicaCount(Optional.empty()); final StreamThreadsCountResolver resolver = resolverWith(missing, mock(AdminClient.class)); - final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); - - assertEquals(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS, resolved); + assertEquals(OptionalInt.empty(), resolver.resolve(simpleTopology(), bootstrapProperties())); } @Test - void zeroReplicaCountFallsBack() { + void zeroReplicaCountReturnsEmpty() { final StreamThreadsCountResolver resolver = resolverWith(() -> 0, mock(AdminClient.class)); - final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); - - assertEquals(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS, resolved); + assertEquals(OptionalInt.empty(), resolver.resolve(simpleTopology(), bootstrapProperties())); } @Test - void negativeReplicaCountFallsBack() { + void negativeReplicaCountReturnsEmpty() { final StreamThreadsCountResolver resolver = resolverWith(() -> -1, mock(AdminClient.class)); - final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); - - assertEquals(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS, resolved); + assertEquals(OptionalInt.empty(), resolver.resolve(simpleTopology(), bootstrapProperties())); } @Test @@ -78,9 +71,7 @@ void delegatesToCalculatorWithConfiguredReplicas() { stubPartitions(adminClient, Map.of("topic-a", 
30)); final StreamThreadsCountResolver resolver = resolverWith(() -> 8, adminClient); - final int resolved = resolver.resolve(simpleTopology(), bootstrapProperties()); - - assertEquals(4, resolved); + assertEquals(OptionalInt.of(4), resolver.resolve(simpleTopology(), bootstrapProperties())); } @Test From 5522fba7f58428337a7d48fb5d6605f5ed7b0b0b Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Thu, 4 Jun 2026 16:36:50 +0530 Subject: [PATCH 7/8] Document fallback scenarios on getStreamThreadsCountResolver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Single source of truth for when the framework substitutes FALLBACK_NUM_STREAM_THREADS — placed on the user-facing override point so override authors don't have to chase the trail across resolver/calculator to know what bypasses their wiring. Helper comment now points at this javadoc instead of duplicating the list. --- .../kafkastreams/framework/KafkaStreamsApp.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java index 8f012b2..afe71df 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java @@ -288,16 +288,24 @@ public abstract StreamsBuilder buildTopology( * StreamThreadsCountResolver.optionalReplicaCount(replicaCount))); * } * } + * + *

Fallback to {@link StreamThreadsCountResolver#FALLBACK_NUM_STREAM_THREADS} occurs when: + * + *

    + *
  • {@code num.stream.threads} is set to {@code DYNAMIC} but this method returns empty + *
  • this method itself throws + *
  • replica count supplied to the resolver is non-positive (env var unset/zero/negative) + *
  • the AdminClient/calculator call throws (broker unreachable, describe timeout, etc.) + *
  • the topology contains a regex/pattern source — partitions cannot be enumerated up-front + *
*/ protected Optional getStreamThreadsCountResolver() { return Optional.empty(); } // Caller must check StreamThreadsCountResolver.isDynamic(...) before invoking. Always returns a - // concrete int — every "cannot resolve" path (resolver-supplier throws, no resolver wired, - // resolver returns empty for unsupported topologies / non-positive replicas / AdminClient - // failure) collapses to FALLBACK_NUM_STREAM_THREADS so the literal "DYNAMIC" sentinel never - // reaches Kafka Streams. + // concrete int so the literal "DYNAMIC" sentinel never reaches Kafka Streams config — see + // getStreamThreadsCountResolver() javadoc for the full list of fallback triggers. private int resolveDynamicStreamThreads(Map streamsProperties) { Optional resolver; try { From c5a7bd024b0b486e9ec54d3450a086980baa1e88 Mon Sep 17 00:00:00 2001 From: Suresh Prakash Date: Thu, 4 Jun 2026 17:28:03 +0530 Subject: [PATCH 8/8] Replace DYNAMIC sentinel with boolean opt-in; drop static fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review: the "DYNAMIC" string sentinel for num.stream.threads is type-unsafe — anything that constructs StreamsConfig from the map before doInit() substitutes the int will throw ConfigException. And FALLBACK_NUM_STREAM_THREADS = 8 is wrong for any app whose static value isn't 8. - Drop DYNAMIC_SENTINEL / isDynamic() / FALLBACK_NUM_STREAM_THREADS. - Add framework-only flag dynamic.num.stream.threads (boolean) on KafkaStreamsApp; stripped before the config reaches Kafka Streams. - Calculator: totalTasks == 0 now returns OptionalInt.empty() instead of OptionalInt.of(1) — caller keeps the configured value. - Resolver-empty / resolver-throws / no-resolver paths all leave the configured numeric num.stream.threads untouched. Configured value is the fallback by definition. Dual opt-in preserved: app overrides getStreamThreadsCountResolver() AND sets dynamic.num.stream.threads = true. Per-cluster rollout/rollback intact. 
--- .../framework/KafkaStreamsApp.java | 74 ++++++++++++------- .../DynamicStreamThreadsCountCalculator.java | 9 ++- .../threading/StreamThreadsCountResolver.java | 33 +++------ .../kafkastreams/framework/SampleAppTest.java | 46 ++++++------ ...namicStreamThreadsCountCalculatorTest.java | 5 +- .../StreamThreadsCountResolverTest.java | 19 ----- 6 files changed, 90 insertions(+), 96 deletions(-) diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java index afe71df..e30ba12 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java @@ -69,6 +69,16 @@ public abstract class KafkaStreamsApp extends PlatformService { public static final String CLEANUP_LOCAL_STATE = "cleanup.local.state"; public static final String PRE_CREATE_TOPICS = "precreate.topics"; public static final String KAFKA_STREAMS_CONFIG_KEY = "kafka.streams.config"; + + /** + * Framework-level boolean opt-in (set in the streams config map) that gates dynamic {@code + * num.stream.threads} resolution. Apps must also override {@link + * #getStreamThreadsCountResolver()}; this flag exists separately so deployments can roll out and + * roll back per-cluster without code changes. The flag is consumed by the framework before the + * config reaches Kafka Streams and is not a Kafka config key. 
+ */ + public static final String DYNAMIC_NUM_STREAM_THREADS_CONFIG = "dynamic.num.stream.threads"; + private static final String SHUTDOWN_DURATION = "shutdown.duration"; private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApp.class); @@ -110,8 +120,15 @@ protected void doInit() { streamsBuilder = buildTopology(streamsConfig, streamsBuilder, sourceStreams); this.topology = streamsBuilder.build(); - if (StreamThreadsCountResolver.isDynamic(streamsConfig)) { - streamsConfig.put(NUM_STREAM_THREADS_CONFIG, resolveDynamicStreamThreads(streamsConfig)); + // Strip the dynamic-resolution flag before it reaches Kafka Streams (it's a framework-level + // opt-in, not a Kafka config). When enabled and a resolver is wired up, replace the + // configured num.stream.threads with the dynamically-computed value; otherwise the + // configured value flows through unchanged. + final boolean dynamicEnabled = isDynamicNumStreamThreadsEnabled(streamsConfig); + streamsConfig.remove(DYNAMIC_NUM_STREAM_THREADS_CONFIG); + if (dynamicEnabled) { + resolveDynamicStreamThreads(streamsConfig) + .ifPresent(threads -> streamsConfig.put(NUM_STREAM_THREADS_CONFIG, threads)); } getLogger().info("Finalized kafka streams configuration: {}", streamsConfig); @@ -267,9 +284,9 @@ public abstract StreamsBuilder buildTopology( /** * Override in subclasses that want auto-sized {@code num.stream.threads}. Return a resolver - * configured with this app's replica-count source. The framework only invokes this when {@code - * num.stream.threads} is set to {@link StreamThreadsCountResolver#DYNAMIC_SENTINEL}; returning an - * empty optional disables dynamic resolution and the app keeps whatever value was configured. + * configured with this app's replica-count source. The framework only invokes this when {@link + * #DYNAMIC_NUM_STREAM_THREADS_CONFIG} is {@code true} in the streams config; returning an empty + * optional (the default) disables dynamic resolution. * *

Apps that don't override this are unaffected — the default returns {@code Optional.empty()} * and the framework leaves {@code num.stream.threads} exactly as configured. Apps that do @@ -289,51 +306,54 @@ public abstract StreamsBuilder buildTopology( * } * } * - *

Fallback to {@link StreamThreadsCountResolver#FALLBACK_NUM_STREAM_THREADS} occurs when: + *

The configured numeric {@code num.stream.threads} flows through unchanged when: * *

    - *
  • {@code num.stream.threads} is set to {@code DYNAMIC} but this method returns empty - *
  • this method itself throws - *
  • replica count supplied to the resolver is non-positive (env var unset/zero/negative) - *
  • the AdminClient/calculator call throws (broker unreachable, describe timeout, etc.) - *
  • the topology contains a regex/pattern source — partitions cannot be enumerated up-front + *
  • {@link #DYNAMIC_NUM_STREAM_THREADS_CONFIG} is {@code false} or unset + *
  • this method returns empty (no resolver wired up) + *
  • this method throws + *
  • the resolver returns empty: replica count is non-positive, the AdminClient/calculator + * call throws, the topology contains a regex/pattern source, or no source partitions are + * resolvable on the broker *
*/ protected Optional getStreamThreadsCountResolver() { return Optional.empty(); } - // Caller must check StreamThreadsCountResolver.isDynamic(...) before invoking. Always returns a - // concrete int so the literal "DYNAMIC" sentinel never reaches Kafka Streams config — see - // getStreamThreadsCountResolver() javadoc for the full list of fallback triggers. - private int resolveDynamicStreamThreads(Map streamsProperties) { - Optional resolver; + // Caller must check the dynamic-enabled flag before invoking. Returns OptionalInt.empty() when + // no resolver is wired or resolution cannot produce a value — caller keeps the configured + // num.stream.threads as the fallback. + private OptionalInt resolveDynamicStreamThreads(Map streamsProperties) { + final Optional resolver; try { resolver = getStreamThreadsCountResolver(); } catch (Exception exception) { getLogger() .warn( - "getStreamThreadsCountResolver() threw; falling back to {} stream threads", - StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS, + "getStreamThreadsCountResolver() threw; keeping configured num.stream.threads", exception); - return StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS; + return OptionalInt.empty(); } if (resolver.isEmpty()) { getLogger() .warn( - "{} is set to DYNAMIC but no StreamThreadsCountResolver is provided; falling back to {}", - NUM_STREAM_THREADS_CONFIG, - StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS); - return StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS; + "{} is true but no StreamThreadsCountResolver is provided; keeping configured num.stream.threads", + DYNAMIC_NUM_STREAM_THREADS_CONFIG); + return OptionalInt.empty(); } final OptionalInt resolved = resolver.get().resolve(this.topology, streamsProperties); if (resolved.isEmpty()) { getLogger() - .warn( - "Resolver could not compute dynamic num.stream.threads; falling back to {}", - StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS); + .warn("Resolver could not compute dynamic 
num.stream.threads; keeping configured value"); } - return resolved.orElse(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS); + return resolved; + } + + private static boolean isDynamicNumStreamThreadsEnabled( + final Map streamsProperties) { + final Object value = streamsProperties.get(DYNAMIC_NUM_STREAM_THREADS_CONFIG); + return value != null && Boolean.parseBoolean(String.valueOf(value)); } public Map getStreamsConfig(Config jobConfig) { diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java index ce06da3..50d5471 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculator.java @@ -104,7 +104,14 @@ public OptionalInt compute( totalTasks += tasksForSubtopology; } - final int threads = totalTasks == 0 ? 
1 : (int) Math.ceil((double) totalTasks / replicas); + if (totalTasks == 0) { + logger.warn( + "No resolvable partitions across {} sub-topologies; skipping dynamic num.stream.threads.", + subtopologyCount); + return OptionalInt.empty(); + } + + final int threads = (int) Math.ceil((double) totalTasks / replicas); logger.info( "Dynamic num.stream.threads: totalTasks={} across {} sub-topologies, replicas={}, computed={}", totalTasks, diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java index 4c564be..0daa8cf 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolver.java @@ -12,23 +12,18 @@ import org.slf4j.LoggerFactory; /** - * Resolves a concrete {@code num.stream.threads} value when the application has configured the - * {@link #DYNAMIC_SENTINEL} sentinel. Bridges the topology + AdminClient to the {@link - * DynamicStreamThreadsCountCalculator} and falls back to {@link #FALLBACK_NUM_STREAM_THREADS} when - * resolution fails. + * Resolves a concrete {@code num.stream.threads} value for an app that has opted into dynamic + * thread sizing. Bridges the topology + AdminClient to the {@link + * DynamicStreamThreadsCountCalculator} and returns {@link OptionalInt#empty()} when resolution + * cannot produce a meaningful value — the caller keeps the configured numeric {@code + * num.stream.threads}. * *

The replica count is supplied externally — applications typically wire it from a {@code * REPLICA_COUNT} environment variable injected by the deployment template. A non-positive value - * (zero, negative, or unset) falls back to {@link #FALLBACK_NUM_STREAM_THREADS}. + * (zero, negative, or unset) yields {@link OptionalInt#empty()}. */ public class StreamThreadsCountResolver { - public static final String DYNAMIC_SENTINEL = "DYNAMIC"; - - // Reasonable default for current Hypertrace Kafka Streams apps. Apps that need a different - // value should configure a numeric num.stream.threads explicitly instead of using DYNAMIC. - public static final int FALLBACK_NUM_STREAM_THREADS = 8; - private static final Logger logger = LoggerFactory.getLogger(StreamThreadsCountResolver.class); private final DynamicStreamThreadsCountCalculator calculator; @@ -48,21 +43,11 @@ public StreamThreadsCountResolver( this.adminClientFactory = adminClientFactory; } - /** - * Returns true if the {@code num.stream.threads} value in the given streams properties is the - * dynamic sentinel and should be resolved by this class. - */ - public static boolean isDynamic(final Map streamsProperties) { - final Object value = - streamsProperties.get(org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG); - return value != null && DYNAMIC_SENTINEL.equals(String.valueOf(value)); - } - /** * Resolve a thread count for the given topology. Returns {@link OptionalInt#empty()} when * prerequisites are missing (non-positive replica count), the topology is unsupported (regex - * sources), or the AdminClient/calculator call fails — the caller substitutes its fallback so the - * application can still start. + * sources), or the AdminClient/calculator call fails — the caller keeps the configured numeric + * {@code num.stream.threads} so the application can still start with a sane value. 
*/ public OptionalInt resolve(final Topology topology, final Map streamsProperties) { final int replicas = replicaCountSupplier.getAsInt(); @@ -84,7 +69,7 @@ public OptionalInt resolve(final Topology topology, final Map st /** * Convenience adapter — given an {@code Optional} replica-count source (the common * config-loaded shape), produce a supplier that yields the value when present and {@code 0} when - * absent (treated as a fallback signal by {@link #resolve}). + * absent (treated as a "skip dynamic resolution" signal by {@link #resolve}). */ public static IntSupplier optionalReplicaCount(final Optional replicaCount) { return () -> replicaCount.orElse(0); diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java index 9d009e0..80e1429 100644 --- a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java +++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/SampleAppTest.java @@ -93,39 +93,43 @@ public void baseStreamsConfigTest() { assertThat(baseStreamsConfig.get(producerPrefix(ACKS_CONFIG)), is("all")); } - // Verifies the fix for "DYNAMIC sentinel could leak to Kafka Streams config when no resolver is - // wired up" — without resolver override, the framework must replace DYNAMIC with the integer - // fallback so Kafka Streams receives a parseable value. + // No resolver wired up → framework must keep the configured num.stream.threads (the configured + // value is the fallback by definition) and strip the framework-only flag before it reaches + // Kafka Streams. 
@Test - public void dynamicWithoutResolverFallsBackToEight() { + public void dynamicWithoutResolverKeepsConfiguredValue() { + final Object configuredThreads = sampleApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG); + SampleApp dynamicApp = new SampleApp(ConfigClientFactory.getClient()) { @Override public Map getStreamsConfig(Config jobConfig) { Map properties = super.getStreamsConfig(jobConfig); - properties.put(NUM_STREAM_THREADS_CONFIG, StreamThreadsCountResolver.DYNAMIC_SENTINEL); + properties.put(KafkaStreamsApp.DYNAMIC_NUM_STREAM_THREADS_CONFIG, true); return properties; } }; dynamicApp.doInit(); + assertThat(dynamicApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG), is(configuredThreads)); assertThat( - dynamicApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG), - is(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS)); + dynamicApp.streamsConfig.containsKey(KafkaStreamsApp.DYNAMIC_NUM_STREAM_THREADS_CONFIG), + is(false)); } - // Pattern-source topology: calculator returns OptionalInt.empty() because regex subscriptions - // can't be enumerated up-front against the broker. The framework must substitute the integer - // fallback so Kafka Streams gets a parseable value (not the literal "DYNAMIC"). + // Pattern-source topology: calculator returns OptionalInt.empty(). Configured num.stream.threads + // flows through unchanged. 
@Test - public void dynamicWithPatternSourceFallsBackToEight() { + public void dynamicWithPatternSourceKeepsConfiguredValue() { + final Object configuredThreads = sampleApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG); + SampleApp dynamicApp = new SampleApp(ConfigClientFactory.getClient()) { @Override public Map getStreamsConfig(Config jobConfig) { Map properties = super.getStreamsConfig(jobConfig); - properties.put(NUM_STREAM_THREADS_CONFIG, StreamThreadsCountResolver.DYNAMIC_SENTINEL); + properties.put(KafkaStreamsApp.DYNAMIC_NUM_STREAM_THREADS_CONFIG, true); return properties; } @@ -148,22 +152,20 @@ protected Optional getStreamThreadsCountResolver() { dynamicApp.doInit(); - assertThat( - dynamicApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG), - is(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS)); + assertThat(dynamicApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG), is(configuredThreads)); } - // Verifies the fix for "exception while obtaining the resolver should not leak DYNAMIC" — when - // getStreamThreadsCountResolver() throws, the framework must catch and fall back to the integer - // default rather than letting the literal sentinel reach Kafka Streams. + // Resolver throws → configured num.stream.threads flows through unchanged. 
@Test - public void dynamicWithThrowingResolverFallsBackToEight() { + public void dynamicWithThrowingResolverKeepsConfiguredValue() { + final Object configuredThreads = sampleApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG); + SampleApp dynamicApp = new SampleApp(ConfigClientFactory.getClient()) { @Override public Map getStreamsConfig(Config jobConfig) { Map properties = super.getStreamsConfig(jobConfig); - properties.put(NUM_STREAM_THREADS_CONFIG, StreamThreadsCountResolver.DYNAMIC_SENTINEL); + properties.put(KafkaStreamsApp.DYNAMIC_NUM_STREAM_THREADS_CONFIG, true); return properties; } @@ -175,8 +177,6 @@ protected Optional getStreamThreadsCountResolver() { dynamicApp.doInit(); - assertThat( - dynamicApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG), - is(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS)); + assertThat(dynamicApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG), is(configuredThreads)); } } diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java index b36ccc4..5f82245 100644 --- a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java +++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/DynamicStreamThreadsCountCalculatorTest.java @@ -75,12 +75,13 @@ void zeroOrNegativeReplicasThrows() { IllegalArgumentException.class, () -> calculator.compute(topology, adminClient, -1)); } + // No resolvable partitions -> caller keeps the configured num.stream.threads as-is. 
@Test - void totalTasksZeroReturnsOne() { + void totalTasksZeroReturnsEmpty() { final Topology topology = topologyForSubtopologies(Set.of("topic-a")); stubPartitions(Map.of("topic-a", ABSENT)); - assertEquals(OptionalInt.of(1), calculator.compute(topology, adminClient, 8)); + assertEquals(OptionalInt.empty(), calculator.compute(topology, adminClient, 8)); } // Regex/pattern subscriptions cannot be enumerated up-front against the broker, so dynamic diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java index 7d03c08..a546e75 100644 --- a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java +++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/threading/StreamThreadsCountResolverTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.junit.jupiter.api.Test; @@ -74,24 +73,6 @@ void delegatesToCalculatorWithConfiguredReplicas() { assertEquals(OptionalInt.of(4), resolver.resolve(simpleTopology(), bootstrapProperties())); } - @Test - void isDynamicTrueWhenSentinelSet() { - assertEquals( - true, - StreamThreadsCountResolver.isDynamic( - Map.of( - StreamsConfig.NUM_STREAM_THREADS_CONFIG, - StreamThreadsCountResolver.DYNAMIC_SENTINEL))); - } - - @Test - void isDynamicFalseForNumericValueOrAbsent() { - assertEquals( - false, - StreamThreadsCountResolver.isDynamic(Map.of(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4))); - assertEquals(false, StreamThreadsCountResolver.isDynamic(Map.of())); - } 
- private static StreamThreadsCountResolver resolverWith( final IntSupplier replicaCountSupplier, final AdminClient adminClient) { final Function factory = properties -> adminClient;