Skip to content

Commit 9187236

Browse files
Expand getStreamThreadsCountResolver() javadoc with override example
Adds a concrete code snippet showing how a consumer app wires its own replica-count source through StreamThreadsCountResolver, so adopters don't have to read the calculator/resolver internals to integrate. Also clarifies that apps not overriding the hook are unaffected. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 05ba77b commit 9187236

1 file changed

Lines changed: 19 additions & 1 deletion

File tree

  • kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,27 @@ public abstract StreamsBuilder buildTopology(
264264

265265
/**
266266
* Override in subclasses that want auto-sized {@code num.stream.threads}. Return a resolver
267-
* configured with this app's replica count source. The framework only invokes this when {@code
267+
* configured with this app's replica-count source. The framework only invokes this when {@code
268268
* num.stream.threads} is set to {@link StreamThreadsCountResolver#DYNAMIC_SENTINEL}; returning an
269269
* empty optional disables dynamic resolution and the app keeps whatever value was configured.
270+
*
271+
* <p>Apps that don't override this are unaffected — the default returns {@code Optional.empty()}
272+
* and the framework leaves {@code num.stream.threads} exactly as configured. Apps that do
273+
* override should supply replica count from wherever the deployment exposes it (e.g. the {@code
274+
* REPLICA_COUNT} environment variable injected by the k8s template, or a HOCON config key).
275+
*
276+
* <p>Example:
277+
*
278+
* <pre>{@code
279+
* @Override
280+
* protected Optional<StreamThreadsCountResolver> getStreamThreadsCountResolver() {
281+
* final Optional<Integer> replicaCount =
282+
* ConfigUtils.optionalInteger(getAppConfig(), "replica.count");
283+
* return Optional.of(
284+
* new StreamThreadsCountResolver(
285+
* StreamThreadsCountResolver.optionalReplicaCount(replicaCount)));
286+
* }
287+
* }</pre>
270288
*/
271289
protected Optional<StreamThreadsCountResolver> getStreamThreadsCountResolver() {
272290
return Optional.empty();

0 commit comments

Comments
 (0)