|
4 | 4 | import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; |
5 | 5 | import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; |
6 | 6 | import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; |
| 7 | +import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; |
7 | 8 | import static org.apache.kafka.streams.StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG; |
8 | 9 | import static org.apache.kafka.streams.StreamsConfig.producerPrefix; |
9 | 10 | import static org.hamcrest.CoreMatchers.equalTo; |
10 | 11 | import static org.hamcrest.CoreMatchers.is; |
11 | 12 | import static org.hamcrest.MatcherAssert.assertThat; |
12 | 13 |
|
| 14 | +import com.typesafe.config.Config; |
13 | 15 | import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; |
14 | 16 | import java.util.Map; |
| 17 | +import java.util.Optional; |
15 | 18 | import java.util.Properties; |
16 | 19 | import org.apache.kafka.common.serialization.Serdes; |
17 | 20 | import org.apache.kafka.streams.TestInputTopic; |
18 | 21 | import org.apache.kafka.streams.TestOutputTopic; |
19 | 22 | import org.apache.kafka.streams.TopologyTestDriver; |
20 | 23 | import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; |
21 | 24 | import org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetter; |
| 25 | +import org.hypertrace.core.kafkastreams.framework.threading.StreamThreadsCountResolver; |
22 | 26 | import org.hypertrace.core.serviceframework.config.ConfigClientFactory; |
23 | 27 | import org.junit.jupiter.api.AfterEach; |
24 | 28 | import org.junit.jupiter.api.BeforeEach; |
@@ -84,4 +88,53 @@ public void baseStreamsConfigTest() { |
84 | 88 | is(LogAndContinueExceptionHandler.class)); |
85 | 89 | assertThat(baseStreamsConfig.get(producerPrefix(ACKS_CONFIG)), is("all")); |
86 | 90 | } |
| 91 | + |
| 92 | + // Verifies the fix for "DYNAMIC sentinel could leak to Kafka Streams config when no resolver is |
| 93 | + // wired up" — without resolver override, the framework must replace DYNAMIC with the integer |
| 94 | + // fallback so Kafka Streams receives a parseable value. |
| 95 | + @Test |
| 96 | + public void dynamicWithoutResolverFallsBackToEight() { |
| 97 | + SampleApp dynamicApp = |
| 98 | + new SampleApp(ConfigClientFactory.getClient()) { |
| 99 | + @Override |
| 100 | + public Map<String, Object> getStreamsConfig(Config jobConfig) { |
| 101 | + Map<String, Object> properties = super.getStreamsConfig(jobConfig); |
| 102 | + properties.put(NUM_STREAM_THREADS_CONFIG, StreamThreadsCountResolver.DYNAMIC_SENTINEL); |
| 103 | + return properties; |
| 104 | + } |
| 105 | + }; |
| 106 | + |
| 107 | + dynamicApp.doInit(); |
| 108 | + |
| 109 | + assertThat( |
| 110 | + dynamicApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG), |
| 111 | + is(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS)); |
| 112 | + } |
| 113 | + |
| 114 | + // Verifies the fix for "exception while obtaining the resolver should not leak DYNAMIC" — when |
| 115 | + // getStreamThreadsCountResolver() throws, the framework must catch and fall back to the integer |
| 116 | + // default rather than letting the literal sentinel reach Kafka Streams. |
| 117 | + @Test |
| 118 | + public void dynamicWithThrowingResolverFallsBackToEight() { |
| 119 | + SampleApp dynamicApp = |
| 120 | + new SampleApp(ConfigClientFactory.getClient()) { |
| 121 | + @Override |
| 122 | + public Map<String, Object> getStreamsConfig(Config jobConfig) { |
| 123 | + Map<String, Object> properties = super.getStreamsConfig(jobConfig); |
| 124 | + properties.put(NUM_STREAM_THREADS_CONFIG, StreamThreadsCountResolver.DYNAMIC_SENTINEL); |
| 125 | + return properties; |
| 126 | + } |
| 127 | + |
| 128 | + @Override |
| 129 | + protected Optional<StreamThreadsCountResolver> getStreamThreadsCountResolver() { |
| 130 | + throw new RuntimeException("simulated wiring failure"); |
| 131 | + } |
| 132 | + }; |
| 133 | + |
| 134 | + dynamicApp.doInit(); |
| 135 | + |
| 136 | + assertThat( |
| 137 | + dynamicApp.streamsConfig.get(NUM_STREAM_THREADS_CONFIG), |
| 138 | + is(StreamThreadsCountResolver.FALLBACK_NUM_STREAM_THREADS)); |
| 139 | + } |
87 | 140 | } |
0 commit comments