Skip to content

Commit 4c2ad12

Browse files
committed
Add Disable WAL support
1 parent 0aa8c44 commit 4c2ad12

2 files changed

Lines changed: 9 additions & 0 deletions

File tree

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/rocksdb/BoundedMemoryConfigSetter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static org.hypertrace.core.kafkastreams.framework.rocksdb.RocksDBConfigs.MAX_SIZE_AMPLIFICATION_PERCENT;
99
import static org.hypertrace.core.kafkastreams.framework.rocksdb.RocksDBConfigs.OPTIMIZE_FOR_POINT_LOOKUPS;
1010
import static org.hypertrace.core.kafkastreams.framework.rocksdb.RocksDBConfigs.PERIODIC_COMPACTION_SECONDS;
11+
import static org.hypertrace.core.kafkastreams.framework.rocksdb.RocksDBConfigs.WAL_DISABLED;
1112

1213
import java.util.Map;
1314
import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -17,6 +18,7 @@
1718
import org.rocksdb.CompressionType;
1819
import org.rocksdb.InfoLogLevel;
1920
import org.rocksdb.Options;
21+
import org.rocksdb.WriteOptions;
2022

2123
public class BoundedMemoryConfigSetter implements RocksDBConfigSetter {
2224

@@ -52,6 +54,12 @@ public void setConfig(String storeName, Options options, Map<String, Object> con
5254
options.setUseDirectReads(Boolean.valueOf(String.valueOf(configs.get(DIRECT_READS_ENABLED))));
5355
}
5456

57+
if (configs.containsKey(WAL_DISABLED)) {
58+
options.setWalDir("");
59+
WriteOptions writeOptions = new WriteOptions();
60+
writeOptions.setDisableWAL(true);
61+
}
62+
5563
if (configs.containsKey(OPTIMIZE_FOR_POINT_LOOKUPS)) {
5664
Boolean optimizeForPointLookups =
5765
Boolean.valueOf(String.valueOf(configs.get(OPTIMIZE_FOR_POINT_LOOKUPS)));

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/rocksdb/RocksDBConfigs.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class RocksDBConfigs {
3636
public static final String DIRECT_READS_ENABLED = rocksdbPrefix("direct.reads.enabled");
3737
public static final String OPTIMIZE_FOR_POINT_LOOKUPS = rocksdbPrefix("optimize.point.lookups");
3838
public static final String LOG_LEVEL_CONFIG = rocksdbPrefix("log.level");
39+
public static final String WAL_DISABLED = rocksdbPrefix("optimize.point.lookups");
3940

4041
public static String rocksdbPrefix(String configKey) {
4142
return ROCKS_DB_PREFIX + configKey;

0 commit comments

Comments
 (0)