Skip to content

Commit 472fc35

Browse files
added flush memtable functionality
1 parent 86ce8af commit 472fc35

5 files changed

Lines changed: 110 additions & 5 deletions

File tree

lib/error.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,8 @@ const (
308308
CodeInvalidMerkleTreeProof ErrorCode = 11
309309
CodeGarbageCollectDB ErrorCode = 12
310310
CodeSetEntry ErrorCode = 13
311+
CodeReadBytes ErrorCode = 14
312+
CodeFlushMemTable ErrorCode = 15
311313

312314
RPCModule ErrorModule = "rpc"
313315
CodeRPCTimeout ErrorCode = 1

store/error.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,11 @@ func ErrInvalidMerkleTree() lib.ErrorI {
5757
func ErrInvalidMerkleTreeProof() lib.ErrorI {
5858
return lib.NewError(lib.CodeInvalidMerkleTreeProof, lib.StorageModule, "merkle tree proof is invalid")
5959
}
60+
61+
func ErrReadBytes(err error) lib.ErrorI {
62+
return lib.NewError(lib.CodeReadBytes, lib.StorageModule, fmt.Sprintf("random read bytes failed with err: %s", err.Error()))
63+
}
64+
65+
func ErrFlushMemTable(err error) lib.ErrorI {
66+
return lib.NewError(lib.CodeFlushMemTable, lib.StorageModule, fmt.Sprintf("flush memtable failed with err: %s", err.Error()))
67+
}

store/store.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ func (s *Store) NewReadOnly(queryVersion uint64) (lib.StoreI, lib.ErrorI) {
134134
var useHistorical bool
135135
// if the query version is older than the partition frequency
136136
if queryVersion+1 < partitionHeight(s.version) {
137-
fmt.Println("USING HISTORICAL")
138137
useHistorical = true
139138
}
140139
// make a reader for the specified version
@@ -283,7 +282,7 @@ func (s *Store) Partition() {
283282
// if the item is 'deleted'
284283
if it.Deleted() {
285284
// set the item as deleted at the partition height and discard earlier versions
286-
if e := writer.SetEntryAt(newEntry(lib.Append([]byte(latestStatePrefix), k), nil, badgerDeleteBit), it.Version()); e != nil {
285+
if e := writer.SetEntryAt(newEntry(lib.Append([]byte(latestStatePrefix), k), nil, badgerDeleteBit|badgerDiscardEarlierVersions), it.Version()); e != nil {
287286
return ErrSetEntry(e)
288287
}
289288
} else {
@@ -296,7 +295,7 @@ func (s *Store) Partition() {
296295
return ErrSetEntry(e)
297296
}
298297
// re-write the latest version with the 'discard' flag set
299-
if e := writer.SetEntryAt(newEntry(lib.Append([]byte(latestStatePrefix), k), v, badgerNoDiscardBit), snapshotHeight); e != nil {
298+
if e := writer.SetEntryAt(newEntry(lib.Append([]byte(latestStatePrefix), k), v, badgerDiscardEarlierVersions), snapshotHeight); e != nil {
300299
return ErrSetEntry(e)
301300
}
302301
}
@@ -311,11 +310,14 @@ func (s *Store) Partition() {
311310
}
312311
// if the partition height is past the partition frequency, set the discardTs at the partition height-1
313312
if snapshotHeight > partitionFrequency {
314-
fmt.Println("SETTING DISCARD TS @", snapshotHeight-2)
315313
sc.db.SetDiscardTs(snapshotHeight - 2)
316314
}
317315
// if the GC isn't already running
318316
if !s.isGarbageCollecting.Swap(true) {
317+
// force the mem table to flush to the LSM
318+
if err = FlushMemTable(sc.db); err != nil {
319+
return err
320+
}
319321
// run LSM compaction
320322
if fe := sc.db.Flatten(32); fe != nil {
321323
return ErrGarbageCollectDB(fe)

store/store_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,71 @@ func TestPartitionIntegration(t *testing.T) {
391391
require.Len(t, deletedKeys, 0)
392392
}
393393

394+
func TestFlushMemtable(t *testing.T) {
395+
// create temp directory for test db
396+
tempDir := t.TempDir()
397+
path := filepath.Join(tempDir, "test-db")
398+
399+
// set up db with a configuration that will trigger a GC after multiple partitions
400+
db, err := badger.OpenManaged(badger.DefaultOptions(path).WithNumVersionsToKeep(math.MaxInt64).
401+
WithValueThreshold(1).WithLoggingLevel(badger.INFO).WithMemTableSize(int64(units.GB)))
402+
require.NoError(t, err)
403+
// set up the store
404+
store, err := NewStoreWithDB(db, nil, lib.NewDefaultLogger(), true)
405+
require.NoError(t, err)
406+
defer func() {
407+
require.NoError(t, store.Close())
408+
}()
409+
410+
// insert large dataset at various heights before partition boundary
411+
var k []byte
412+
keys := [][]byte{
413+
bytes.Repeat([]byte("0"), 32),
414+
bytes.Repeat([]byte("1"), 32),
415+
bytes.Repeat([]byte("2"), 32),
416+
bytes.Repeat([]byte("3"), 32),
417+
bytes.Repeat([]byte("4"), 32),
418+
bytes.Repeat([]byte("5"), 32),
419+
bytes.Repeat([]byte("6"), 32),
420+
bytes.Repeat([]byte("7"), 32),
421+
bytes.Repeat([]byte("8"), 32),
422+
bytes.Repeat([]byte("9"), 32),
423+
}
424+
iterations := 1000
425+
for i := 1; i < iterations; i++ {
426+
// if not the first iteration
427+
if i != 0 {
428+
// delete the last key in the db
429+
require.NoError(t, store.Delete(k))
430+
}
431+
// use a key to be updated repeatedly to generate versions
432+
k = keys[i%10]
433+
434+
// set the value in the db
435+
require.NoError(t, store.Set(k, nil))
436+
437+
// commit regularly to create multiple versions
438+
_, err = store.Commit()
439+
require.NoError(t, err)
440+
}
441+
db.SetDiscardTs(uint64(iterations - 1))
442+
require.NoError(t, FlushMemTable(db))
443+
require.NoError(t, db.Flatten(32))
444+
445+
// use an archive iterator to iterate through the deleted keys
446+
iterator, err := store.lss.ArchiveIterator(nil)
447+
require.NoError(t, err)
448+
it := iterator.(*Iterator)
449+
defer it.Close()
450+
451+
numKeys := 0
452+
for ; it.Valid(); it.Next() {
453+
numKeys++
454+
}
455+
456+
require.True(t, numKeys == 1)
457+
}
458+
394459
func TestHistoricalPrefix(t *testing.T) {
395460
tests := []struct {
396461
name string

store/wrapper_txn.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package store
22

33
import (
44
"bytes"
5+
"crypto/rand"
6+
"math"
57
"reflect"
68
"unsafe"
79

@@ -25,7 +27,7 @@ const (
2527
badgerDiscardEarlierVersions byte = 1 << 2 // badgerDB 'discard earlier versions' flag
2628
badgerDeleteBit byte = 1 << 0 // badgerDB 'tombstoned' flag
2729
badgerNoDiscardBit byte = 1 << 3 // badgerDB 'never discard' bit
28-
badgerGCRatio = .000001 // the ratio when badgerDB will run the garbage collector
30+
badgerGCRatio = .15 // the ratio when badgerDB will run the garbage collector
2931
badgerSizeFieldName = "size" // badgerDB Txn 'size' field name
3032
badgerCountFieldName = "count" // badgerDB Txn 'count' field name
3133
badgerTxnFieldName = "txn" // badgerDB WriteBatch 'txn' field name
@@ -206,6 +208,32 @@ func newEntry(key, value []byte, meta byte) (e *badger.Entry) {
206208
return
207209
}
208210

211+
// FlushMemTable() ensures badgerDB is flushing its mem table before running flatten
212+
// IMPORTANT - discardTs must be set before this
213+
func FlushMemTable(db *badger.DB) lib.ErrorI {
214+
// get random 32 bytes
215+
randomPrefix := make([]byte, 32)
216+
if _, err := rand.Read(randomPrefix); err != nil {
217+
return ErrReadBytes(err)
218+
}
219+
// create a new transaction to write to the database
220+
tx := db.NewTransactionAt(math.MaxUint64, true)
221+
// write the random prefix to the database
222+
if err := tx.Set(randomPrefix, nil); err != nil {
223+
return ErrSetEntry(err)
224+
}
225+
// commit the transaction
226+
if err := tx.CommitAt(math.MaxUint64, nil); err != nil {
227+
return ErrCommitDB(err)
228+
}
229+
// call drop prefix which triggers the mempool flush
230+
// NOTE: this only works if an actual prefix exists
231+
if err := db.DropPrefix(randomPrefix); err != nil {
232+
return ErrFlushMemTable(err)
233+
}
234+
return nil
235+
}
236+
209237
// setMeta() accesses the private field 'meta' of badgerDB's `Entry`
210238
// badger doesn't yet allow users to explicitly set keys as *do not discard*
211239
// https://github.com/hypermodeinc/badger/issues/2192

0 commit comments

Comments
 (0)