Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions storage/src/rocksdb_storage/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use lazy_static::lazy_static;
#[cfg(feature = "unsafe-dump-load")]
use rocksdb::IngestExternalFileOptions;
use rocksdb::{
checkpoint::Checkpoint, ColumnFamily, ColumnFamilyDescriptor, OptimisticTransactionDB,
Transaction, WriteBatchWithTransaction, DEFAULT_COLUMN_FAMILY_NAME,
checkpoint::Checkpoint, ColumnFamily, ColumnFamilyDescriptor, FlushOptions,
OptimisticTransactionDB, Transaction, WriteBatchWithTransaction, DEFAULT_COLUMN_FAMILY_NAME,
};

use super::{PrefixedRocksDbImmediateStorageContext, PrefixedRocksDbTransactionContext};
Expand Down Expand Up @@ -611,7 +611,19 @@ impl<'db> Storage<'db> for RocksDbStorage {
}

fn flush(&self) -> Result<(), Error> {
self.db.flush().map_err(RocksDBError)
// Flush all column families: `set_atomic_flush(true)` requires it, and a
// default-only flush leaves the roots/meta/aux memtables unpersisted.
self.db
.flush_cfs_opt(
&[
cf_default(&self.db),
cf_aux(&self.db),
cf_roots(&self.db),
cf_meta(&self.db),
],
&FlushOptions::default(),
)
.map_err(RocksDBError)
}

fn get_transactional_storage_context<'b, B>(
Expand Down Expand Up @@ -714,6 +726,13 @@ fn cf_meta(storage: &Db) -> &ColumnFamily {
.expect("meta column family must exist")
}

/// Get the default column family
fn cf_default(storage: &Db) -> &ColumnFamily {
storage
.cf_handle(DEFAULT_COLUMN_FAMILY_NAME)
.expect("default column family must exist")
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -723,6 +742,38 @@ mod tests {
};
use grovedb_path::SubtreePath;

#[test]
fn flush_persists_all_column_families() {
let storage = TempStorage::new();
let db = &storage.db;

// Populate every column family.
db.put_cf(cf_default(db), b"kd", b"vd")
.expect("default put");
db.put_cf(cf_aux(db), b"ka", b"va").expect("aux put");
db.put_cf(cf_roots(db), b"kr", b"vr").expect("roots put");
db.put_cf(cf_meta(db), b"km", b"vm").expect("meta put");

storage.flush().expect("flush");

// Every CF's active memtable must now be empty (data flushed to SST).
for (name, cf) in [
(DEFAULT_COLUMN_FAMILY_NAME, cf_default(db)),
(AUX_CF_NAME, cf_aux(db)),
(ROOTS_CF_NAME, cf_roots(db)),
(META_CF_NAME, cf_meta(db)),
] {
let entries = db
.property_int_value_cf(cf, "rocksdb.num-entries-active-mem-table")
.expect("memtable property read")
.expect("memtable property present");
assert_eq!(
entries, 0,
"column family '{name}' was not flushed: {entries} entries still in the memtable",
);
}
}

#[test]
fn test_build_prefix() {
let path_a = [b"aa".as_ref(), b"b"];
Expand Down
Loading