Skip to content

Commit f591d99

Browse files
committed
fix: smt fixes and memory efficiencies
1 parent 22eedac commit f591d99

5 files changed

Lines changed: 189 additions & 65 deletions

File tree

controller/block.go

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -277,12 +277,15 @@ func (c *Controller) CommitCertificate(qc *lib.QuorumCertificate, block *lib.Blo
277277
// exit with error
278278
return
279279
}
280-
// delete each transaction from the mempool
281-
c.Mempool.DeleteTransaction(block.Transactions...)
280+
syncing := c.isSyncing.Load()
281+
if !syncing {
282+
// delete each transaction from the mempool
283+
c.Mempool.DeleteTransaction(block.Transactions...)
284+
}
282285
// parse committed block for straw polls
283286
c.FSM.ParsePollTransactions(blockResult)
284287
// if self was the proposer
285-
if bytes.Equal(qc.ProposerKey, c.PublicKey) && !c.isSyncing.Load() {
288+
if bytes.Equal(qc.ProposerKey, c.PublicKey) && !syncing {
286289
// send the certificate results transaction on behalf of the quorum
287290
c.SendCertificateResultsTx(qc)
288291
}
@@ -301,35 +304,38 @@ func (c *Controller) CommitCertificate(qc *lib.QuorumCertificate, block *lib.Blo
301304
// exit with error
302305
return err
303306
}
304-
// reset the current mempool store to prepare for the next height
305-
c.Mempool.FSM.Discard()
306-
// set up the mempool with the actual new FSM for the next height
307-
// this makes c.Mempool.FSM.Reset() is unnecessary
308-
if c.Mempool.FSM, err = c.FSM.Copy(); err != nil {
309-
// exit with error
310-
return err
307+
if !syncing {
308+
// reset the current mempool store to prepare for the next height
309+
c.Mempool.FSM.Discard()
310+
// set up the mempool with the actual new FSM for the next height
311+
if c.Mempool.FSM, err = c.FSM.Copy(); err != nil {
312+
// exit with error
313+
return err
314+
}
315+
// check the mempool to cache a proposal block and validate the mempool itself
316+
c.Mempool.CheckMempool()
317+
// reset mempool FSM
318+
c.Mempool.FSM.Reset()
311319
}
312-
// check the mempool to cache a proposal block and validate the mempool itself
313-
c.Mempool.CheckMempool()
314-
// reset mempool FSM
315-
c.Mempool.FSM.Reset()
316320
// update telemetry (using proper defer to ensure time.Since is evaluated at defer execution)
317321
defer c.UpdateTelemetry(qc, block, time.Since(start))
318-
// publish root chain information to all nested chain subscribers.
319-
for _, id := range c.RCManager.ChainIds() {
320-
// get the root chain info
321-
info, e := c.FSM.LoadRootChainInfo(id, 0)
322-
if e != nil {
323-
// don't log 'no-validators' error as this is possible
324-
if e.Error() != lib.ErrNoValidators().Error() {
325-
c.log.Error(e.Error())
322+
if !syncing {
323+
// publish root chain information to all nested chain subscribers
324+
for _, id := range c.RCManager.ChainIds() {
325+
// get the root chain info
326+
info, e := c.FSM.LoadRootChainInfo(id, 0)
327+
if e != nil {
328+
// don't log 'no-validators' error as this is possible
329+
if e.Error() != lib.ErrNoValidators().Error() {
330+
c.log.Error(e.Error())
331+
}
332+
continue
326333
}
327-
continue
334+
// set the timestamp
335+
info.Timestamp = ts
336+
// publish root chain information
337+
go c.RCManager.Publish(id, info)
328338
}
329-
// set the timestamp
330-
info.Timestamp = ts
331-
// publish root chain information
332-
go c.RCManager.Publish(id, info)
333339
}
334340
// exit
335341
return

controller/consensus.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,16 @@ func (c *Controller) finishSyncing() {
740740
c.Lock()
741741
// when function completes, unlock
742742
defer c.Unlock()
743+
// reinitialize the mempool now that sync is complete
744+
c.Mempool.L.Lock()
745+
c.Mempool.Clear()
746+
c.Mempool.FSM.Discard()
747+
if mFSM, err := c.FSM.Copy(); err == nil {
748+
c.Mempool.FSM = mFSM
749+
}
750+
c.Mempool.CheckMempool()
751+
c.Mempool.FSM.Reset()
752+
c.Mempool.L.Unlock()
743753
// set the startup block metric (block height when first sync completed)
744754
c.Metrics.SetStartupBlock(c.FSM.Height())
745755
// signal a reset of bft for the chain

store/smt.go

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -190,79 +190,93 @@ func (s *SMT) Commit(unsortedOps map[uint64]valueOp) (err lib.ErrorI) {
190190

191191
// CommitParallel() executes deferred operations in parallel by partitioning them into
192192
// 8 subtrees based on their 3-bit prefix (000-111), avoiding conflicts between operations
193-
// that would modify overlapping tree regions. Each subtree is processed independently,
194-
// then the results are merged back into the main tree.
193+
// that would modify overlapping tree regions. Each subtree is processed independently
194+
// with its own Txn copy to avoid lock contention, then the results are merged back into
195+
// the main tree.
195196
func (s *SMT) CommitParallel(unsortedOps map[uint64]valueOp) (err lib.ErrorI) {
196197
// if there are too few operations, fall back to sequential processing
197198
// if len(unsortedOps) < NumSubtrees*2 {
198199
// return s.Commit(unsortedOps)
199200
// }
200201

201-
// sort operations by their 3-bit prefix to avoid conflicts
202+
parentTxn, ok := s.store.(*Txn)
203+
if !ok {
204+
return s.Commit(unsortedOps)
205+
}
206+
202207
groups, err := s.sortOperationsByPrefix(unsortedOps)
203208
if err != nil {
204209
return err
205210
}
206211

207-
// add synthetic borders to enable safe parallel processing
208212
cleanup, err := s.addSyntheticBorders()
209213
if err != nil {
210214
return err
211215
}
212216
defer func() {
213-
// cleanup synthetic borders regardless of success/failure
214217
if cleanupErr := cleanup(); cleanupErr != nil && err == nil {
215218
err = cleanupErr
216219
}
217220
}()
218221

219-
// get subtree roots for parallel processing
220222
subtreeRoots, err := s.getSubtreeRoots()
221223
if err != nil {
222224
return err
223225
}
224226

225-
// process subtrees in parallel using goroutines
226227
type subtreeResult struct {
227228
index int
229+
store *subtreeStore
228230
err lib.ErrorI
229231
}
230232

231233
resultChan := make(chan subtreeResult, NumSubtrees)
232234
activeSubtrees := 0
233235

234-
// launch goroutines for each subtree that has operations
235236
for i := 0; i < NumSubtrees; i++ {
236237
if len(groups[i]) == 0 {
237-
continue // skip empty groups
238+
continue
238239
}
239-
240240
activeSubtrees++
241-
go func(idx int, ops []*node, root *node) {
242-
// create an isolated subtree for this prefix
243-
subtree := s.createSubtree(root, ops)
244-
// reset subtree state for processing
241+
st := parentTxn.newSubtreeStore()
242+
243+
go func(idx int, ops []*node, root *node, store *subtreeStore) {
244+
subtree := &SMT{
245+
store: store,
246+
root: root,
247+
keyBitLength: s.keyBitLength,
248+
nodeCache: make(map[string]*node),
249+
operations: ops,
250+
minKey: s.minKey,
251+
maxKey: s.maxKey,
252+
}
245253
subtree.reset()
246-
// process operations in this subtree
247-
err := subtree.commit(true)
248-
// send result back
249-
resultChan <- subtreeResult{index: idx, err: err}
250-
}(i, groups[i], subtreeRoots[i])
254+
commitErr := subtree.commit(true)
255+
resultChan <- subtreeResult{
256+
index: idx,
257+
store: store,
258+
err: commitErr,
259+
}
260+
}(i, groups[i], subtreeRoots[i], st)
251261
}
252262

253-
// collect results from all active subtrees
263+
results := make([]subtreeResult, 0, activeSubtrees)
254264
for completed := 0; completed < activeSubtrees; completed++ {
255265
result := <-resultChan
256266
if result.err != nil {
257-
// if any subtree fails, we need to return the error
258-
// the cleanup function will handle synthetic border removal
267+
// Drain remaining results before returning so goroutines
268+
// finish and don't race with the deferred cleanup.
269+
for completed++; completed < activeSubtrees; completed++ {
270+
<-resultChan
271+
}
259272
return result.err
260273
}
274+
results = append(results, result)
275+
}
276+
for _, result := range results {
277+
parentTxn.mergeSubtreeOps(result.store)
261278
}
262279

263-
// after all subtrees are processed, the tree state is already consistent
264-
// because each subtree operation updated the shared store
265-
// we just need to refresh our in-memory view of the root
266280
s.root, err = s.getNode(s.root.Key.bytes())
267281
if err != nil {
268282
return err
@@ -547,19 +561,6 @@ func (s *SMT) getSubtreeRoots() (roots []*node, err lib.ErrorI) {
547561
return
548562
}
549563

550-
// createSubtree() initializes the subtree structure
551-
func (s *SMT) createSubtree(root *node, operations []*node) *SMT {
552-
return &SMT{
553-
store: s.store,
554-
root: root,
555-
keyBitLength: s.keyBitLength,
556-
nodeCache: make(map[string]*node),
557-
operations: operations,
558-
minKey: s.minKey,
559-
maxKey: s.maxKey,
560-
}
561-
}
562-
563564
// sortOperationsByPrefix returns 8 sorted slices grouped by 3-bit prefix: 000 to 111
564565
func (s *SMT) sortOperationsByPrefix(unsortedOps map[uint64]valueOp) (groups [8][]*node, err lib.ErrorI) {
565566
// for each unsorted operation

store/txn.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package store
22

33
import (
44
"bytes"
5+
"fmt"
56
"sync"
67

78
"github.com/canopy-network/canopy/lib"
@@ -274,6 +275,100 @@ func (t *Txn) NewIterator(prefix []byte, reverse bool, seek bool) (lib.IteratorI
274275
return newTxnIterator(parentIterator, t.txn, prefix, t.prefix, reverse), nil
275276
}
276277

278+
// newSubtreeStore creates a lightweight store for a parallel SMT subtree.
279+
// Reads check: (1) the subtree's own ops, (2) the parent Txn's ops, then
280+
// (3) an independent PebbleDB reader. Writes go only to the subtree's ops.
281+
// After the goroutine completes, call mergeSubtreeOps to fold the subtree
282+
// writes back into the parent Txn.
283+
func (t *Txn) newSubtreeStore() *subtreeStore {
284+
var reader *VersionedStore
285+
if vs, ok := t.reader.(*VersionedStore); ok {
286+
reader = vs.NewParallelReader()
287+
}
288+
t.txn.l.Lock()
289+
parentOps := make(map[uint64]valueOp, len(t.txn.ops))
290+
for k, v := range t.txn.ops {
291+
parentOps[k] = v
292+
}
293+
t.txn.l.Unlock()
294+
return &subtreeStore{
295+
ops: make(map[uint64]valueOp),
296+
parentOps: parentOps,
297+
reader: reader,
298+
prefix: t.prefix,
299+
version: t.writeVersion,
300+
}
301+
}
302+
303+
// mergeSubtreeOps copies all operations from a subtreeStore into the Txn.
304+
// Must only be called when no goroutines are accessing the subtreeStore.
305+
func (t *Txn) mergeSubtreeOps(s *subtreeStore) {
306+
for k, v := range s.ops {
307+
if _, exists := t.txn.ops[k]; !exists && t.sort {
308+
t.addToSorted(v.key, k)
309+
}
310+
t.txn.ops[k] = v
311+
}
312+
}
313+
314+
// subtreeStore implements lib.RWStoreI for a parallel SMT subtree. It
315+
// provides three-layer reads (own ops → parent ops snapshot → PebbleDB) with
316+
// independent snapshot access to avoid lock contention on the DB path.
317+
// The parent ops are snapshotted at creation time so reads are lock-free.
318+
type subtreeStore struct {
319+
l sync.Mutex
320+
ops map[uint64]valueOp
321+
parentOps map[uint64]valueOp
322+
reader *VersionedStore
323+
prefix []byte
324+
version uint64
325+
}
326+
327+
func (s *subtreeStore) Get(key []byte) ([]byte, lib.ErrorI) {
328+
h := lib.MemHash(key)
329+
s.l.Lock()
330+
if v, found := s.ops[h]; found {
331+
s.l.Unlock()
332+
if v.op == opDelete {
333+
return nil, nil
334+
}
335+
return v.value, nil
336+
}
337+
s.l.Unlock()
338+
if v, found := s.parentOps[h]; found {
339+
if v.op == opDelete {
340+
return nil, nil
341+
}
342+
return v.value, nil
343+
}
344+
if s.reader == nil {
345+
return nil, nil
346+
}
347+
return s.reader.Get(lib.Append(s.prefix, key))
348+
}
349+
350+
func (s *subtreeStore) Set(key, value []byte) lib.ErrorI {
351+
s.l.Lock()
352+
s.ops[lib.MemHash(key)] = valueOp{key: key, value: value, version: s.version, op: opSet}
353+
s.l.Unlock()
354+
return nil
355+
}
356+
357+
func (s *subtreeStore) Delete(key []byte) lib.ErrorI {
358+
s.l.Lock()
359+
s.ops[lib.MemHash(key)] = valueOp{key: key, version: s.version, op: opDelete}
360+
s.l.Unlock()
361+
return nil
362+
}
363+
364+
func (s *subtreeStore) Iterator([]byte) (lib.IteratorI, lib.ErrorI) {
365+
return nil, ErrStoreGet(fmt.Errorf("iterator not supported on subtree store"))
366+
}
367+
368+
func (s *subtreeStore) RevIterator([]byte) (lib.IteratorI, lib.ErrorI) {
369+
return nil, ErrStoreGet(fmt.Errorf("reverse iterator not supported on subtree store"))
370+
}
371+
277372
// Copy creates a new Txn with the same configuration and txn as the original
278373
func (t *Txn) Copy(reader TxnReaderI, writer TxnWriterI) *Txn {
279374
return &Txn{

store/versioned_store.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ func NewVersionedStore(db pebble.Reader, batch *pebble.Batch, version uint64) *V
7777
}
7878
}
7979

80+
// NewParallelReader creates a read-only VersionedStore sharing the same
81+
// underlying database (snapshot) but with its own buffers, safe for concurrent
82+
// use from a separate goroutine. The returned store must NOT be closed, as it
83+
// does not own the underlying reader.
84+
func (vs *VersionedStore) NewParallelReader() *VersionedStore {
85+
return &VersionedStore{
86+
db: vs.db,
87+
version: vs.version,
88+
decodeBuffer: make([][]byte, 0, 5),
89+
}
90+
}
91+
8092
// Set() stores a key-value pair at the current version
8193
func (vs *VersionedStore) Set(key, value []byte) (err lib.ErrorI) {
8294
return vs.SetAt(key, value, vs.version)

0 commit comments

Comments
 (0)