Skip to content
Merged
1 change: 1 addition & 0 deletions kafka-streams-framework/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
testImplementation(commonLibs.junit.jupiter)
testImplementation(localLibs.junit.pioneer)
testImplementation(commonLibs.mockito.core)
testImplementation(commonLibs.mockito.junit)
testImplementation(localLibs.hamcrest.core)
testRuntimeOnly(commonLibs.log4j.slf4j2.impl)
}
Expand Down
1 change: 1 addition & 0 deletions kafka-streams-framework/gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ org.junit:junit-bom:5.10.0=testCompileClasspath
org.junit:junit-bom:5.11.2=testRuntimeClasspath
org.latencyutils:LatencyUtils:2.0.3=runtimeClasspath,testRuntimeClasspath
org.mockito:mockito-core:5.8.0=testCompileClasspath,testRuntimeClasspath
org.mockito:mockito-junit-jupiter:5.8.0=testCompileClasspath,testRuntimeClasspath
org.objenesis:objenesis:3.3=testRuntimeClasspath
org.opentest4j:opentest4j:1.3.0=testCompileClasspath,testRuntimeClasspath
org.projectlombok:lombok:1.18.30=annotationProcessor,compileClasspath,testCompileClasspath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
Expand All @@ -34,6 +35,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -49,6 +52,7 @@
import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateListener;
import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateRestoreListener;
import org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetter;
import org.hypertrace.core.kafkastreams.framework.threading.StreamThreadsCountResolver;
import org.hypertrace.core.kafkastreams.framework.timestampextractors.UseWallclockTimeOnInvalidTimestamp;
import org.hypertrace.core.kafkastreams.framework.topics.creator.KafkaTopicCreator;
import org.hypertrace.core.kafkastreams.framework.util.ExceptionUtils;
Expand All @@ -65,6 +69,16 @@ public abstract class KafkaStreamsApp extends PlatformService {
public static final String CLEANUP_LOCAL_STATE = "cleanup.local.state";
public static final String PRE_CREATE_TOPICS = "precreate.topics";
public static final String KAFKA_STREAMS_CONFIG_KEY = "kafka.streams.config";

/**
* Framework-level boolean opt-in (set in the streams config map) that gates dynamic {@code
* num.stream.threads} resolution. Apps must also override {@link
* #getStreamThreadsCountResolver()}; this flag exists separately so deployments can roll out and
* roll back per-cluster without code changes. The flag is consumed by the framework before the
* config reaches Kafka Streams and is not a Kafka config key.
*/
public static final String DYNAMIC_NUM_STREAM_THREADS_CONFIG = "dynamic.num.stream.threads";

private static final String SHUTDOWN_DURATION = "shutdown.duration";
private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApp.class);

Expand Down Expand Up @@ -106,6 +120,17 @@ protected void doInit() {
streamsBuilder = buildTopology(streamsConfig, streamsBuilder, sourceStreams);
this.topology = streamsBuilder.build();

// Strip the dynamic-resolution flag before it reaches Kafka Streams (it's a framework-level
// opt-in, not a Kafka config). When enabled and a resolver is wired up, replace the
// configured num.stream.threads with the dynamically-computed value; otherwise the
// configured value flows through unchanged.
final boolean dynamicEnabled = isDynamicNumStreamThreadsEnabled(streamsConfig);
streamsConfig.remove(DYNAMIC_NUM_STREAM_THREADS_CONFIG);
if (dynamicEnabled) {
resolveDynamicStreamThreads(streamsConfig)
.ifPresent(threads -> streamsConfig.put(NUM_STREAM_THREADS_CONFIG, threads));
}

getLogger().info("Finalized kafka streams configuration: {}", streamsConfig);

// pre-create input/output topics required for kstream application
Expand Down Expand Up @@ -257,6 +282,80 @@ public abstract StreamsBuilder buildTopology(
StreamsBuilder streamsBuilder,
Map<String, KStream<?, ?>> sourceStreams);

/**
* Override in subclasses that want auto-sized {@code num.stream.threads}. Return a resolver
* configured with this app's replica-count source. The framework only invokes this when {@link
* #DYNAMIC_NUM_STREAM_THREADS_CONFIG} is {@code true} in the streams config; returning an empty
* optional (the default) disables dynamic resolution.
*
* <p>Apps that don't override this are unaffected — the default returns {@code Optional.empty()}
* and the framework leaves {@code num.stream.threads} exactly as configured. Apps that do
* override should supply replica count from wherever the deployment exposes it (e.g. the {@code
* REPLICA_COUNT} environment variable injected by the k8s template, or a HOCON config key).
*
* <p>Example:
*
* <pre>{@code
* @Override
* protected Optional<StreamThreadsCountResolver> getStreamThreadsCountResolver() {
* final Optional<Integer> replicaCount =
* ConfigUtils.optionalInteger(getAppConfig(), "replica.count");
* return Optional.of(
* new StreamThreadsCountResolver(
* StreamThreadsCountResolver.optionalReplicaCount(replicaCount)));
* }
* }</pre>
*
* <p>The configured numeric {@code num.stream.threads} flows through unchanged when:
*
* <ul>
* <li>{@link #DYNAMIC_NUM_STREAM_THREADS_CONFIG} is {@code false} or unset
* <li>this method returns empty (no resolver wired up)
* <li>this method throws
* <li>the resolver returns empty: replica count is non-positive, the AdminClient/calculator
* call throws, the topology contains a regex/pattern source, or no source partitions are
* resolvable on the broker
* </ul>
*/
protected Optional<StreamThreadsCountResolver> getStreamThreadsCountResolver() {
return Optional.empty();
}

// Caller must check the dynamic-enabled flag before invoking. Returns OptionalInt.empty() when
// no resolver is wired or resolution cannot produce a value — caller keeps the configured
// num.stream.threads as the fallback.
private OptionalInt resolveDynamicStreamThreads(Map<String, Object> streamsProperties) {
final Optional<StreamThreadsCountResolver> resolver;
try {
resolver = getStreamThreadsCountResolver();
} catch (Exception exception) {
getLogger()
.warn(
"getStreamThreadsCountResolver() threw; keeping configured num.stream.threads",
exception);
return OptionalInt.empty();
}
if (resolver.isEmpty()) {
getLogger()
.warn(
"{} is true but no StreamThreadsCountResolver is provided; keeping configured num.stream.threads",
DYNAMIC_NUM_STREAM_THREADS_CONFIG);
return OptionalInt.empty();
}
final OptionalInt resolved = resolver.get().resolve(this.topology, streamsProperties);
if (resolved.isEmpty()) {
getLogger()
.warn("Resolver could not compute dynamic num.stream.threads; keeping configured value");
}
return resolved;
}

private static boolean isDynamicNumStreamThreadsEnabled(
final Map<String, Object> streamsProperties) {
final Object value = streamsProperties.get(DYNAMIC_NUM_STREAM_THREADS_CONFIG);
return value != null && Boolean.parseBoolean(String.valueOf(value));
}

public Map<String, Object> getStreamsConfig(Config jobConfig) {
return new HashMap<>(ConfigUtils.getFlatMapConfig(jobConfig, getStreamsConfigKey()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package org.hypertrace.core.kafkastreams.framework.threading;

import static java.util.stream.Collectors.toUnmodifiableSet;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.TopologyDescription.Source;
import org.apache.kafka.streams.TopologyDescription.Subtopology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Computes a per-instance {@code num.stream.threads} value from a topology and the partition count
* of every source topic.
*
* <p>For each sub-topology the maximum partition count across its source topics is the number of
* stream tasks. Summing across sub-topologies and dividing by the replica count yields the threads
* each instance should run to keep all tasks active without idle threads.
*
* <p>Returns {@link OptionalInt#empty()} when the topology contains a regex/pattern subscription
* ({@link Source#topicPattern()}) — those sub-topologies cannot be enumerated against the broker
* up-front, so dynamic sizing would silently under-count tasks. The caller falls back to its
* configured default in that case.
*/
public class DynamicStreamThreadsCountCalculator {

private static final long DESCRIBE_TOPICS_TIMEOUT_MILLIS = Duration.ofSeconds(5).toMillis();
private static final Logger logger =
LoggerFactory.getLogger(DynamicStreamThreadsCountCalculator.class);

private static Set<String> sourceTopicsOf(final Subtopology subtopology) {
return subtopology.nodes().stream()
.filter(node -> node instanceof Source)
.map(node -> (Source) node)
.flatMap(source -> source.topicSet().stream())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious whether topic pattern subscriptions are in scope here — source.topicSet() returns empty when topics are subscribed via regex, which would make those sub-topologies contribute 0 partitions. Is this a known limitation or something worth handling?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — Source.topicSet() is empty for regex subscriptions and topicPattern() would be the way to handle them. None of the current Hypertrace Kafka Streams apps subscribe via regex, so I've documented this as a known limitation in the class javadoc rather than handling it now (would need an extra AdminClient.listTopics() + Pattern match round-trip). Apps that adopt regex shouldn't opt into DYNAMIC until that's added (736f37d).

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If any sub-topology uses a pattern subscription, the dynamic calculation is fundamentally incomplete — it will silently under-count tasks and under-provision threads. Rather than
documenting this and hoping apps don't hit it, the resolver should detect it and return empty:

for (final Subtopology subtopology : description.subtopologies()) {                                                                                                               
  boolean hasPatternSource = subtopology.nodes().stream()                                                                                                                         
      .filter(node -> node instanceof Source)                                                                                                                                     
      .map(node -> (Source) node)                                                                                                                                                 
      .anyMatch(source -> source.topicPattern() != null);                                                                                                                         
                                                                                                                                                                                  
  if (hasPatternSource) {                                                                                                                                                         
    logger.warn("Sub-topology uses pattern subscription; "                                                                                                                        
        + "dynamic thread calculation not supported. "                                                                                                                            
        + "Keeping configured num.stream.threads.");                                                                                                                              
    return OptionalInt.empty();                                                                                                                                                   
  }                                                                                                                                                                               
}  

And the caller simply doesn't override:

Optional<StreamThreadsCountResolver> resolver = getStreamThreadsCountResolver();                                                                                                  
if (resolver.isPresent()) {                                                                                                                                                       
  OptionalInt computed = resolver.get().resolve(topology, streamsConfig);                                                                                                         
  if (computed.isPresent()) {                                                                                                                                                     
    streamsConfig.put(NUM_STREAM_THREADS_CONFIG, computed.getAsInt());                                                                                                            
  }                                                                                                                                                                               
  // else: leave num.stream.threads as-is — configured value wins                                                                                                                 
} 

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — bumping this from a doc note to detection is the right call. Currently a regex-only sub-topology silently contributes 0 tasks, and an all-regex topology lands on totalTasks == 0 → 1 thread (severe under-provisioning, no signal). Switched compute() to return OptionalInt, added the pattern pre-check, and the call site at the framework layer translates empty → FALLBACK_NUM_STREAM_THREADS via a single .orElse(...) (so all "cannot resolve" paths collapse there). Test added covering the regex-source path end-to-end (1c5c6ce).

.collect(toUnmodifiableSet());
}

private static boolean hasPatternSource(final Subtopology subtopology) {
return subtopology.nodes().stream()
.filter(node -> node instanceof Source)
.map(node -> (Source) node)
.anyMatch(source -> source.topicPattern() != null);
}

public OptionalInt compute(
final Topology topology, final AdminClient adminClient, final int replicas) {
if (replicas <= 0) {
throw new IllegalArgumentException("replicas must be positive, got " + replicas);
}

final TopologyDescription description = topology.describe();

// Bail out if any sub-topology subscribes via regex — topicSet() is empty for those, so
// dynamic sizing would silently under-count tasks. The caller substitutes its fallback.
final boolean anyPatternSource =
description.subtopologies().stream()
.anyMatch(DynamicStreamThreadsCountCalculator::hasPatternSource);
if (anyPatternSource) {
logger.warn(
"Topology contains a regex/pattern source; dynamic num.stream.threads is not supported. "
+ "Caller will fall back to its configured default.");
return OptionalInt.empty();
}

final Set<String> sourceTopics =
description.subtopologies().stream()
.flatMap(subtopology -> sourceTopicsOf(subtopology).stream())
.collect(toUnmodifiableSet());

final Map<String, Integer> partitionsByTopic = describePartitions(adminClient, sourceTopics);

int totalTasks = 0;
int subtopologyCount = 0;
for (final Subtopology subtopology : description.subtopologies()) {
subtopologyCount++;
final Set<String> subtopologyTopics = sourceTopicsOf(subtopology);

final int tasksForSubtopology =
subtopologyTopics.stream()
.mapToInt(topic -> partitionsByTopic.getOrDefault(topic, 0))
.max()
.orElse(0);

if (tasksForSubtopology == 0) {
logger.warn(
"Sub-topology has no resolvable partitions; topics={}. Pod restart will be needed once topics exist.",
subtopologyTopics);
}
totalTasks += tasksForSubtopology;
}

if (totalTasks == 0) {
logger.warn(
"No resolvable partitions across {} sub-topologies; skipping dynamic num.stream.threads.",
subtopologyCount);
return OptionalInt.empty();
}

final int threads = (int) Math.ceil((double) totalTasks / replicas);
logger.info(
"Dynamic num.stream.threads: totalTasks={} across {} sub-topologies, replicas={}, computed={}",
totalTasks,
subtopologyCount,
replicas,
threads);
return OptionalInt.of(threads);
}

// Single-loop implementation: AdminClient.describeTopics() already fires all RPCs concurrently
// before returning futures, so iteration here only consumes a shared deadline (now+timeout) —
// total wall-clock is capped at DESCRIBE_TOPICS_TIMEOUT_MILLIS regardless of topic count.
private Map<String, Integer> describePartitions(
final AdminClient adminClient, final Set<String> topics) {
if (topics.isEmpty()) {
return Map.of();
}
final DescribeTopicsResult result = adminClient.describeTopics(topics);
final Map<String, KafkaFuture<TopicDescription>> futures = result.topicNameValues();
final long deadlineMillis = System.currentTimeMillis() + DESCRIBE_TOPICS_TIMEOUT_MILLIS;
final Map<String, Integer> partitions = new HashMap<>();

for (final Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
final long remainingMillis = deadlineMillis - System.currentTimeMillis();
if (remainingMillis <= 0) {
throw new RuntimeException(
"Timed out describing topics after " + DESCRIBE_TOPICS_TIMEOUT_MILLIS + "ms");
}
try {
partitions.put(
entry.getKey(),
entry.getValue().get(remainingMillis, TimeUnit.MILLISECONDS).partitions().size());
} catch (final TimeoutException timeoutException) {
throw new RuntimeException(
"Timed out describing topic " + entry.getKey(), timeoutException);
} catch (final InterruptedException interruptedException) {
Thread.currentThread().interrupt();
throw new RuntimeException(
"Interrupted while describing topic " + entry.getKey(), interruptedException);
} catch (final ExecutionException executionException) {
if (executionException.getCause() instanceof UnknownTopicOrPartitionException) {
logger.warn(
"Topic absent on broker: {}. Treating as 0 partitions; restart needed once created.",
entry.getKey());
partitions.put(entry.getKey(), 0);
} else {
throw new RuntimeException(
"Failed to describe topic " + entry.getKey(), executionException);
}
}
}
return Map.copyOf(partitions);
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor suggestion — the two-phase approach (awaitAll + extract loop) works correctly but adds indirection that's harder to reason about at a glance. The swallowed ExecutionException in awaitAll looks like a bug on first read until you trace it to the per-topic handling below.

Since describeTopics() fires all requests concurrently at call time, a single loop with a shared deadline gives the same behavior — same total timeout, same parallelism — with a more linear flow:

private Map<String, Integer> describePartitions(AdminClient adminClient, Set<String> topics) {
  if (topics.isEmpty()) {
    return Map.of();
  }

  Map<String, KafkaFuture<TopicDescription>> futures =
      adminClient.describeTopics(topics).topicNameValues();

  long deadline = System.currentTimeMillis() + DESCRIBE_TOPICS_TIMEOUT_MILLIS;
  Map<String, Integer> partitions = new HashMap<>();

  for (Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
    long remaining = deadline - System.currentTimeMillis();
    if (remaining <= 0) {
      throw new RuntimeException("Timed out describing topics");
    }
    try {
      partitions.put(
          entry.getKey(),
          entry.getValue().get(remaining, TimeUnit.MILLISECONDS).partitions().size());
    } catch (TimeoutException e) {
      throw new RuntimeException("Timed out describing topic " + entry.getKey(), e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted describing topics", e);
    } catch (ExecutionException e) {
      if (e.getCause() instanceof UnknownTopicOrPartitionException) {
        logger.warn("Topic absent on broker: {}. Treating as 0 partitions.", entry.getKey());
        partitions.put(entry.getKey(), 0);
      } else {
        throw new RuntimeException("Failed to describe topic " + entry.getKey(), e);
      }
    }
  }
  return Map.copyOf(partitions);
}

This keeps exception handling in one place, removes the implicit "must call A before B" contract, and eliminates the awaitAll helper entirely. Happy to hear if there was a specific reason for the split — just flagging as a readability improvement. 🙂

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verified equivalence and adopted (1c5c6ce). Kunal's earlier round-1 fix was specifically about the N×5s case where each future had its own independent timeout. Both allOf(...).get(timeout) and deadline = now + timeout; future.get(remaining, MS) share a single 5s budget across all topics — AdminClient.describeTopics() fires all RPCs concurrently before returning futures, so iteration order in the loop only consumes the shared budget, doesn't change parallelism. Pure readability swap: dropped awaitAll, kept the per-topic UnknownTopicOrPartitionException → 0 partitions handling.

Loading
Loading