Skip to content

Commit a880961

Browse files
cleanup partition changes
1 parent 729097c commit a880961

4 files changed

Lines changed: 111 additions & 79 deletions

File tree

store/store.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func NewStore(path string, metrics *lib.Metrics, log lib.LoggerI) (lib.StoreI, l
9090
// single batch. It is seemingly unknown why the 10% limit is set
9191
// https://discuss.dgraph.io/t/discussion-badgerdb-should-offer-arbitrarily-sized-atomic-transactions/8736
9292
db, err := badger.OpenManaged(badger.DefaultOptions(path).WithNumVersionsToKeep(math.MaxInt64).
93-
WithLoggingLevel(badger.DEBUG).WithMemTableSize(int64(1*units.GB + 280*units.MB)))
93+
WithLoggingLevel(badger.ERROR).WithMemTableSize(int64(1*units.GB + 280*units.MB)))
9494
if err != nil {
9595
return nil, ErrOpenDB(err)
9696
}
@@ -272,27 +272,19 @@ func (s *Store) Partition() {
272272
k, v := it.Key(), it.Value()
273273
// skip items that are already marked for Garbage Collection
274274
if deDuplicator.Found(lib.BytesToString(k)) {
275-
// set the item as prunable
276-
if e := writer.SetEntryAt(newEntry(lib.Append([]byte(latestStatePrefix), k), nil, badgerDeleteBit), it.Version()); e != nil {
277-
return ErrSetEntry(e)
278-
}
279275
continue
280276
}
281277
// if the item is 'deleted'
282278
if it.Deleted() {
283279
// set the item as deleted at the partition height and discard earlier versions
284-
if e := writer.SetEntryAt(newEntry(lib.Append([]byte(latestStatePrefix), k), nil, badgerDeleteBit|badgerDiscardEarlierVersions), it.Version()); e != nil {
280+
if e := writer.SetEntryAt(newEntry(lib.Append([]byte(latestStatePrefix), k), nil, badgerDeleteBit|badgerDiscardEarlierVersions), snapshotHeight); e != nil {
285281
return ErrSetEntry(e)
286282
}
287283
} else {
288284
// set item in the historical partition
289285
if e := writer.SetEntryAt(newEntry(lib.Append(partitionPrefix, k), v, badgerNoDiscardBit), snapshotHeight); e != nil {
290286
return ErrSetEntry(e)
291287
}
292-
// set the item as prunable
293-
if e := writer.SetEntryAt(newEntry(lib.Append([]byte(latestStatePrefix), k), nil, badgerDeleteBit), it.Version()); e != nil {
294-
return ErrSetEntry(e)
295-
}
296288
// re-write the latest version with the 'discard' flag set
297289
if e := writer.SetEntryAt(newEntry(lib.Append([]byte(latestStatePrefix), k), v, badgerDiscardEarlierVersions), snapshotHeight); e != nil {
298290
return ErrSetEntry(e)
@@ -363,7 +355,7 @@ func (s *Store) Set(k, v []byte) lib.ErrorI {
363355
return err
364356
}
365357
// set in the state store @ historical partition
366-
if err := s.hss.SetNonPruneable(k, v); err != nil {
358+
if err := s.hss.Set(k, v); err != nil {
367359
return err
368360
}
369361
// set in the state commit store
@@ -377,7 +369,7 @@ func (s *Store) Delete(k []byte) lib.ErrorI {
377369
return err
378370
}
379371
// delete from the state store @ historical partition
380-
if err := s.hss.DeleteNonPrunable(k); err != nil {
372+
if err := s.hss.Tombstone(k); err != nil {
381373
return err
382374
}
383375
// delete from the state commit store

store/store_test.go

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,14 +391,96 @@ func TestPartitionIntegration(t *testing.T) {
391391
require.Len(t, deletedKeys, 0)
392392
}
393393

394+
func TestBadgerBitBehavior(t *testing.T) {
395+
tests := []struct {
396+
name string
397+
nonPruneable bool
398+
finalBit byte
399+
expectedKeyCount int
400+
}{
401+
{
402+
name: "discard earlier versions bit",
403+
finalBit: badgerDiscardEarlierVersions,
404+
expectedKeyCount: 1,
405+
},
406+
{
407+
name: "delete bit",
408+
finalBit: badgerDeleteBit,
409+
expectedKeyCount: 0,
410+
},
411+
{
412+
name: "discard earlier versions bit",
413+
finalBit: badgerDiscardEarlierVersions | badgerNoDiscardBit,
414+
expectedKeyCount: 1000,
415+
},
416+
{
417+
name: "delete bit",
418+
finalBit: badgerDeleteBit | badgerNoDiscardBit,
419+
expectedKeyCount: 1000,
420+
},
421+
}
422+
for _, test := range tests {
423+
t.Run(test.name, func(t *testing.T) {
424+
// create temp directory for test db
425+
tempDir := t.TempDir()
426+
path := filepath.Join(tempDir, "test-db")
427+
428+
// set up db with a configuration that will trigger a GC after multiple partitions
429+
db, err := badger.OpenManaged(badger.DefaultOptions(path).WithNumVersionsToKeep(math.MaxInt64).
430+
WithLoggingLevel(badger.INFO).WithMemTableSize(int64(units.GB)))
431+
require.NoError(t, err)
432+
// set up the store
433+
store, err := NewStoreWithDB(db, nil, lib.NewDefaultLogger(), true)
434+
require.NoError(t, err)
435+
defer func() {
436+
require.NoError(t, store.Close())
437+
}()
438+
439+
// insert large dataset at various heights before partition boundary
440+
k := []byte("my_key")
441+
iterations := 1000
442+
for i := 0; i < iterations; i++ {
443+
// set the value in the db
444+
require.NoError(t, store.lss.Tombstone(k)) // tombstoned but mark keep
445+
446+
// commit regularly to create multiple versions
447+
_, err = store.Commit()
448+
require.NoError(t, err)
449+
}
450+
tx := db.NewTransactionAt(uint64(iterations), true)
451+
// set an entry with a bit that marks it as deleted and prevents it from being discarded
452+
require.NoError(t, tx.SetEntry(newEntry(lib.Append([]byte(latestStatePrefix), k), nil, test.finalBit)))
453+
require.NoError(t, tx.CommitAt(uint64(iterations), nil))
454+
455+
db.SetDiscardTs(uint64(iterations))
456+
457+
require.NoError(t, FlushMemTable(db))
458+
require.NoError(t, db.Flatten(1))
459+
460+
// use an archive iterator to iterate through the deleted keys
461+
iterator, err := store.lss.ArchiveIterator(nil)
462+
require.NoError(t, err)
463+
it := iterator.(*Iterator)
464+
defer it.Close()
465+
466+
numKeys := 0
467+
for ; it.Valid(); it.Next() {
468+
numKeys++
469+
}
470+
471+
require.Equal(t, test.expectedKeyCount, numKeys)
472+
})
473+
}
474+
}
475+
394476
func TestFlushMemtable(t *testing.T) {
395477
// create temp directory for test db
396478
tempDir := t.TempDir()
397479
path := filepath.Join(tempDir, "test-db")
398480

399481
// set up db with a configuration that will trigger a GC after multiple partitions
400482
db, err := badger.OpenManaged(badger.DefaultOptions(path).WithNumVersionsToKeep(math.MaxInt64).
401-
WithValueThreshold(1).WithLoggingLevel(badger.INFO).WithMemTableSize(int64(units.GB)))
483+
WithLoggingLevel(badger.INFO).WithMemTableSize(int64(units.GB)))
402484
require.NoError(t, err)
403485
// set up the store
404486
store, err := NewStoreWithDB(db, nil, lib.NewDefaultLogger(), true)
@@ -453,7 +535,7 @@ func TestFlushMemtable(t *testing.T) {
453535
numKeys++
454536
}
455537

456-
require.True(t, numKeys == 1)
538+
require.Equal(t, 1, numKeys)
457539
}
458540

459541
func TestHistoricalPrefix(t *testing.T) {

store/wrapper_txn.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,29 @@ import (
1212
)
1313

1414
const (
15+
// ----------------------------------------------------------------------------------------------------------------
1516
// BadgerDB garbage collector behavior is not well documented leading to many open issues in their repository
1617
// However, here is our current understanding based on experimentation
17-
// ----------------------------------------------------------------------------------------------------------
18-
// BadgerDB Garbage Collection Rules listed by priority
19-
// 1. MergeOp Bit prevents garbage collection of a key regardless
20-
// 2. Nothing above DiscardTs will be garbage collected
21-
// 3. Deleting, expiring, and setting ‘Discard earlier versions’ signals the removal of older versions of a key
22-
// 4. Deleting or expiring a key makes it eligible for GC
23-
// 5. If KeepNumVersions is set to max int it will prevent automatic GC of older versions
24-
// ------------------------------------------------------------------------------
18+
// ----------------------------------------------------------------------------------------------------------------
19+
// 1. Manual Keep (Protection)
20+
// - `badgerNoDiscardBit` prevents automatic GC of a key version.
21+
// - However, it can be manually superseded by a manual removal
22+
//
23+
// 2. Manual Remove (Explicit Deletion or Pruning)
24+
// - Deleting a key at a higher ts removes earlier versions once `discardTs >= ts`.
25+
// - Setting `badgerDiscardEarlierVersions` is similar, except it retains the current version.
26+
//
27+
// 3. Auto Remove – Tombstones
28+
// - Deleted keys (tombstoned) <= `discardTs` are automatically purged unless protected by `badgerNoDiscardBit`
29+
//
30+
// 4. Auto Remove – Set Entries
31+
// - For non-deleted (live) keys, Badger retains the number of versions to retain is defined by `KeepNumVersions`.
32+
// - Older versions exceeding this count are automatically eligible for GC.
33+
//
34+
// Note:
35+
// - The first GC pass after updating `discardTs` and flushing memtable is deterministic
36+
// - Subsequent GC runs are probabilistic, depending on reclaimable space and value log thresholds
37+
// ----------------------------------------------------------------------------------------------------------------
2538
// Bits source: https://github.com/hypermodeinc/badger/blob/85389e88bf308c1dc271383b77b67f4ef4a85194/value.go#L37
2639
badgerMetaFieldName = "meta" // badgerDB Entry 'meta' field name
2740
badgerDiscardEarlierVersions byte = 1 << 2 // badgerDB 'discard earlier versions' flag
@@ -85,17 +98,8 @@ func (t *TxnWrapper) Delete(k []byte) lib.ErrorI {
8598
return nil
8699
}
87100

88-
// Set() stores the key-value pair in the BadgerDB transaction
89-
func (t *TxnWrapper) SetNonPruneable(k, v []byte) lib.ErrorI {
90-
// set an entry with a bit that prevents it from being discarded
91-
if err := t.db.SetEntry(newEntry(lib.Append(t.prefix, k), v, badgerNoDiscardBit)); err != nil {
92-
return ErrStoreSet(err)
93-
}
94-
return nil
95-
}
96-
97-
// DeleteNonPrunable() removes the key-value pair from the BadgerDB transaction but prevents it from being garbage collected
98-
func (t *TxnWrapper) DeleteNonPrunable(k []byte) lib.ErrorI {
101+
// Tombstone() removes the key-value pair from the BadgerDB transaction but prevents it from being garbage collected
102+
func (t *TxnWrapper) Tombstone(k []byte) lib.ErrorI {
99103
// set an entry with a bit that marks it as deleted and prevents it from being discarded
100104
if err := t.db.SetEntry(newEntry(lib.Append(t.prefix, k), nil, badgerDeleteBit|badgerNoDiscardBit)); err != nil {
101105
return ErrStoreDelete(err)

store/wrapper_txn_test.go

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -28,52 +28,6 @@ func TestGetSetDelete(t *testing.T) {
2828
require.Contains(t, er.Error(), badger.ErrKeyNotFound.Error())
2929
}
3030

31-
func TestSetDeleteNonPruneable(t *testing.T) {
32-
db, store, cleanup := newTestTxnWrapper(t)
33-
defer cleanup()
34-
// first set 'a' and 'b' at height 1
35-
require.NoError(t, store.SetNonPruneable([]byte("a"), []byte("a")))
36-
require.NoError(t, store.SetNonPruneable([]byte("b"), []byte("b")))
37-
require.NoError(t, store.db.CommitAt(1, nil))
38-
// delete 'b' at height 2
39-
tx := db.NewTransactionAt(1, true)
40-
nextStore := NewTxnWrapper(tx, lib.NewDefaultLogger(), []byte(latestStatePrefix))
41-
require.NoError(t, nextStore.DeleteNonPrunable([]byte("b")))
42-
require.NoError(t, nextStore.db.CommitAt(2, nil))
43-
// read at height 2
44-
tx2 := db.NewTransactionAt(2, true)
45-
nextStore2 := NewTxnWrapper(tx2, lib.NewDefaultLogger(), []byte(latestStatePrefix))
46-
defer nextStore2.Close()
47-
// use an archive iterator
48-
iterator, err := nextStore2.ArchiveIterator(nil)
49-
require.NoError(t, err)
50-
it := iterator.(*Iterator)
51-
defer it.Close()
52-
var count int
53-
// check the expected values of each item of the archive iterator and the order
54-
// including the pruning and delete bits
55-
for ; it.Valid(); it.Next() {
56-
require.True(t, getMeta(it.parent.Item())&badgerNoDiscardBit != 0)
57-
switch count {
58-
case 0:
59-
require.Equal(t, it.Key(), []byte("a"))
60-
require.Equal(t, it.Value(), []byte("a"))
61-
require.False(t, getMeta(it.parent.Item())&badgerDeleteBit != 0)
62-
case 1:
63-
require.Equal(t, it.Key(), []byte("b"))
64-
require.Equal(t, it.Value(), []byte(nil))
65-
require.True(t, getMeta(it.parent.Item())&badgerDeleteBit != 0)
66-
case 2:
67-
require.Equal(t, it.Key(), []byte("b"))
68-
require.Equal(t, it.Value(), []byte("b"))
69-
require.False(t, getMeta(it.parent.Item())&badgerDeleteBit != 0)
70-
}
71-
count++
72-
}
73-
// ensure 3 keys
74-
require.Equal(t, count, 3)
75-
}
76-
7731
func TestIteratorBasic(t *testing.T) {
7832
_, parent, cleanup := newTestTxnWrapper(t)
7933
defer cleanup()

0 commit comments

Comments
 (0)