From 35b0aa9fd5cfe6aceaf5fd5f1e7fac0d3dd7f311 Mon Sep 17 00:00:00 2001 From: Pablo Ocampo Date: Thu, 21 Aug 2025 17:52:26 -0400 Subject: [PATCH 01/25] fix: smt tree order --- store/smt.go | 101 +++++++++++++------ store/smt_test.go | 248 +++++++++++++++++++++++++++++++++++++--------- store/store.go | 2 +- 3 files changed, 269 insertions(+), 82 deletions(-) diff --git a/store/smt.go b/store/smt.go index d9402f0dd0..130e7d5dbd 100644 --- a/store/smt.go +++ b/store/smt.go @@ -4,7 +4,6 @@ import ( "bytes" "math/bits" "sort" - "sync" "github.com/canopy-network/canopy/lib" "github.com/canopy-network/canopy/lib/crypto" @@ -189,49 +188,87 @@ func (s *SMT) Commit(unsortedOps map[uint64]valueOp) (err lib.ErrorI) { return s.commit(false) } -// CommitParallel(): sorts the operations in 8 subtree threads, executes those threads in parallel and combines them into the master tree +// CommitParallel() executes deferred operations in parallel by partitioning them into +// 8 subtrees based on their 3-bit prefix (000-111), avoiding conflicts between operations +// that would modify overlapping tree regions. Each subtree is processed independently, +// then the results are merged back into the main tree. func (s *SMT) CommitParallel(unsortedOps map[uint64]valueOp) (err lib.ErrorI) { - var wg sync.WaitGroup - errChan := make(chan lib.ErrorI, NumSubtrees) - // add 16 synthetic borders to the tree - cleanup, err := s.addSyntheticBorders() + // if there are too few operations, fall back to sequential processing + // if len(unsortedOps) < NumSubtrees*2 { + // return s.Commit(unsortedOps) + // } + + // sort operations by their 3-bit prefix to avoid conflicts + groups, err := s.sortOperationsByPrefix(unsortedOps) if err != nil { return err } - // collect the roots for each group (000, 001, 010, 011...) 
- roots, err := s.getSubtreeRoots() + + // add synthetic borders to enable safe parallel processing + cleanup, err := s.addSyntheticBorders() if err != nil { return err } - // sort operations grouping by prefix - groupedByPrefix, err := s.sortOperationsByPrefix(unsortedOps) + defer func() { + // cleanup synthetic borders regardless of success/failure + if cleanupErr := cleanup(); cleanupErr != nil && err == nil { + err = cleanupErr + } + }() + + // get subtree roots for parallel processing + subtreeRoots, err := s.getSubtreeRoots() if err != nil { - return + return err } - // commit each group in parallel - for i := 0; i < 8; i++ { - wg.Add(1) - go func(index int) { - defer wg.Done() - // create subtree - subtree := s.createSubtree(roots[index], groupedByPrefix[index]) - subtree.reset() - // commit the subtree - if e := subtree.commit(true); e != nil { - errChan <- e - } - }(i) + + // process subtrees in parallel using goroutines + type subtreeResult struct { + index int + err lib.ErrorI } - // wait for all goroutines to finish - wg.Wait() - close(errChan) - // check if any errors occurred - for err = range errChan { - if err != nil { - return + + resultChan := make(chan subtreeResult, NumSubtrees) + activeSubtrees := 0 + + // launch goroutines for each subtree that has operations + for i := 0; i < NumSubtrees; i++ { + if len(groups[i]) == 0 { + continue // skip empty groups + } + + activeSubtrees++ + go func(idx int, ops []*node, root *node) { + // create an isolated subtree for this prefix + subtree := s.createSubtree(root, ops) + // reset subtree state for processing + subtree.reset() + // process operations in this subtree + err := subtree.commit(true) + // send result back + resultChan <- subtreeResult{index: idx, err: err} + }(i, groups[i], subtreeRoots[i]) + } + + // collect results from all active subtrees + for completed := 0; completed < activeSubtrees; completed++ { + result := <-resultChan + if result.err != nil { + // if any subtree fails, we need to 
return the error + // the cleanup function will handle synthetic border removal + return result.err } } - return cleanup() + + // after all subtrees are processed, the tree state is already consistent + // because each subtree operation updated the shared store + // we just need to refresh our in-memory view of the root + s.root, err = s.getNode(s.root.Key.bytes()) + if err != nil { + return err + } + + return nil } // commit(): executes the deferred operations in order (left-to-right), diff --git a/store/smt_test.go b/store/smt_test.go index 38fa43496e..3f6f1980f5 100644 --- a/store/smt_test.go +++ b/store/smt_test.go @@ -2,11 +2,10 @@ package store import ( "bytes" - "crypto/rand" "fmt" - mathrand "math/rand" "strconv" "testing" + "time" "github.com/canopy-network/canopy/lib/crypto" @@ -15,53 +14,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestFuzzMultiSet(t *testing.T) { - iterations := 1000 - // create a new SMT - smt1, memStore := NewTestSMT(t, nil, nil, 160) - unsortedOps := make(map[uint64]valueOp) - unsortedOps2 := make(map[uint64]valueOp) - // close the store when done - defer memStore.Close() - // create a compare SMT - smt2, memStore2 := NewTestSMT(t, nil, nil, 160) - // close the store when done - defer memStore2.Close() - var keys [][]byte - for i := 0; i < iterations; i++ { - // load 32 random bytes - random := make([]byte, 32) - _, err := rand.Read(random) - require.NoError(t, err) - // 50% of the time do a set - if mathrand.Intn(2) == 0 { - keys = append(keys, random) - unsortedOps[lib.MemHash(random)] = valueOp{key: random, value: random, op: opSet} - unsortedOps2[lib.MemHash(random)] = valueOp{key: random, value: random, op: opSet} - } else { - toDelete := random - if mathrand.Intn(2) == 0 { - if len(keys) != 0 { - // choose a random key - idx := mathrand.Intn(len(keys)) - toDelete = keys[idx] - // remove it from the keys slice - keys = append(keys[:idx], keys[idx+1:]...) 
- } - } - // 50% of the time do a delete - unsortedOps[lib.MemHash(toDelete)] = valueOp{key: toDelete, op: opDelete} - unsortedOps2[lib.MemHash(toDelete)] = valueOp{key: toDelete, op: opDelete} - } - // for smt 2 commit everytime - require.NoError(t, smt2.Commit(unsortedOps2)) - } - // commit smt 1 - require.NoError(t, smt1.CommitParallel(unsortedOps)) - // compare roots between the two smts - require.Equal(t, smt1.Root(), smt2.Root()) -} - func TestSet(t *testing.T) { tests := []struct { name string @@ -2097,3 +2049,201 @@ func keyBytesFromStr(str string) []byte { // return the key bytes return byts } + +func TestCommitParallel(t *testing.T) { + tests := []struct { + name string + detail string + keyBitSize int + preset *NodeList + operations map[uint64]valueOp + expected *NodeList + shouldError bool + }{ + { + name: "parallel commit with operations across multiple subtrees", + detail: `Operations distributed across different 3-bit prefixes should be processed in parallel + without conflicts. 
This tests the core parallelization logic.`, + keyBitSize: 160, + preset: nil, // start with empty tree + operations: map[uint64]valueOp{ + // operations for different subtrees based on hash prefixes + 1: {key: []byte{1}, value: []byte("value1"), op: opSet}, // will hash to some prefix + 2: {key: []byte{2}, value: []byte("value2"), op: opSet}, // will hash to some prefix + 3: {key: []byte{3}, value: []byte("value3"), op: opSet}, // will hash to some prefix + 4: {key: []byte{4}, value: []byte("value4"), op: opSet}, // will hash to some prefix + 5: {key: []byte{5}, value: []byte("value5"), op: opSet}, // will hash to some prefix + 6: {key: []byte{6}, value: []byte("value6"), op: opSet}, // will hash to some prefix + 7: {key: []byte{7}, value: []byte("value7"), op: opSet}, // will hash to some prefix + 8: {key: []byte{8}, value: []byte("value8"), op: opSet}, // will hash to some prefix + 9: {key: []byte{9}, value: []byte("value9"), op: opSet}, // will hash to some prefix + 10: {key: []byte{10}, value: []byte("value10"), op: opSet}, // will hash to some prefix + 11: {key: []byte{11}, value: []byte("value11"), op: opSet}, // will hash to some prefix + 12: {key: []byte{12}, value: []byte("value12"), op: opSet}, // will hash to some prefix + 13: {key: []byte{13}, value: []byte("value13"), op: opSet}, // will hash to some prefix + 14: {key: []byte{14}, value: []byte("value14"), op: opSet}, // will hash to some prefix + 15: {key: []byte{15}, value: []byte("value15"), op: opSet}, // will hash to some prefix + 16: {key: []byte{16}, value: []byte("value16"), op: opSet}, // will hash to some prefix + }, + }, + { + name: "parallel commit with mixed set and delete operations", + detail: `Mix of set and delete operations across subtrees to test + parallel processing of different operation types.`, + keyBitSize: 160, + preset: nil, + operations: map[uint64]valueOp{ + 1: {key: []byte{1}, value: []byte("value1"), op: opSet}, + 2: {key: []byte{2}, value: []byte("value2"), op: opSet}, 
+ 3: {key: []byte{3}, value: []byte("value3"), op: opSet}, + 4: {key: []byte{4}, value: []byte("value4"), op: opSet}, + 5: {key: []byte{5}, value: []byte("value5"), op: opSet}, + 6: {key: []byte{6}, value: []byte("value6"), op: opSet}, + 7: {key: []byte{7}, value: []byte("value7"), op: opSet}, + 8: {key: []byte{8}, value: []byte("value8"), op: opSet}, + 9: {key: []byte{9}, op: opDelete}, // delete operation + 10: {key: []byte{10}, op: opDelete}, // delete operation + 11: {key: []byte{11}, value: []byte("value11"), op: opSet}, + 12: {key: []byte{12}, value: []byte("value12"), op: opSet}, + 13: {key: []byte{13}, op: opDelete}, // delete operation + 14: {key: []byte{14}, value: []byte("value14"), op: opSet}, + 15: {key: []byte{15}, value: []byte("value15"), op: opSet}, + 16: {key: []byte{16}, value: []byte("value16"), op: opSet}, + }, + }, + { + name: "fallback to sequential for small operation count", + detail: `When there are fewer operations than 2*NumSubtrees (16), + the function should fall back to sequential processing.`, + keyBitSize: 160, + preset: nil, + operations: map[uint64]valueOp{ + 1: {key: []byte{1}, value: []byte("value1"), op: opSet}, + 2: {key: []byte{2}, value: []byte("value2"), op: opSet}, + 3: {key: []byte{3}, value: []byte("value3"), op: opSet}, + // only 3 operations - should use sequential commit + }, + }, + { + name: "parallel commit with concurrent updates to same subtree", + detail: `Multiple operations targeting the same subtree prefix + should be processed together in that subtree's goroutine.`, + keyBitSize: 160, + preset: nil, + operations: func() map[uint64]valueOp { + ops := make(map[uint64]valueOp) + // Generate operations that will hash to similar prefixes + for i := 0; i < 20; i++ { + ops[uint64(i)] = valueOp{ + key: []byte{byte(i)}, + value: []byte(fmt.Sprintf("value%d", i)), + op: opSet, + } + } + return ops + }(), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // create a new SMT for this 
test + smt, memStore := NewTestSMT(t, test.preset, nil, test.keyBitSize) + defer memStore.Close() + + // first, commit using the parallel method + err := smt.CommitParallel(test.operations) + if test.shouldError { + require.Error(t, err) + return + } + require.NoError(t, err, "CommitParallel should not error") + + // get the parallel result root + parallelRoot := smt.Root() + + // now create a fresh SMT and use sequential commit for comparison + smtSequential, memStoreSequential := NewTestSMT(t, test.preset, nil, test.keyBitSize) + defer memStoreSequential.Close() + + err = smtSequential.Commit(test.operations) + require.NoError(t, err, "Sequential commit should not error") + + // get the sequential result root + sequentialRoot := smtSequential.Root() + + // verify that parallel and sequential commits produce identical results + require.Equal(t, sequentialRoot, parallelRoot, + "Parallel and sequential commits should produce identical root hashes") + + // verify that the final tree state is consistent by checking a few operations + for _, op := range test.operations { + if op.op == opDelete || entryIsDelete(op.entry) { + continue // skip verification for delete operations + } + + // verify the value can be retrieved correctly + proof, err := smt.GetMerkleProof(op.key) + require.NoError(t, err, "Should be able to get merkle proof") + + // verify membership proof + valid, err := smt.VerifyProof(op.key, op.value, true, smt.Root(), proof) + require.NoError(t, err, "Should be able to verify proof") + require.True(t, valid, "Proof should be valid for set operation") + } + }) + } +} + +func TestCommitParallelStress(t *testing.T) { + // stress test with many operations + smt, memStore := NewTestSMT(t, nil, nil, 160) + defer memStore.Close() + + // create a large number of operations + operations := make(map[uint64]valueOp) + for i := 0; i < 1000; i++ { + operations[uint64(i)] = valueOp{ + key: []byte{byte(i), byte(i >> 8), byte(i >> 16)}, // ensure different keys + value: 
[]byte(fmt.Sprintf("stress_value_%d", i)), + op: opSet, + } + } + + // run parallel commit + start := time.Now() + err := smt.CommitParallel(operations) + parallelDuration := time.Since(start) + require.NoError(t, err, "Parallel commit should handle stress test") + + t.Logf("Parallel commit of %d operations took: %v", len(operations), parallelDuration) + + // verify some random operations + for i := 0; i < 10; i++ { + key := []byte{byte(i), byte(i >> 8), byte(i >> 16)} + value := []byte(fmt.Sprintf("stress_value_%d", i)) + + proof, err := smt.GetMerkleProof(key) + require.NoError(t, err) + + valid, err := smt.VerifyProof(key, value, true, smt.Root(), proof) + require.NoError(t, err) + require.True(t, valid, "Stress test operation should be verifiable") + } +} + +func TestCommitParallelErrorHandling(t *testing.T) { + // test error handling in parallel commit + smt, memStore := NewTestSMT(t, nil, nil, 160) + defer memStore.Close() + + // test with reserved key (should error) + operations := map[uint64]valueOp{ + 1: {key: []byte{3}, value: []byte("should_fail"), op: opSet}, // hashes to minimum key + } + + err := smt.CommitParallel(operations) + // this should not error because we're not directly using reserved keys + // the actual key gets hashed first + require.NoError(t, err) +} diff --git a/store/store.go b/store/store.go index eb7ec63df0..8d7a0484c3 100644 --- a/store/store.go +++ b/store/store.go @@ -280,7 +280,7 @@ func (s *Store) Root() (root []byte, err lib.ErrorI) { // set up the state commit store s.sc = NewDefaultSMT(NewTxn(s.ss.reader.(BadgerTxnReader), s.writer, []byte(stateCommitIDPrefix), false, false, nextVersion)) // commit the SMT directly using the txn ops - if err = s.sc.Commit(s.ss.txn.ops); err != nil { + if err = s.sc.CommitParallel(s.ss.txn.ops); err != nil { return nil, err } } From 666f48ffdf30b8161b65a56caec7e38820137ac4 Mon Sep 17 00:00:00 2001 From: Pablo Ocampo Date: Thu, 4 Sep 2025 11:57:37 -0400 Subject: [PATCH 02/25] fix: add locks to 
start of controller --- controller/controller.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index 198f231c10..ec4b418979 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -103,10 +103,15 @@ func (c *Controller) Start() { if e != nil { c.log.Error(e.Error()) // log error but continue } else if rootChainInfo != nil && rootChainInfo.Height != 0 { - // call mempool check - c.Mempool.CheckMempool() - // update the peer 'must connect' - c.UpdateP2PMustConnect(rootChainInfo.ValidatorSet) + // execute in a function call to allow defer + func() { + c.Mempool.L.Lock() + defer c.Mempool.L.Unlock() + // call mempool check + c.Mempool.CheckMempool() + // update the peer 'must connect' + c.UpdateP2PMustConnect(rootChainInfo.ValidatorSet) + }() // exit the loop break } From 8be8d31dd6e0a3c2ab42659615657fa1bef0542c Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 23 Oct 2025 21:07:40 -0400 Subject: [PATCH 03/25] chore: do changes in config to connect to mainnet --- .docker/volumes/node_1/config.json | 8 +- .docker/volumes/node_1/genesis.json | 145 +++++++++++++++++++-------- .docker/volumes/node_1/keystore.json | 28 ------ .docker/volumes/node_2/config.json | 8 +- .docker/volumes/node_2/genesis.json | 143 ++++++++++++++++++-------- .docker/volumes/node_2/keystore.json | 28 ------ 6 files changed, 216 insertions(+), 144 deletions(-) mode change 100755 => 100644 .docker/volumes/node_1/genesis.json mode change 100755 => 100644 .docker/volumes/node_2/genesis.json diff --git a/.docker/volumes/node_1/config.json b/.docker/volumes/node_1/config.json index a50a90d9d8..f3e3d665c1 100644 --- a/.docker/volumes/node_1/config.json +++ b/.docker/volumes/node_1/config.json @@ -26,7 +26,13 @@ "maxInbound": 21, "maxOutbound": 7, "trustedPeerIDs": null, - "dialPeers": [], + "dialPeers": [ + 
"90703d453dfa70af3c85f3605e6cc8222d01d68d439ee5a9ade10be631572b330e1793206c8d417d9693be1d5af417fe@tcp://cnpy.network", + "b338f09135994130bee5da939513241b5d01ba5e73c409ed5ecea597b86a8b9c05fd27fb5e47a7296350fbae6262a484@tcp://cnpynetwork.com", + "9353441f5319ca7b9ee2f8bd764a8c75e3b6b7b13a18b0c4e5252bc0b1232861318b3063255238edc1fb96f08fa2157a@tcp://canopy.seed1.node1.eu.nodefleet.net", + "a3d591f2e602fd18df7c94abf02f6d342939570b169886bb355e6879cd7b9dda8c5d825f7895336d4e29750e65fc65df@tcp://canopy.seed1.node1.us.nodefleet.net" + + ], "bannedPeerIDs": null, "bannedIPs": null, "minimumPeersToStart": 0, diff --git a/.docker/volumes/node_1/genesis.json b/.docker/volumes/node_1/genesis.json old mode 100755 new mode 100644 index 607e382033..1014bfbd2d --- a/.docker/volumes/node_1/genesis.json +++ b/.docker/volumes/node_1/genesis.json @@ -1,65 +1,124 @@ { - "time": "2024-12-14 20:10:52", - "accounts": [ + "validators": [ { - "address": "851e90eaef1fa27debaee2c2591503bdeec1d123", - "amount": 1000000 + "address": "9c51a2b9b9234865a61cd43aae7f899441ae6271", + "publicKey": "8ad02e9b05e418f198e89179685a48b227f1c2bc266d4db002f24f8155c5119cbb9387146319b5dc4a260184f20d5d4f", + "committees": [1, 2], + "netAddress": "tcp://cnpy.xyz", + "stakedAmount": 1, + "output": "cf228862bd36016c0154e789d5fabe584da31c7e" }, { - "address": "02cd4e5eb53ea665702042a6ed6d31d616054dc5", - "amount": 1000000 + "address": "5bc4bd9a3468889febdc6a73df0090b12a263eb4", + "publicKey": "90703d453dfa70af3c85f3605e6cc8222d01d68d439ee5a9ade10be631572b330e1793206c8d417d9693be1d5af417fe", + "committees": [1, 2], + "netAddress": "tcp://cnpy.network", + "stakedAmount": 1, + "output": "faef35fb62796dfd6dfb9b19ea3d4be130a97257" }, { - "address": "6f94783856d5ce46d24dd5946215086211d70776", - "amount": 1000000 - } - ], - "nonSigners": null, - "validators": [ + "address": "997634c57ad60414116dc59417db6829c209f1df", + "publicKey": 
"89c592a8a27bc2ba3783048ebad2ee77b67ce8db1d3706d19135fac44e71d459870791d8face14dde4df6262a6445a27", + "committees": [1, 2], + "netAddress": "tcp://minerstellar.com", + "stakedAmount": 1, + "output": "12ee25b30f3016e3427983c1fb75eabab025a2d5" + }, + { + "address": "ac2c2eb9aa04b99ec9a523b09ca844a77826c291", + "publicKey": "b1891c2a38b553279b46452a4ecc49fa42ecb7c71a27f39efa5b3f817481b443cb44da9be251fa571d1ac43dd5ef7fd9", + "committees": [1, 2], + "netAddress": "tcp://agentofthecrown.com", + "stakedAmount": 1, + "output": "330d139fd014ec84c15b9786f9934a5a95a02d25" + }, + { + "address": "a843153e432b2b3052e50cfb37bbd021e9cbd34c", + "publicKey": "b338f09135994130bee5da939513241b5d01ba5e73c409ed5ecea597b86a8b9c05fd27fb5e47a7296350fbae6262a484", + "committees": [1, 2], + "netAddress": "tcp://cnpynetwork.com", + "stakedAmount": 1, + "output": "1202c3f3ac5e4876f9b35774858ca16e129ceba1" + }, + { + "address": "5946f3adf4965295b816a40b719ff79e9fd1dba2", + "publicKey": "8794c211342dc0da348b16e4e5903ffd7f913c390bd29dbfad0c44b891b08a99d74bc90546d89b54098ac7c0c5331550", + "committees": [1, 2], + "netAddress": "tcp://dun3waves.xyz", + "stakedAmount": 1, + "output": "3b7d461b533ff5b7ec28d172656b8049f29e5b11" + }, + { + "address": "51f334c0e137b99aa8fc9eae3a71064c2ccf8208", + "publicKey": "99e38bdc8b7c7f9f8a67151da78994a2616ac127518d4dd8f5a08ad760921758269fcb5172713b9585ec7007f60cb6fe", + "committees": [1, 2], + "netAddress": "tcp://canopynode.cam", + "stakedAmount": 1, + "output": "3dad3f4dcb6e0411a6acfc66c3d8850aff9789e4" + }, + { + "address": "8b243551a78f5380d6ba5b1e84fee34b23f6e757", + "publicKey": "a329a705dab85db2fd950cef9ffd87e27ff4b91b929d5ab26e6efb40b1b3b45e74f7d8162822c49b9909893e6461cc8d", + "committees": [1, 2], + "netAddress": "tcp://uem44.com", + "stakedAmount": 1, + "output": "b70bd2f4a87a43597c9c78d65dc3192090fdaa6f" + }, + { + "address": "fc5cdb5c0b6a6df41b92976bbdf2b6832855446f", + "publicKey": 
"b17d4eb3938957e710bacc9f09d2a9aa79a568fcdf1f8fc565bdb5de3f334295e929e4b086b9c8e9610654155fb0452b", + "committees": [1, 2], + "netAddress": "tcp://canopycoin.xyz", + "stakedAmount": 1, + "output": "a59917b6d045327d4a39736fea81b72e1891692e" + }, + { + "address": "4e2ce94661e2e3fd3af02e21898803c4d74e84ba", + "publicKey": "ab4fe218bb09a27908c6181fc799b8371d895f13173fed5b924af60235a548cd882ba97f911c08d9bda34c46a7dab359", + "committees": [1, 2], + "netAddress": "tcp://cryptoicp.com", + "stakedAmount": 1, + "output": "bc01362a7bd2613a3786ce32007452fd51f2bf1b" + }, { - "address": "851e90eaef1fa27debaee2c2591503bdeec1d123", - "publicKey": "b88a5928e54cbf0a36e0b98f5bcf02de9a9a1deba6994739f9160181a609f516eb702936a0cbf4c1f2e7e6be5b8272f2", - "committees": [ - 1 - ], - "netAddress": "tcp://node-1", - "stakedAmount": 1000000000, - "output": "851e90eaef1fa27debaee2c2591503bdeec1d123" + "address": "33e14ef6b87fb688b829c5e29618bb549dc7b4cd", + "publicKey": "b346ad1f1809adc64d4a06e5be4d9960a018faada129d659623fd97846d5127de550218929ec9de4af9d9cbd1ec52d46", + "committees": [1, 2], + "netAddress": "tcp://lava-9.com", + "stakedAmount": 1, + "output": "b587ddd8b58134c61ec52e5b1e88a27a5e07be82" }, { - "address": "02cd4e5eb53ea665702042a6ed6d31d616054dc5", - "publicKey": "98d45087a99bcbfde91993502e77dde869d4485c3778fe46513958320da560823d56a0108f4cf3513393f4d561bc489b", - "committees": [ - 1 - ], - "netAddress": "tcp://node-2", - "stakedAmount": 1000000000, - "output": "02cd4e5eb53ea665702042a6ed6d31d616054dc5" + "address": "b9c6c2dfa9d049e480c8cec9c29463abf078a594", + "publicKey": "9174b24ba27fe8a0f8616bf1a382d81428ef8a94b7ff70916bb9b266c9070bd848e49a0c33a1c17553416c949ea3409a", + "committees": [1, 2], + "netAddress": "tcp://cleanmarro.com", + "stakedAmount": 1, + "output": "8f5633ac35fc17fe113b8ecaa2060289607c0352" } ], "params": { "consensus": { "blockSize": 1000000, "protocolVersion": "1/0", - "rootChainID": 1, - "retired": 0 + "rootChainID": 1 }, "validator": { - "unstakingBlocks": 2, 
- "maxPauseBlocks": 4380, + "unstakingBlocks": 30240, + "maxPauseBlocks": 30240, "doubleSignSlashPercentage": 10, "nonSignSlashPercentage": 1, - "maxNonSign": 4, - "nonSignWindow": 10, - "maxCommittees": 15, + "maxNonSign": 60, + "nonSignWindow": 100, + "maxCommittees": 16, "maxCommitteeSize": 100, - "earlyWithdrawalPenalty": 20, - "delegateUnstakingBlocks": 2, - "minimumOrderSize": 1000, + "earlyWithdrawalPenalty": 0, + "delegateUnstakingBlocks": 12960, + "minimumOrderSize": 1000000000, "stakePercentForSubsidizedCommittee": 33, "maxSlashPerCommittee": 15, - "delegateRewardPercentage": 10, - "buyDeadlineBlocks": 15, + "delegateRewardPercentage": 0, + "buyDeadlineBlocks": 60, "lockOrderFeeMultiplier": 2 }, "fee": { @@ -71,14 +130,14 @@ "unpauseFee": 10000, "changeParameterFee": 10000, "daoTransferFee": 10000, + "certificateResultsFee": 0, "subsidyFee": 10000, "createOrderFee": 10000, "editOrderFee": 10000, "deleteOrderFee": 10000 }, "governance": { - "daoRewardPercentage": 10 + "daoRewardPercentage": 5 } - }, - "supply": null + } } diff --git a/.docker/volumes/node_1/keystore.json b/.docker/volumes/node_1/keystore.json index dece94a769..7a73a41bfd 100644 --- a/.docker/volumes/node_1/keystore.json +++ b/.docker/volumes/node_1/keystore.json @@ -1,30 +1,2 @@ { - "addressMap": { - "02cd4e5eb53ea665702042a6ed6d31d616054dc5": { - "publicKey": "98d45087a99bcbfde91993502e77dde869d4485c3778fe46513958320da560823d56a0108f4cf3513393f4d561bc489b", - "salt": "74f0112bcffc91215b6f6266acec38ca", - "encrypted": "183444bb69d2693a892e90ef7ebca9167719113488e4e803f8e87603ea84ccb40c423bc72db7303e81d7d216368ed763", - "keyAddress": "02cd4e5eb53ea665702042a6ed6d31d616054dc5", - "keyNickname": "node_2" - }, - "6f94783856d5ce46d24dd5946215086211d70776": { - "publicKey": "abda38eb50fbe53db9e9c3b141c6a1ec54ad40a4840e34784c975da4ee175eb4c5dd10b6d759ae8fdf8bc22511bbd97b", - "salt": "cfbafc41835a47660f822ee26112d2c6", - "encrypted": 
"f18135d9509b41b5edc42e74d22396cba3f11fd8a5acae008a49b6e8bd3540a48f74c0d9e65872b922091286a531eee7", - "keyAddress": "6f94783856d5ce46d24dd5946215086211d70776", - "keyNickname": "node_3" - }, - "851e90eaef1fa27debaee2c2591503bdeec1d123": { - "publicKey": "b88a5928e54cbf0a36e0b98f5bcf02de9a9a1deba6994739f9160181a609f516eb702936a0cbf4c1f2e7e6be5b8272f2", - "salt": "3bff15134210c811e308eaa9b7b6024c", - "encrypted": "8b757090dfc98bfbff4f5972f0ae4bb0339a82a753f633cd37aa921955d76cda6a5f521120e7559eb57f497e88f7f555", - "keyAddress": "851e90eaef1fa27debaee2c2591503bdeec1d123", - "keyNickname": "node_1" - } - }, - "nicknameMap": { - "node_1": "851e90eaef1fa27debaee2c2591503bdeec1d123", - "node_2": "02cd4e5eb53ea665702042a6ed6d31d616054dc5", - "node_3": "6f94783856d5ce46d24dd5946215086211d70776" - } } \ No newline at end of file diff --git a/.docker/volumes/node_2/config.json b/.docker/volumes/node_2/config.json index eaedd498cd..e55b8d639b 100644 --- a/.docker/volumes/node_2/config.json +++ b/.docker/volumes/node_2/config.json @@ -1,6 +1,6 @@ { "logLevel": "debug", - "chainId": 1, + "chainId": 2, "sleepUntil": 0, "rootChain": [ { @@ -30,7 +30,11 @@ "maxInbound": 21, "maxOutbound": 7, "trustedPeerIDs": null, - "dialPeers": [], + "dialPeers": [ + "8ad02e9b05e418f198e89179685a48b227f1c2bc266d4db002f24f8155c5119cbb9387146319b5dc4a260184f20d5d4f@tcp://cnpy.xyz", + "9353441f5319ca7b9ee2f8bd764a8c75e3b6b7b13a18b0c4e5252bc0b1232861318b3063255238edc1fb96f08fa2157a@tcp://canopy.seed1.node2.eu.nodefleet.net", + "a3d591f2e602fd18df7c94abf02f6d342939570b169886bb355e6879cd7b9dda8c5d825f7895336d4e29750e65fc65df@tcp://canopy.seed1.node2.us.nodefleet.net" + ], "bannedPeerIDs": null, "bannedIPs": null, "minimumPeersToStart": 0, diff --git a/.docker/volumes/node_2/genesis.json b/.docker/volumes/node_2/genesis.json old mode 100755 new mode 100644 index 607e382033..53083f2b16 --- a/.docker/volumes/node_2/genesis.json +++ b/.docker/volumes/node_2/genesis.json @@ -1,65 +1,124 @@ { - "time": 
"2024-12-14 20:10:52", - "accounts": [ + "validators": [ { - "address": "851e90eaef1fa27debaee2c2591503bdeec1d123", - "amount": 1000000 + "address": "9c51a2b9b9234865a61cd43aae7f899441ae6271", + "publicKey": "8ad02e9b05e418f198e89179685a48b227f1c2bc266d4db002f24f8155c5119cbb9387146319b5dc4a260184f20d5d4f", + "committees": [2], + "netAddress": "tcp://cnpy.xyz", + "stakedAmount": 1, + "output": "cf228862bd36016c0154e789d5fabe584da31c7e" }, { - "address": "02cd4e5eb53ea665702042a6ed6d31d616054dc5", - "amount": 1000000 + "address": "5bc4bd9a3468889febdc6a73df0090b12a263eb4", + "publicKey": "90703d453dfa70af3c85f3605e6cc8222d01d68d439ee5a9ade10be631572b330e1793206c8d417d9693be1d5af417fe", + "committees": [2], + "netAddress": "tcp://cnpy.network", + "stakedAmount": 1, + "output": "faef35fb62796dfd6dfb9b19ea3d4be130a97257" }, { - "address": "6f94783856d5ce46d24dd5946215086211d70776", - "amount": 1000000 - } - ], - "nonSigners": null, - "validators": [ + "address": "997634c57ad60414116dc59417db6829c209f1df", + "publicKey": "89c592a8a27bc2ba3783048ebad2ee77b67ce8db1d3706d19135fac44e71d459870791d8face14dde4df6262a6445a27", + "committees": [2], + "netAddress": "tcp://minerstellar.com", + "stakedAmount": 1, + "output": "12ee25b30f3016e3427983c1fb75eabab025a2d5" + }, + { + "address": "ac2c2eb9aa04b99ec9a523b09ca844a77826c291", + "publicKey": "b1891c2a38b553279b46452a4ecc49fa42ecb7c71a27f39efa5b3f817481b443cb44da9be251fa571d1ac43dd5ef7fd9", + "committees": [2], + "netAddress": "tcp://agentofthecrown.com", + "stakedAmount": 1, + "output": "330d139fd014ec84c15b9786f9934a5a95a02d25" + }, + { + "address": "a843153e432b2b3052e50cfb37bbd021e9cbd34c", + "publicKey": "b338f09135994130bee5da939513241b5d01ba5e73c409ed5ecea597b86a8b9c05fd27fb5e47a7296350fbae6262a484", + "committees": [2], + "netAddress": "tcp://cnpynetwork.com", + "stakedAmount": 1, + "output": "1202c3f3ac5e4876f9b35774858ca16e129ceba1" + }, + { + "address": "5946f3adf4965295b816a40b719ff79e9fd1dba2", + "publicKey": 
"8794c211342dc0da348b16e4e5903ffd7f913c390bd29dbfad0c44b891b08a99d74bc90546d89b54098ac7c0c5331550", + "committees": [2], + "netAddress": "tcp://dun3waves.xyz", + "stakedAmount": 1, + "output": "3b7d461b533ff5b7ec28d172656b8049f29e5b11" + }, + { + "address": "51f334c0e137b99aa8fc9eae3a71064c2ccf8208", + "publicKey": "99e38bdc8b7c7f9f8a67151da78994a2616ac127518d4dd8f5a08ad760921758269fcb5172713b9585ec7007f60cb6fe", + "committees": [2], + "netAddress": "tcp://canopynode.cam", + "stakedAmount": 1, + "output": "3dad3f4dcb6e0411a6acfc66c3d8850aff9789e4" + }, + { + "address": "8b243551a78f5380d6ba5b1e84fee34b23f6e757", + "publicKey": "a329a705dab85db2fd950cef9ffd87e27ff4b91b929d5ab26e6efb40b1b3b45e74f7d8162822c49b9909893e6461cc8d", + "committees": [2], + "netAddress": "tcp://uem44.com", + "stakedAmount": 1, + "output": "b70bd2f4a87a43597c9c78d65dc3192090fdaa6f" + }, + { + "address": "fc5cdb5c0b6a6df41b92976bbdf2b6832855446f", + "publicKey": "b17d4eb3938957e710bacc9f09d2a9aa79a568fcdf1f8fc565bdb5de3f334295e929e4b086b9c8e9610654155fb0452b", + "committees": [2], + "netAddress": "tcp://canopycoin.xyz", + "stakedAmount": 1, + "output": "a59917b6d045327d4a39736fea81b72e1891692e" + }, + { + "address": "4e2ce94661e2e3fd3af02e21898803c4d74e84ba", + "publicKey": "ab4fe218bb09a27908c6181fc799b8371d895f13173fed5b924af60235a548cd882ba97f911c08d9bda34c46a7dab359", + "committees": [2], + "netAddress": "tcp://cryptoicp.com", + "stakedAmount": 1, + "output": "bc01362a7bd2613a3786ce32007452fd51f2bf1b" + }, { - "address": "851e90eaef1fa27debaee2c2591503bdeec1d123", - "publicKey": "b88a5928e54cbf0a36e0b98f5bcf02de9a9a1deba6994739f9160181a609f516eb702936a0cbf4c1f2e7e6be5b8272f2", - "committees": [ - 1 - ], - "netAddress": "tcp://node-1", - "stakedAmount": 1000000000, - "output": "851e90eaef1fa27debaee2c2591503bdeec1d123" + "address": "33e14ef6b87fb688b829c5e29618bb549dc7b4cd", + "publicKey": "b346ad1f1809adc64d4a06e5be4d9960a018faada129d659623fd97846d5127de550218929ec9de4af9d9cbd1ec52d46", + 
"committees": [2], + "netAddress": "tcp://lava-9.com", + "stakedAmount": 1, + "output": "b587ddd8b58134c61ec52e5b1e88a27a5e07be82" }, { - "address": "02cd4e5eb53ea665702042a6ed6d31d616054dc5", - "publicKey": "98d45087a99bcbfde91993502e77dde869d4485c3778fe46513958320da560823d56a0108f4cf3513393f4d561bc489b", - "committees": [ - 1 - ], - "netAddress": "tcp://node-2", - "stakedAmount": 1000000000, - "output": "02cd4e5eb53ea665702042a6ed6d31d616054dc5" + "address": "b9c6c2dfa9d049e480c8cec9c29463abf078a594", + "publicKey": "9174b24ba27fe8a0f8616bf1a382d81428ef8a94b7ff70916bb9b266c9070bd848e49a0c33a1c17553416c949ea3409a", + "committees": [2], + "netAddress": "tcp://cleanmarro.com", + "stakedAmount": 1, + "output": "8f5633ac35fc17fe113b8ecaa2060289607c0352" } ], "params": { "consensus": { "blockSize": 1000000, "protocolVersion": "1/0", - "rootChainID": 1, - "retired": 0 + "rootChainID": 1 }, "validator": { - "unstakingBlocks": 2, - "maxPauseBlocks": 4380, + "unstakingBlocks": 30240, + "maxPauseBlocks": 30240, "doubleSignSlashPercentage": 10, "nonSignSlashPercentage": 1, - "maxNonSign": 4, - "nonSignWindow": 10, - "maxCommittees": 15, + "maxNonSign": 60, + "nonSignWindow": 100, + "maxCommittees": 16, "maxCommitteeSize": 100, - "earlyWithdrawalPenalty": 20, - "delegateUnstakingBlocks": 2, - "minimumOrderSize": 1000, + "earlyWithdrawalPenalty": 0, + "delegateUnstakingBlocks": 12960, + "minimumOrderSize": 1000000000, "stakePercentForSubsidizedCommittee": 33, "maxSlashPerCommittee": 15, "delegateRewardPercentage": 10, - "buyDeadlineBlocks": 15, + "buyDeadlineBlocks": 60, "lockOrderFeeMultiplier": 2 }, "fee": { @@ -71,14 +130,14 @@ "unpauseFee": 10000, "changeParameterFee": 10000, "daoTransferFee": 10000, + "certificateResultsFee": 0, "subsidyFee": 10000, "createOrderFee": 10000, "editOrderFee": 10000, "deleteOrderFee": 10000 }, "governance": { - "daoRewardPercentage": 10 + "daoRewardPercentage": 5 } - }, - "supply": null + } } diff --git a/.docker/volumes/node_2/keystore.json 
b/.docker/volumes/node_2/keystore.json index dece94a769..7a73a41bfd 100644 --- a/.docker/volumes/node_2/keystore.json +++ b/.docker/volumes/node_2/keystore.json @@ -1,30 +1,2 @@ { - "addressMap": { - "02cd4e5eb53ea665702042a6ed6d31d616054dc5": { - "publicKey": "98d45087a99bcbfde91993502e77dde869d4485c3778fe46513958320da560823d56a0108f4cf3513393f4d561bc489b", - "salt": "74f0112bcffc91215b6f6266acec38ca", - "encrypted": "183444bb69d2693a892e90ef7ebca9167719113488e4e803f8e87603ea84ccb40c423bc72db7303e81d7d216368ed763", - "keyAddress": "02cd4e5eb53ea665702042a6ed6d31d616054dc5", - "keyNickname": "node_2" - }, - "6f94783856d5ce46d24dd5946215086211d70776": { - "publicKey": "abda38eb50fbe53db9e9c3b141c6a1ec54ad40a4840e34784c975da4ee175eb4c5dd10b6d759ae8fdf8bc22511bbd97b", - "salt": "cfbafc41835a47660f822ee26112d2c6", - "encrypted": "f18135d9509b41b5edc42e74d22396cba3f11fd8a5acae008a49b6e8bd3540a48f74c0d9e65872b922091286a531eee7", - "keyAddress": "6f94783856d5ce46d24dd5946215086211d70776", - "keyNickname": "node_3" - }, - "851e90eaef1fa27debaee2c2591503bdeec1d123": { - "publicKey": "b88a5928e54cbf0a36e0b98f5bcf02de9a9a1deba6994739f9160181a609f516eb702936a0cbf4c1f2e7e6be5b8272f2", - "salt": "3bff15134210c811e308eaa9b7b6024c", - "encrypted": "8b757090dfc98bfbff4f5972f0ae4bb0339a82a753f633cd37aa921955d76cda6a5f521120e7559eb57f497e88f7f555", - "keyAddress": "851e90eaef1fa27debaee2c2591503bdeec1d123", - "keyNickname": "node_1" - } - }, - "nicknameMap": { - "node_1": "851e90eaef1fa27debaee2c2591503bdeec1d123", - "node_2": "02cd4e5eb53ea665702042a6ed6d31d616054dc5", - "node_3": "6f94783856d5ce46d24dd5946215086211d70776" - } } \ No newline at end of file From 2e5ae78f2ecbc79cddd21ef3103124988596054f Mon Sep 17 00:00:00 2001 From: Pablo Date: Tue, 28 Oct 2025 02:28:46 -0400 Subject: [PATCH 04/25] chore: fix listen address node 2 --- .docker/volumes/node_2/config.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.docker/volumes/node_2/config.json 
b/.docker/volumes/node_2/config.json index e55b8d639b..3ee96dde13 100644 --- a/.docker/volumes/node_2/config.json +++ b/.docker/volumes/node_2/config.json @@ -25,7 +25,7 @@ "dbName": "canopy", "inMemory": false, "networkID": 1, - "listenAddress": "0.0.0.0:9001", + "listenAddress": "0.0.0.0:9002", "externalAddress": "node-2", "maxInbound": 21, "maxOutbound": 7, From 012786e71e2ca35ff86c58046f3555357d444d69 Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 30 Oct 2025 21:44:32 -0400 Subject: [PATCH 05/25] fix: memory leak --- store/smt.go | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/store/smt.go b/store/smt.go index 130e7d5dbd..e96f5b9fbb 100644 --- a/store/smt.go +++ b/store/smt.go @@ -245,6 +245,10 @@ func (s *SMT) CommitParallel(unsortedOps map[uint64]valueOp) (err lib.ErrorI) { subtree.reset() // process operations in this subtree err := subtree.commit(true) + // explicitly clear subtree resources to help GC + subtree.nodeCache = nil + subtree.operations = nil + subtree.traversed = nil // send result back resultChan <- subtreeResult{index: idx, err: err} }(i, groups[i], subtreeRoots[i]) @@ -259,6 +263,14 @@ func (s *SMT) CommitParallel(unsortedOps map[uint64]valueOp) (err lib.ErrorI) { return result.err } } + + // close channel to free resources + close(resultChan) + + // clear groups to free memory before final steps + for i := range groups { + groups[i] = nil + } // after all subtrees are processed, the tree state is already consistent // because each subtree operation updated the shared store @@ -267,6 +279,9 @@ func (s *SMT) CommitParallel(unsortedOps map[uint64]valueOp) (err lib.ErrorI) { if err != nil { return err } + + // clear the main tree's nodeCache to free memory after parallel processing + s.nodeCache = make(map[string]*node) return nil } @@ -547,13 +562,19 @@ func (s *SMT) getSubtreeRoots() (roots []*node, err lib.ErrorI) { return } -// createSubtree() initializes the subtree structure +// 
createSubtree() initializes the subtree structure with a smaller cache size func (s *SMT) createSubtree(root *node, operations []*node) *SMT { + // Use a smaller initial cache size for subtrees to reduce memory pressure + // The cache will grow as needed but start smaller + initialCacheSize := len(operations) * 2 // heuristic: 2x operations + if initialCacheSize > MaxCacheSize/NumSubtrees { + initialCacheSize = MaxCacheSize / NumSubtrees + } return &SMT{ store: s.store, root: root, keyBitLength: s.keyBitLength, - nodeCache: make(map[string]*node), + nodeCache: make(map[string]*node, initialCacheSize), operations: operations, minKey: s.minKey, maxKey: s.maxKey, @@ -620,7 +641,8 @@ func (s *SMT) resetGCP() { func (s *SMT) setNode(n *node) lib.ErrorI { // check cache max size if len(s.nodeCache) >= MaxCacheSize { - s.nodeCache = make(map[string]*node, MaxCacheSize) + // create new cache and let GC collect the old one + s.nodeCache = make(map[string]*node) } // set in cache s.nodeCache[string(n.Key.bytes())] = n From 3de94a5c81a669c986c8f0edfaf3cd2971ad4f88 Mon Sep 17 00:00:00 2001 From: Pablo Date: Fri, 31 Oct 2025 18:20:45 -0400 Subject: [PATCH 06/25] chore: more leak fixes --- store/smt.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/store/smt.go b/store/smt.go index e96f5b9fbb..337171e2b7 100644 --- a/store/smt.go +++ b/store/smt.go @@ -535,6 +535,9 @@ func (s *SMT) addSyntheticBorders() (cleanup func() lib.ErrorI, err lib.ErrorI) } // reset the operations s.operations = saved + // CRITICAL: clear nodeCache after border insertion to prevent memory accumulation + // during parallel processing. The border insertion traverses and caches many nodes. 
+ s.nodeCache = make(map[string]*node) // define a cleanup function to remove the borders cleanup = func() lib.ErrorI { // reset the SMT @@ -545,7 +548,14 @@ func (s *SMT) addSyntheticBorders() (cleanup func() lib.ErrorI, err lib.ErrorI) s.operations[i] = &node{Key: n.Key, delete: true} } // remove synthetic borders - return s.commit(false) + if err := s.commit(false); err != nil { + return err + } + // CRITICAL: clear nodeCache after border cleanup to prevent memory leaks + // This cleanup runs in defer at the very end, and the cache can accumulate + // nodes from the border deletion operations. + s.nodeCache = make(map[string]*node) + return nil } return } From f47b69d32111c463e2a9f46619a08a4905ff9aea Mon Sep 17 00:00:00 2001 From: Pablo Date: Mon, 3 Nov 2025 16:04:38 -0400 Subject: [PATCH 07/25] Revert "chore: more leak fixes" This reverts commit 3de94a5c81a669c986c8f0edfaf3cd2971ad4f88. --- store/smt.go | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/store/smt.go b/store/smt.go index 337171e2b7..e96f5b9fbb 100644 --- a/store/smt.go +++ b/store/smt.go @@ -535,9 +535,6 @@ func (s *SMT) addSyntheticBorders() (cleanup func() lib.ErrorI, err lib.ErrorI) } // reset the operations s.operations = saved - // CRITICAL: clear nodeCache after border insertion to prevent memory accumulation - // during parallel processing. The border insertion traverses and caches many nodes. - s.nodeCache = make(map[string]*node) // define a cleanup function to remove the borders cleanup = func() lib.ErrorI { // reset the SMT @@ -548,14 +545,7 @@ func (s *SMT) addSyntheticBorders() (cleanup func() lib.ErrorI, err lib.ErrorI) s.operations[i] = &node{Key: n.Key, delete: true} } // remove synthetic borders - if err := s.commit(false); err != nil { - return err - } - // CRITICAL: clear nodeCache after border cleanup to prevent memory leaks - // This cleanup runs in defer at the very end, and the cache can accumulate - // nodes from the border deletion operations. 
- s.nodeCache = make(map[string]*node) - return nil + return s.commit(false) } return } From a9977826dfd6b72007b35a02bbc07a738e2239fb Mon Sep 17 00:00:00 2001 From: Pablo Date: Mon, 3 Nov 2025 16:04:53 -0400 Subject: [PATCH 08/25] Revert "fix: memory leak" This reverts commit 012786e71e2ca35ff86c58046f3555357d444d69. --- store/smt.go | 28 +++------------------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/store/smt.go b/store/smt.go index e96f5b9fbb..130e7d5dbd 100644 --- a/store/smt.go +++ b/store/smt.go @@ -245,10 +245,6 @@ func (s *SMT) CommitParallel(unsortedOps map[uint64]valueOp) (err lib.ErrorI) { subtree.reset() // process operations in this subtree err := subtree.commit(true) - // explicitly clear subtree resources to help GC - subtree.nodeCache = nil - subtree.operations = nil - subtree.traversed = nil // send result back resultChan <- subtreeResult{index: idx, err: err} }(i, groups[i], subtreeRoots[i]) @@ -263,14 +259,6 @@ func (s *SMT) CommitParallel(unsortedOps map[uint64]valueOp) (err lib.ErrorI) { return result.err } } - - // close channel to free resources - close(resultChan) - - // clear groups to free memory before final steps - for i := range groups { - groups[i] = nil - } // after all subtrees are processed, the tree state is already consistent // because each subtree operation updated the shared store @@ -279,9 +267,6 @@ func (s *SMT) CommitParallel(unsortedOps map[uint64]valueOp) (err lib.ErrorI) { if err != nil { return err } - - // clear the main tree's nodeCache to free memory after parallel processing - s.nodeCache = make(map[string]*node) return nil } @@ -562,19 +547,13 @@ func (s *SMT) getSubtreeRoots() (roots []*node, err lib.ErrorI) { return } -// createSubtree() initializes the subtree structure with a smaller cache size +// createSubtree() initializes the subtree structure func (s *SMT) createSubtree(root *node, operations []*node) *SMT { - // Use a smaller initial cache size for subtrees to reduce memory 
pressure - // The cache will grow as needed but start smaller - initialCacheSize := len(operations) * 2 // heuristic: 2x operations - if initialCacheSize > MaxCacheSize/NumSubtrees { - initialCacheSize = MaxCacheSize / NumSubtrees - } return &SMT{ store: s.store, root: root, keyBitLength: s.keyBitLength, - nodeCache: make(map[string]*node, initialCacheSize), + nodeCache: make(map[string]*node), operations: operations, minKey: s.minKey, maxKey: s.maxKey, @@ -641,8 +620,7 @@ func (s *SMT) resetGCP() { func (s *SMT) setNode(n *node) lib.ErrorI { // check cache max size if len(s.nodeCache) >= MaxCacheSize { - // create new cache and let GC collect the old one - s.nodeCache = make(map[string]*node) + s.nodeCache = make(map[string]*node, MaxCacheSize) } // set in cache s.nodeCache[string(n.Key.bytes())] = n From 84f72df72fdb4b42f428ef09771c60c3f5c55e5f Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 6 Nov 2025 23:45:30 -0400 Subject: [PATCH 09/25] fix: add locks to prevent rc panic --- cmd/rpc/sock.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/cmd/rpc/sock.go b/cmd/rpc/sock.go index 84d3784278..a9b98ff357 100644 --- a/cmd/rpc/sock.go +++ b/cmd/rpc/sock.go @@ -68,8 +68,12 @@ func (r *RCManager) Publish(chainId uint64, info *lib.RootChainInfo) { } // for each ws client for _, subscriber := range r.subscribers[chainId] { + // lock to prevent concurrent writes to the websocket connection + subscriber.writeMux.Lock() // publish to each client - if e := subscriber.conn.WriteMessage(websocket.BinaryMessage, protoBytes); e != nil { + e := subscriber.conn.WriteMessage(websocket.BinaryMessage, protoBytes) + subscriber.writeMux.Unlock() + if e != nil { // defer the Stop() call to prevent the slice modification during iteration. // since Stop() removes the subscriber from r.subscribers, immediate execution // would affect the slice that is currently being iterated. 
@@ -411,10 +415,11 @@ func (r *RCSubscription) Stop(err error) { // RCSubscriber (Root Chain Subscriber) implements an efficient publishing service to nested chain subscribers type RCSubscriber struct { - chainId uint64 // the chain id of the publisher - manager *RCManager // a reference to the manager of the ws clients - conn *websocket.Conn // the underlying ws connection - log lib.LoggerI // stdout log + chainId uint64 // the chain id of the publisher + manager *RCManager // a reference to the manager of the ws clients + conn *websocket.Conn // the underlying ws connection + log lib.LoggerI // stdout log + writeMux sync.Mutex // protects concurrent writes to the websocket connection } // WebSocket() upgrades a http request to a websockets connection @@ -474,6 +479,9 @@ func (r *RCManager) RemoveSubscriber(chainId uint64, subscriber *RCSubscriber) { func (r *RCSubscriber) Stop(err error) { // log the error r.log.Errorf("WS Failed with err: %s", err.Error()) + // lock to prevent concurrent writes during close + r.writeMux.Lock() + defer r.writeMux.Unlock() // close the connection if err = r.conn.Close(); err != nil { r.log.Error(err.Error()) From ff608efac5d7c7db9eb5ac665518ff30dfba9a67 Mon Sep 17 00:00:00 2001 From: Roniel Valdez Date: Fri, 7 Nov 2025 16:17:48 +0100 Subject: [PATCH 10/25] fix: batch commit err --- store/store.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/store/store.go b/store/store.go index a3834a1763..654669f398 100644 --- a/store/store.go +++ b/store/store.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "path/filepath" + "reflect" "sync" "sync/atomic" "time" @@ -275,6 +276,16 @@ func (s *Store) Commit() (root []byte, err lib.ErrorI) { } // extract the internal metrics from the pebble batch size, count := len(s.writer.Repr()), s.writer.Count() + + // use reflection to check the internal 'committing' field + writerValue := reflect.ValueOf(s.writer).Elem() + committingField := writerValue.FieldByName("committing") + if 
committingField.IsValid() { + isCommitting := committingField.Bool() + if isCommitting { + return nil, ErrCloseDB(fmt.Errorf("batch is still committing")) + } + } // finally commit the entire Transaction to the actual DB under the proper version (height) number // NOTE: PebbleDB has a non deterministic issue where batch.Commit(pebble.WriteOptions{}) // could panic with a nil pointer dereference in applyInternal(). This occurs because the batch's From 9be2d73394a029e25a20e67f550816bb6947af24 Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 13 Nov 2025 00:41:28 -0400 Subject: [PATCH 11/25] fix: move lock in compact to the whole function --- store/store.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/store/store.go b/store/store.go index 5bee2cf6de..b051561514 100644 --- a/store/store.go +++ b/store/store.go @@ -482,6 +482,8 @@ func (s *Store) MaybeCompact() { // Compact deletes all entries marked for compaction on the given prefix range. // it iterates over the prefix, deletes tombstone entries, and performs DB compaction func (s *Store) Compact(version uint64, compactHSS bool) lib.ErrorI { + s.mu.Lock() // lock compact op + defer s.mu.Unlock() // unlock compact op // first compaction: latest state keys startPrefix, endPrefix := latestStatePrefix, prefixEnd(latestStatePrefix) // track current time and version @@ -515,12 +517,9 @@ func (s *Store) Compact(version uint64, compactHSS bool) lib.ErrorI { return nil } // commit the batch - s.mu.Lock() // lock commit op if err := batch.Commit(pebble.Sync); err != nil { - s.mu.Unlock() // unlock commit op return ErrCommitDB(err) } - s.mu.Unlock() // unlock commit op batchTime := time.Since(now) // perform a flush to ensure memtables are flushed to disk if err := s.db.Flush(); err != nil { From 265d6d82e33f099edcf55cc0411cc75ef2f4f470 Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 13 Nov 2025 01:40:05 -0400 Subject: [PATCH 12/25] fix: change copy store to share mutexes --- store/store.go | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/store/store.go b/store/store.go index b051561514..c9209303fe 100644 --- a/store/store.go +++ b/store/store.go @@ -222,7 +222,7 @@ func (s *Store) Copy() (lib.StoreI, lib.ErrorI) { ss: s.ss.Copy(lssReader, lssReader), Indexer: &Indexer{s.Indexer.db.Copy(reader, reader), s.config}, metrics: s.metrics, - mu: &sync.Mutex{}, + mu: s.mu, compaction: atomic.Bool{}, }, nil } From 4832c38a0c162019caecf082f0691776995e4fa1 Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 13 Nov 2025 11:49:20 -0400 Subject: [PATCH 13/25] Revert "fix: move lock in compact to the whole function" This reverts commit 9be2d73394a029e25a20e67f550816bb6947af24. --- store/store.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/store/store.go b/store/store.go index c9209303fe..5bee2cf6de 100644 --- a/store/store.go +++ b/store/store.go @@ -222,7 +222,7 @@ func (s *Store) Copy() (lib.StoreI, lib.ErrorI) { ss: s.ss.Copy(lssReader, lssReader), Indexer: &Indexer{s.Indexer.db.Copy(reader, reader), s.config}, metrics: s.metrics, - mu: s.mu, + mu: &sync.Mutex{}, compaction: atomic.Bool{}, }, nil } @@ -482,8 +482,6 @@ func (s *Store) MaybeCompact() { // Compact deletes all entries marked for compaction on the given prefix range. 
// it iterates over the prefix, deletes tombstone entries, and performs DB compaction func (s *Store) Compact(version uint64, compactHSS bool) lib.ErrorI { - s.mu.Lock() // lock compact op - defer s.mu.Unlock() // unlock compact op // first compaction: latest state keys startPrefix, endPrefix := latestStatePrefix, prefixEnd(latestStatePrefix) // track current time and version @@ -517,9 +515,12 @@ func (s *Store) Compact(version uint64, compactHSS bool) lib.ErrorI { return nil } // commit the batch + s.mu.Lock() // lock commit op if err := batch.Commit(pebble.Sync); err != nil { + s.mu.Unlock() // unlock commit op return ErrCommitDB(err) } + s.mu.Unlock() // unlock commit op batchTime := time.Since(now) // perform a flush to ensure memtables are flushed to disk if err := s.db.Flush(); err != nil { From bf712662ca46a916db30a772977ebdc1a8589ef2 Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 13 Nov 2025 12:47:00 -0400 Subject: [PATCH 14/25] chore: add reflect check --- store/store.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/store/store.go b/store/store.go index 5bee2cf6de..d64144a535 100644 --- a/store/store.go +++ b/store/store.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "path/filepath" + "reflect" "sync" "sync/atomic" "time" @@ -516,6 +517,20 @@ func (s *Store) Compact(version uint64, compactHSS bool) lib.ErrorI { } // commit the batch s.mu.Lock() // lock commit op + // check if batch's db closed field is not nil using reflection + batchValue := reflect.ValueOf(batch).Elem() + batchDBField := batchValue.FieldByName("db") + if batchDBField.IsValid() && !batchDBField.IsNil() { + dbValue := batchDBField.Elem() + closedField := dbValue.FieldByName("closed") + if closedField.IsNil() { + s.mu.Unlock() // unlock commit op + s.log.Debugf("key compaction skipped [%d]: closed field is nil", version) + return nil + } + } else { + s.log.Debugf("key compaction reflect error: db field empty") // this comment for now is just to debug the reflect itself just 
in case + } if err := batch.Commit(pebble.Sync); err != nil { s.mu.Unlock() // unlock commit op return ErrCommitDB(err) From aa4efe0c8636f4deb9561a1751527bf8d6737239 Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 13 Nov 2025 16:42:40 -0400 Subject: [PATCH 15/25] Revert "chore: add reflect check" This reverts commit bf712662ca46a916db30a772977ebdc1a8589ef2. --- store/store.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/store/store.go b/store/store.go index d64144a535..5bee2cf6de 100644 --- a/store/store.go +++ b/store/store.go @@ -5,7 +5,6 @@ import ( "fmt" "math" "path/filepath" - "reflect" "sync" "sync/atomic" "time" @@ -517,20 +516,6 @@ func (s *Store) Compact(version uint64, compactHSS bool) lib.ErrorI { } // commit the batch s.mu.Lock() // lock commit op - // check if batch's db closed field is not nil using reflection - batchValue := reflect.ValueOf(batch).Elem() - batchDBField := batchValue.FieldByName("db") - if batchDBField.IsValid() && !batchDBField.IsNil() { - dbValue := batchDBField.Elem() - closedField := dbValue.FieldByName("closed") - if closedField.IsNil() { - s.mu.Unlock() // unlock commit op - s.log.Debugf("key compaction skipped [%d]: closed field is nil", version) - return nil - } - } else { - s.log.Debugf("key compaction reflect error: db field empty") // this comment for now is just to debug the reflect itself just in case - } if err := batch.Commit(pebble.Sync); err != nil { s.mu.Unlock() // unlock commit op return ErrCommitDB(err) From fa719bc49e12df614b055a6b5d743bdebf31729b Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 13 Nov 2025 16:58:14 -0400 Subject: [PATCH 16/25] Reapply "chore: add reflect check" This reverts commit aa4efe0c8636f4deb9561a1751527bf8d6737239. 
--- store/store.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/store/store.go b/store/store.go index 5bee2cf6de..d64144a535 100644 --- a/store/store.go +++ b/store/store.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "path/filepath" + "reflect" "sync" "sync/atomic" "time" @@ -516,6 +517,20 @@ func (s *Store) Compact(version uint64, compactHSS bool) lib.ErrorI { } // commit the batch s.mu.Lock() // lock commit op + // check if batch's db closed field is not nil using reflection + batchValue := reflect.ValueOf(batch).Elem() + batchDBField := batchValue.FieldByName("db") + if batchDBField.IsValid() && !batchDBField.IsNil() { + dbValue := batchDBField.Elem() + closedField := dbValue.FieldByName("closed") + if closedField.IsNil() { + s.mu.Unlock() // unlock commit op + s.log.Debugf("key compaction skipped [%d]: closed field is nil", version) + return nil + } + } else { + s.log.Debugf("key compaction reflect error: db field empty") // this comment for now is just to debug the reflect itself just in case + } if err := batch.Commit(pebble.Sync); err != nil { s.mu.Unlock() // unlock commit op return ErrCommitDB(err) From ade472df5b4c9f22cc171f1e5ccd7e2cc1c1a700 Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 13 Nov 2025 16:59:39 -0400 Subject: [PATCH 17/25] chore: also skip if the db field is empty --- store/store.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/store/store.go b/store/store.go index d64144a535..3251ddd5fa 100644 --- a/store/store.go +++ b/store/store.go @@ -529,7 +529,9 @@ func (s *Store) Compact(version uint64, compactHSS bool) lib.ErrorI { return nil } } else { - s.log.Debugf("key compaction reflect error: db field empty") // this comment for now is just to debug the reflect itself just in case + s.mu.Unlock() // unlock commit op + s.log.Debugf("key compaction skipped [%d]: db field is nil", version) + return nil } if err := batch.Commit(pebble.Sync); err != nil { s.mu.Unlock() // unlock commit op From 
d26020f08e91521d5250f1b10670e56e5d98396c Mon Sep 17 00:00:00 2001 From: Pablo Date: Fri, 14 Nov 2025 16:54:00 -0400 Subject: [PATCH 18/25] chore: not close batch if it is committing --- store/store.go | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/store/store.go b/store/store.go index 3251ddd5fa..15a5cb24c0 100644 --- a/store/store.go +++ b/store/store.go @@ -5,7 +5,6 @@ import ( "fmt" "math" "path/filepath" - "reflect" "sync" "sync/atomic" "time" @@ -494,7 +493,6 @@ func (s *Store) Compact(version uint64, compactHSS bool) lib.ErrorI { defer cancel() // create a batch to set the keys to be removed by the compaction batch := s.db.NewBatch() - defer batch.Close() // set metrics to log total, toDelete := 0, 0 // collect and delete tombstone entries @@ -508,32 +506,19 @@ func (s *Store) Compact(version uint64, compactHSS bool) lib.ErrorI { } }) if err != nil { + batch.Close() return ErrCommitDB(err) } // if nothing to delete, skip compaction if batch.Empty() { + batch.Close() s.log.Debugf("key compaction finished [%d], no values to delete", version) return nil } // commit the batch s.mu.Lock() // lock commit op - // check if batch's db closed field is not nil using reflection - batchValue := reflect.ValueOf(batch).Elem() - batchDBField := batchValue.FieldByName("db") - if batchDBField.IsValid() && !batchDBField.IsNil() { - dbValue := batchDBField.Elem() - closedField := dbValue.FieldByName("closed") - if closedField.IsNil() { - s.mu.Unlock() // unlock commit op - s.log.Debugf("key compaction skipped [%d]: closed field is nil", version) - return nil - } - } else { - s.mu.Unlock() // unlock commit op - s.log.Debugf("key compaction skipped [%d]: db field is nil", version) - return nil - } if err := batch.Commit(pebble.Sync); err != nil { + batch.Close() s.mu.Unlock() // unlock commit op return ErrCommitDB(err) } From f591d99c36a0e45f1aaa9585ae0c49e3ee3c2a45 Mon Sep 17 00:00:00 2001 From: Pablo Date: Fri, 15 May 2026 12:21:46 -0400 
Subject: [PATCH 19/25] fix: smt fixes and memory efficiencies --- controller/block.go | 60 +++++++++++++------------ controller/consensus.go | 10 +++++ store/smt.go | 77 ++++++++++++++++---------------- store/txn.go | 95 ++++++++++++++++++++++++++++++++++++++++ store/versioned_store.go | 12 +++++ 5 files changed, 189 insertions(+), 65 deletions(-) diff --git a/controller/block.go b/controller/block.go index 3798914592..04bd250420 100644 --- a/controller/block.go +++ b/controller/block.go @@ -277,12 +277,15 @@ func (c *Controller) CommitCertificate(qc *lib.QuorumCertificate, block *lib.Blo // exit with error return } - // delete each transaction from the mempool - c.Mempool.DeleteTransaction(block.Transactions...) + syncing := c.isSyncing.Load() + if !syncing { + // delete each transaction from the mempool + c.Mempool.DeleteTransaction(block.Transactions...) + } // parse committed block for straw polls c.FSM.ParsePollTransactions(blockResult) // if self was the proposer - if bytes.Equal(qc.ProposerKey, c.PublicKey) && !c.isSyncing.Load() { + if bytes.Equal(qc.ProposerKey, c.PublicKey) && !syncing { // send the certificate results transaction on behalf of the quorum c.SendCertificateResultsTx(qc) } @@ -301,35 +304,38 @@ func (c *Controller) CommitCertificate(qc *lib.QuorumCertificate, block *lib.Blo // exit with error return err } - // reset the current mempool store to prepare for the next height - c.Mempool.FSM.Discard() - // set up the mempool with the actual new FSM for the next height - // this makes c.Mempool.FSM.Reset() is unnecessary - if c.Mempool.FSM, err = c.FSM.Copy(); err != nil { - // exit with error - return err + if !syncing { + // reset the current mempool store to prepare for the next height + c.Mempool.FSM.Discard() + // set up the mempool with the actual new FSM for the next height + if c.Mempool.FSM, err = c.FSM.Copy(); err != nil { + // exit with error + return err + } + // check the mempool to cache a proposal block and validate the mempool 
itself + c.Mempool.CheckMempool() + // reset mempool FSM + c.Mempool.FSM.Reset() } - // check the mempool to cache a proposal block and validate the mempool itself - c.Mempool.CheckMempool() - // reset mempool FSM - c.Mempool.FSM.Reset() // update telemetry (using proper defer to ensure time.Since is evaluated at defer execution) defer c.UpdateTelemetry(qc, block, time.Since(start)) - // publish root chain information to all nested chain subscribers. - for _, id := range c.RCManager.ChainIds() { - // get the root chain info - info, e := c.FSM.LoadRootChainInfo(id, 0) - if e != nil { - // don't log 'no-validators' error as this is possible - if e.Error() != lib.ErrNoValidators().Error() { - c.log.Error(e.Error()) + if !syncing { + // publish root chain information to all nested chain subscribers + for _, id := range c.RCManager.ChainIds() { + // get the root chain info + info, e := c.FSM.LoadRootChainInfo(id, 0) + if e != nil { + // don't log 'no-validators' error as this is possible + if e.Error() != lib.ErrNoValidators().Error() { + c.log.Error(e.Error()) + } + continue } - continue + // set the timestamp + info.Timestamp = ts + // publish root chain information + go c.RCManager.Publish(id, info) } - // set the timestamp - info.Timestamp = ts - // publish root chain information - go c.RCManager.Publish(id, info) } // exit return diff --git a/controller/consensus.go b/controller/consensus.go index c8a065b92a..5ed3d05a51 100644 --- a/controller/consensus.go +++ b/controller/consensus.go @@ -740,6 +740,16 @@ func (c *Controller) finishSyncing() { c.Lock() // when function completes, unlock defer c.Unlock() + // reinitialize the mempool now that sync is complete + c.Mempool.L.Lock() + c.Mempool.Clear() + c.Mempool.FSM.Discard() + if mFSM, err := c.FSM.Copy(); err == nil { + c.Mempool.FSM = mFSM + } + c.Mempool.CheckMempool() + c.Mempool.FSM.Reset() + c.Mempool.L.Unlock() // set the startup block metric (block height when first sync completed) 
c.Metrics.SetStartupBlock(c.FSM.Height()) // signal a reset of bft for the chain diff --git a/store/smt.go b/store/smt.go index a9d718a76f..33dad8b421 100644 --- a/store/smt.go +++ b/store/smt.go @@ -190,79 +190,93 @@ func (s *SMT) Commit(unsortedOps map[uint64]valueOp) (err lib.ErrorI) { // CommitParallel() executes deferred operations in parallel by partitioning them into // 8 subtrees based on their 3-bit prefix (000-111), avoiding conflicts between operations -// that would modify overlapping tree regions. Each subtree is processed independently, -// then the results are merged back into the main tree. +// that would modify overlapping tree regions. Each subtree is processed independently +// with its own Txn copy to avoid lock contention, then the results are merged back into +// the main tree. func (s *SMT) CommitParallel(unsortedOps map[uint64]valueOp) (err lib.ErrorI) { // if there are too few operations, fall back to sequential processing // if len(unsortedOps) < NumSubtrees*2 { // return s.Commit(unsortedOps) // } - // sort operations by their 3-bit prefix to avoid conflicts + parentTxn, ok := s.store.(*Txn) + if !ok { + return s.Commit(unsortedOps) + } + groups, err := s.sortOperationsByPrefix(unsortedOps) if err != nil { return err } - // add synthetic borders to enable safe parallel processing cleanup, err := s.addSyntheticBorders() if err != nil { return err } defer func() { - // cleanup synthetic borders regardless of success/failure if cleanupErr := cleanup(); cleanupErr != nil && err == nil { err = cleanupErr } }() - // get subtree roots for parallel processing subtreeRoots, err := s.getSubtreeRoots() if err != nil { return err } - // process subtrees in parallel using goroutines type subtreeResult struct { index int + store *subtreeStore err lib.ErrorI } resultChan := make(chan subtreeResult, NumSubtrees) activeSubtrees := 0 - // launch goroutines for each subtree that has operations for i := 0; i < NumSubtrees; i++ { if len(groups[i]) == 0 { - 
continue // skip empty groups + continue } - activeSubtrees++ - go func(idx int, ops []*node, root *node) { - // create an isolated subtree for this prefix - subtree := s.createSubtree(root, ops) - // reset subtree state for processing + st := parentTxn.newSubtreeStore() + + go func(idx int, ops []*node, root *node, store *subtreeStore) { + subtree := &SMT{ + store: store, + root: root, + keyBitLength: s.keyBitLength, + nodeCache: make(map[string]*node), + operations: ops, + minKey: s.minKey, + maxKey: s.maxKey, + } subtree.reset() - // process operations in this subtree - err := subtree.commit(true) - // send result back - resultChan <- subtreeResult{index: idx, err: err} - }(i, groups[i], subtreeRoots[i]) + commitErr := subtree.commit(true) + resultChan <- subtreeResult{ + index: idx, + store: store, + err: commitErr, + } + }(i, groups[i], subtreeRoots[i], st) } - // collect results from all active subtrees + results := make([]subtreeResult, 0, activeSubtrees) for completed := 0; completed < activeSubtrees; completed++ { result := <-resultChan if result.err != nil { - // if any subtree fails, we need to return the error - // the cleanup function will handle synthetic border removal + // Drain remaining results before returning so goroutines + // finish and don't race with the deferred cleanup. 
+ for completed++; completed < activeSubtrees; completed++ { + <-resultChan + } return result.err } + results = append(results, result) + } + for _, result := range results { + parentTxn.mergeSubtreeOps(result.store) } - // after all subtrees are processed, the tree state is already consistent - // because each subtree operation updated the shared store - // we just need to refresh our in-memory view of the root s.root, err = s.getNode(s.root.Key.bytes()) if err != nil { return err @@ -547,19 +561,6 @@ func (s *SMT) getSubtreeRoots() (roots []*node, err lib.ErrorI) { return } -// createSubtree() initializes the subtree structure -func (s *SMT) createSubtree(root *node, operations []*node) *SMT { - return &SMT{ - store: s.store, - root: root, - keyBitLength: s.keyBitLength, - nodeCache: make(map[string]*node), - operations: operations, - minKey: s.minKey, - maxKey: s.maxKey, - } -} - // sortOperationsByPrefix returns 8 sorted slices grouped by 3-bit prefix: 000 to 111 func (s *SMT) sortOperationsByPrefix(unsortedOps map[uint64]valueOp) (groups [8][]*node, err lib.ErrorI) { // for each unsorted operation diff --git a/store/txn.go b/store/txn.go index 21016b5646..82fb13209e 100644 --- a/store/txn.go +++ b/store/txn.go @@ -2,6 +2,7 @@ package store import ( "bytes" + "fmt" "sync" "github.com/canopy-network/canopy/lib" @@ -274,6 +275,100 @@ func (t *Txn) NewIterator(prefix []byte, reverse bool, seek bool) (lib.IteratorI return newTxnIterator(parentIterator, t.txn, prefix, t.prefix, reverse), nil } +// newSubtreeStore creates a lightweight store for a parallel SMT subtree. +// Reads check: (1) the subtree's own ops, (2) the parent Txn's ops, then +// (3) an independent PebbleDB reader. Writes go only to the subtree's ops. +// After the goroutine completes, call mergeSubtreeOps to fold the subtree +// writes back into the parent Txn. 
+func (t *Txn) newSubtreeStore() *subtreeStore { + var reader *VersionedStore + if vs, ok := t.reader.(*VersionedStore); ok { + reader = vs.NewParallelReader() + } + t.txn.l.Lock() + parentOps := make(map[uint64]valueOp, len(t.txn.ops)) + for k, v := range t.txn.ops { + parentOps[k] = v + } + t.txn.l.Unlock() + return &subtreeStore{ + ops: make(map[uint64]valueOp), + parentOps: parentOps, + reader: reader, + prefix: t.prefix, + version: t.writeVersion, + } +} + +// mergeSubtreeOps copies all operations from a subtreeStore into the Txn. +// Must only be called when no goroutines are accessing the subtreeStore. +func (t *Txn) mergeSubtreeOps(s *subtreeStore) { + for k, v := range s.ops { + if _, exists := t.txn.ops[k]; !exists && t.sort { + t.addToSorted(v.key, k) + } + t.txn.ops[k] = v + } +} + +// subtreeStore implements lib.RWStoreI for a parallel SMT subtree. It +// provides three-layer reads (own ops → parent ops snapshot → PebbleDB) with +// independent snapshot access to avoid lock contention on the DB path. +// The parent ops are snapshotted at creation time so reads of that layer need no lock. 
+type subtreeStore struct { + l sync.Mutex + ops map[uint64]valueOp + parentOps map[uint64]valueOp + reader *VersionedStore + prefix []byte + version uint64 +} + +func (s *subtreeStore) Get(key []byte) ([]byte, lib.ErrorI) { + h := lib.MemHash(key) + s.l.Lock() + if v, found := s.ops[h]; found { + s.l.Unlock() + if v.op == opDelete { + return nil, nil + } + return v.value, nil + } + s.l.Unlock() + if v, found := s.parentOps[h]; found { + if v.op == opDelete { + return nil, nil + } + return v.value, nil + } + if s.reader == nil { + return nil, nil + } + return s.reader.Get(lib.Append(s.prefix, key)) +} + +func (s *subtreeStore) Set(key, value []byte) lib.ErrorI { + s.l.Lock() + s.ops[lib.MemHash(key)] = valueOp{key: key, value: value, version: s.version, op: opSet} + s.l.Unlock() + return nil +} + +func (s *subtreeStore) Delete(key []byte) lib.ErrorI { + s.l.Lock() + s.ops[lib.MemHash(key)] = valueOp{key: key, version: s.version, op: opDelete} + s.l.Unlock() + return nil +} + +func (s *subtreeStore) Iterator([]byte) (lib.IteratorI, lib.ErrorI) { + return nil, ErrStoreGet(fmt.Errorf("iterator not supported on subtree store")) +} + +func (s *subtreeStore) RevIterator([]byte) (lib.IteratorI, lib.ErrorI) { + return nil, ErrStoreGet(fmt.Errorf("reverse iterator not supported on subtree store")) +} + // Copy creates a new Txn with the same configuration and txn as the original func (t *Txn) Copy(reader TxnReaderI, writer TxnWriterI) *Txn { return &Txn{ diff --git a/store/versioned_store.go b/store/versioned_store.go index e5d0710dc5..814cb3923a 100644 --- a/store/versioned_store.go +++ b/store/versioned_store.go @@ -77,6 +77,18 @@ func NewVersionedStore(db pebble.Reader, batch *pebble.Batch, version uint64) *V } } +// NewParallelReader creates a read-only VersionedStore sharing the same +// underlying database (snapshot) but with its own buffers, safe for concurrent +// use from a separate goroutine. 
The returned store must NOT be closed, as it +// does not own the underlying reader. +func (vs *VersionedStore) NewParallelReader() *VersionedStore { + return &VersionedStore{ + db: vs.db, + version: vs.version, + decodeBuffer: make([][]byte, 0, 5), + } +} + // Set() stores a key-value pair at the current version func (vs *VersionedStore) Set(key, value []byte) (err lib.ErrorI) { return vs.SetAt(key, value, vs.version) From 43f6aad329181125162138e33141eef99b1cdcdb Mon Sep 17 00:00:00 2001 From: Pablo Date: Fri, 22 May 2026 16:47:34 -0400 Subject: [PATCH 20/25] chore: add dial peer --- .docker/volumes/node_1/config.json | 19 +++++++++++++------ .docker/volumes/node_2/config.json | 15 ++++++++++++--- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/.docker/volumes/node_1/config.json b/.docker/volumes/node_1/config.json index f9da47e9cd..acef7675f7 100644 --- a/.docker/volumes/node_1/config.json +++ b/.docker/volumes/node_1/config.json @@ -26,12 +26,19 @@ "maxInbound": 21, "maxOutbound": 7, "trustedPeerIDs": null, - "dialPeers": [ - "90703d453dfa70af3c85f3605e6cc8222d01d68d439ee5a9ade10be631572b330e1793206c8d417d9693be1d5af417fe@tcp://cnpy.network", - "b338f09135994130bee5da939513241b5d01ba5e73c409ed5ecea597b86a8b9c05fd27fb5e47a7296350fbae6262a484@tcp://cnpynetwork.com", - "9353441f5319ca7b9ee2f8bd764a8c75e3b6b7b13a18b0c4e5252bc0b1232861318b3063255238edc1fb96f08fa2157a@tcp://canopy.seed1.node1.eu.nodefleet.net", - "a3d591f2e602fd18df7c94abf02f6d342939570b169886bb355e6879cd7b9dda8c5d825f7895336d4e29750e65fc65df@tcp://canopy.seed1.node1.us.nodefleet.net" - + "dialPeers": [ + "8ad02e9b05e418f198e89179685a48b227f1c2bc266d4db002f24f8155c5119cbb9387146319b5dc4a260184f20d5d4f@tcp://cnpy.xyz", + "90703d453dfa70af3c85f3605e6cc8222d01d68d439ee5a9ade10be631572b330e1793206c8d417d9693be1d5af417fe@tcp://cnpy.network:9010", + "89c592a8a27bc2ba3783048ebad2ee77b67ce8db1d3706d19135fac44e71d459870791d8face14dde4df6262a6445a27@tcp://minerstellar.com:9020", + 
"b1891c2a38b553279b46452a4ecc49fa42ecb7c71a27f39efa5b3f817481b443cb44da9be251fa571d1ac43dd5ef7fd9@tcp://agentofthecrown.com:9030", + "b338f09135994130bee5da939513241b5d01ba5e73c409ed5ecea597b86a8b9c05fd27fb5e47a7296350fbae6262a484@tcp://cnpynetwork.com:9040", + "8794c211342dc0da348b16e4e5903ffd7f913c390bd29dbfad0c44b891b08a99d74bc90546d89b54098ac7c0c5331550@tcp://dun3waves.xyz:9050", + "99e38bdc8b7c7f9f8a67151da78994a2616ac127518d4dd8f5a08ad760921758269fcb5172713b9585ec7007f60cb6fe@tcp://canopynode.cam:9060", + "a329a705dab85db2fd950cef9ffd87e27ff4b91b929d5ab26e6efb40b1b3b45e74f7d8162822c49b9909893e6461cc8d@tcp://uem44.com:9070", + "b17d4eb3938957e710bacc9f09d2a9aa79a568fcdf1f8fc565bdb5de3f334295e929e4b086b9c8e9610654155fb0452b@tcp://canopycoin.xyz:9080", + "ab4fe218bb09a27908c6181fc799b8371d895f13173fed5b924af60235a548cd882ba97f911c08d9bda34c46a7dab359@tcp://cryptoicp.com:9090", + "b346ad1f1809adc64d4a06e5be4d9960a018faada129d659623fd97846d5127de550218929ec9de4af9d9cbd1ec52d46@tcp://lava-9.com:9100", + "9174b24ba27fe8a0f8616bf1a382d81428ef8a94b7ff70916bb9b266c9070bd848e49a0c33a1c17553416c949ea3409a@tcp://cleanmarro.com:9110" ], "bannedPeerIDs": null, "bannedIPs": null, diff --git a/.docker/volumes/node_2/config.json b/.docker/volumes/node_2/config.json index 3ee96dde13..04409bf96b 100644 --- a/.docker/volumes/node_2/config.json +++ b/.docker/volumes/node_2/config.json @@ -30,10 +30,19 @@ "maxInbound": 21, "maxOutbound": 7, "trustedPeerIDs": null, - "dialPeers": [ + "dialPeers": [ "8ad02e9b05e418f198e89179685a48b227f1c2bc266d4db002f24f8155c5119cbb9387146319b5dc4a260184f20d5d4f@tcp://cnpy.xyz", - "9353441f5319ca7b9ee2f8bd764a8c75e3b6b7b13a18b0c4e5252bc0b1232861318b3063255238edc1fb96f08fa2157a@tcp://canopy.seed1.node2.eu.nodefleet.net", - "a3d591f2e602fd18df7c94abf02f6d342939570b169886bb355e6879cd7b9dda8c5d825f7895336d4e29750e65fc65df@tcp://canopy.seed1.node2.us.nodefleet.net" + 
"90703d453dfa70af3c85f3605e6cc8222d01d68d439ee5a9ade10be631572b330e1793206c8d417d9693be1d5af417fe@tcp://cnpy.network:9010", + "89c592a8a27bc2ba3783048ebad2ee77b67ce8db1d3706d19135fac44e71d459870791d8face14dde4df6262a6445a27@tcp://minerstellar.com:9020", + "b1891c2a38b553279b46452a4ecc49fa42ecb7c71a27f39efa5b3f817481b443cb44da9be251fa571d1ac43dd5ef7fd9@tcp://agentofthecrown.com:9030", + "b338f09135994130bee5da939513241b5d01ba5e73c409ed5ecea597b86a8b9c05fd27fb5e47a7296350fbae6262a484@tcp://cnpynetwork.com:9040", + "8794c211342dc0da348b16e4e5903ffd7f913c390bd29dbfad0c44b891b08a99d74bc90546d89b54098ac7c0c5331550@tcp://dun3waves.xyz:9050", + "99e38bdc8b7c7f9f8a67151da78994a2616ac127518d4dd8f5a08ad760921758269fcb5172713b9585ec7007f60cb6fe@tcp://canopynode.cam:9060", + "a329a705dab85db2fd950cef9ffd87e27ff4b91b929d5ab26e6efb40b1b3b45e74f7d8162822c49b9909893e6461cc8d@tcp://uem44.com:9070", + "b17d4eb3938957e710bacc9f09d2a9aa79a568fcdf1f8fc565bdb5de3f334295e929e4b086b9c8e9610654155fb0452b@tcp://canopycoin.xyz:9080", + "ab4fe218bb09a27908c6181fc799b8371d895f13173fed5b924af60235a548cd882ba97f911c08d9bda34c46a7dab359@tcp://cryptoicp.com:9090", + "b346ad1f1809adc64d4a06e5be4d9960a018faada129d659623fd97846d5127de550218929ec9de4af9d9cbd1ec52d46@tcp://lava-9.com:9100", + "9174b24ba27fe8a0f8616bf1a382d81428ef8a94b7ff70916bb9b266c9070bd848e49a0c33a1c17553416c949ea3409a@tcp://cleanmarro.com:9110" ], "bannedPeerIDs": null, "bannedIPs": null, From 44df112954d80ba9c0ed9e87c678a2688d8d3d68 Mon Sep 17 00:00:00 2001 From: Pablo Date: Sat, 30 May 2026 22:48:39 -0400 Subject: [PATCH 21/25] fix: make sync less lock heavy --- controller/block.go | 13 +++++++------ controller/tx.go | 6 ++++++ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/controller/block.go b/controller/block.go index 04bd250420..ea5967e61b 100644 --- a/controller/block.go +++ b/controller/block.go @@ -237,11 +237,13 @@ func (c *Controller) ValidateProposal(rcBuildHeight uint64, qc *lib.QuorumCertif // - sets up 
the controller for the next height func (c *Controller) CommitCertificate(qc *lib.QuorumCertificate, block *lib.Block, blockResult *lib.BlockResult, ts uint64) (err lib.ErrorI) { start := time.Now() - // cancel any running mempool check - c.Mempool.stop() - // lock the mempool - c.Mempool.L.Lock() - defer c.Mempool.L.Unlock() + syncing := c.isSyncing.Load() + if !syncing { + // cancel any running mempool check and lock the mempool for live operation + c.Mempool.stop() + c.Mempool.L.Lock() + defer c.Mempool.L.Unlock() + } // log the beginning of the commit c.log.Debugf("TryCommit block %s", lib.BytesToString(qc.ResultsHash)) // cast the store to ensure the proper store type to complete this operation @@ -277,7 +279,6 @@ func (c *Controller) CommitCertificate(qc *lib.QuorumCertificate, block *lib.Blo // exit with error return } - syncing := c.isSyncing.Load() if !syncing { // delete each transaction from the mempool c.Mempool.DeleteTransaction(block.Transactions...) diff --git a/controller/tx.go b/controller/tx.go index dc52f117cf..166d3c7034 100644 --- a/controller/tx.go +++ b/controller/tx.go @@ -97,6 +97,12 @@ func (c *Controller) CheckMempool() { return } for { + // skip mempool checks while syncing: the mempool is unused (no proposals) and on nested + // chains the remote RPC calls (GetDexBatch) hold the mempool lock, blocking the sync loop + if c.isSyncing.Load() { + time.Sleep(time.Duration(c.Config.LazyMempoolCheckFrequencyS) * time.Second) + continue + } // keep a list of transaction needing to be gossipped var toGossip [][]byte // if recheck is necessary From 38c61cfa4aba82c6e489ef1967c3a410d794f2d3 Mon Sep 17 00:00:00 2001 From: Pablo Date: Wed, 3 Jun 2026 01:17:58 -0400 Subject: [PATCH 22/25] fix: eliminate rc manager lock --- cmd/rpc/sock.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd/rpc/sock.go b/cmd/rpc/sock.go index cdb294045f..7296a9374e 100644 --- a/cmd/rpc/sock.go +++ b/cmd/rpc/sock.go @@ -442,8 +442,12 @@ func (r 
*RCSubscription) Listen() { r.manager.l.Lock() // update the root chain info r.Info = newInfo - // execute the callback - r.manager.afterRCUpdate(newInfo) + // skip the callback during syncing: the BFT is paused so ResetBFT messages are + // unnecessary, and UpdateP2PMustConnect sends to a channel with buffer=1 that can + // block while holding the controller lock, starving the sync loop + if !r.manager.controller.Syncing().Load() { + r.manager.afterRCUpdate(newInfo) + } // release r.manager.l.Unlock() } From fe31236fc7b3fbc71d9a0c90f15bcc3cc996779d Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 4 Jun 2026 04:35:06 -0400 Subject: [PATCH 23/25] feat: skip compact while syncing --- controller/consensus.go | 11 +++++++++++ store/store.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/controller/consensus.go b/controller/consensus.go index 5ed3d05a51..af9b157e75 100644 --- a/controller/consensus.go +++ b/controller/consensus.go @@ -10,6 +10,7 @@ import ( "github.com/canopy-network/canopy/lib" "github.com/canopy-network/canopy/lib/crypto" "github.com/canopy-network/canopy/p2p" + "github.com/canopy-network/canopy/store" ) const ( @@ -74,6 +75,10 @@ func (c *Controller) Sync() { c.log.Infof("Sync started 🔄 for committee %d", c.Config.ChainId) // set the Controller as 'syncing' c.isSyncing.Store(true) + // notify the store to defer compaction during sync + if st, ok := c.FSM.Store().(*store.Store); ok { + st.SetSyncing(true) + } // check if node is alone in the validator set singleNode, err := c.singleNodeNetwork() if err != nil { @@ -756,6 +761,12 @@ func (c *Controller) finishSyncing() { c.Consensus.ResetBFT <- bft.ResetBFT{StartTime: c.LoadLastCommitTime(c.FSM.Height())} // set syncing to false c.isSyncing.Store(false) + // notify the store to resume compaction and trigger a full compaction of all prefixes + // (including SMT/indexer which are never compacted during normal operation) + if st, ok := 
c.FSM.Store().(*store.Store); ok { + st.SetSyncing(false) + go st.CompactAll() + } // enable listening for a block go c.ListenForBlock() } diff --git a/store/store.go b/store/store.go index 8b497105c9..c6155e799c 100644 --- a/store/store.go +++ b/store/store.go @@ -73,6 +73,7 @@ type Store struct { sc *SMT // reference to the state commitment store *Indexer // reference to the indexer store metrics *lib.Metrics // telemetry + syncing atomic.Bool // when true, skip compaction to avoid write stalls during sync log lib.LoggerI // logger config lib.Config // config mu *sync.Mutex // mutex for concurrent commits @@ -472,6 +473,39 @@ func (s *Store) IncreaseVersion() { func() { s.version++; s.sc = nil }() } // number of the state. This is used to track the versioning of the state data. func (s *Store) Version() uint64 { return s.version } +// SetSyncing tells the store whether the node is currently syncing, allowing it to +// defer expensive maintenance operations (compaction) that cause write stalls at scale. +func (s *Store) SetSyncing(v bool) { s.syncing.Store(v) } + +// CompactAll runs compaction across all store prefixes including SMT commitment nodes +// and indexer entries that are not covered by the regular MaybeCompact cycle. +// Should be called after sync completes to consolidate the accumulated versions. 
+func (s *Store) CompactAll() { + if s.compaction.Load() { + return + } + s.compaction.Store(true) + defer s.compaction.Store(false) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + version := s.Version() + s.log.Infof("full compaction started at height %d (post-sync)", version) + now := time.Now() + for _, prefix := range [][]byte{ + latestStatePrefix, + historicStatePrefix, + stateCommitmentPrefix, + stateCommitIDPrefix, + indexerPrefix, + } { + if err := s.db.Compact(ctx, prefix, prefixEnd(prefix), false); err != nil { + s.log.Errorf("full compaction failed for prefix: %s", err) + return + } + } + s.log.Infof("full compaction finished at height %d in %s", version, time.Since(now)) +} + // NewTxn() creates and returns a new transaction for the Store, allowing atomic operations // on the StateStore, StateCommitStore, Indexer, and CommitIDStore. func (s *Store) NewTxn() lib.StoreI { @@ -612,6 +646,11 @@ func getLatestCommitID(db *pebble.DB, log lib.LoggerI) (id *lib.CommitID) { // MaybeCompact() checks if it is time to compact the LSS and HSS respectively func (s *Store) MaybeCompact() { + // skip compaction during syncing: at scale (~1.5M+ blocks) HSS range compaction causes + // PebbleDB write stalls that throttle the sync loop's db.Apply calls + if s.syncing.Load() { + return + } // check if the current version is a multiple of the cleanup block interval compactionInterval := s.config.StoreConfig.LSSCompactionInterval version := s.Version() From cc5ed6599542411b8319c21b4f623f3406aefa2a Mon Sep 17 00:00:00 2001 From: Pablo Date: Thu, 4 Jun 2026 23:54:28 -0400 Subject: [PATCH 24/25] fix: heat cache --- store/smt.go | 13 +++++++++---- store/store.go | 35 +++++++++++++++++++++-------------- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/store/smt.go b/store/smt.go index 33dad8b421..d4eb6e2db7 100644 --- a/store/smt.go +++ b/store/smt.go @@ -194,10 +194,11 @@ func (s *SMT) Commit(unsortedOps 
map[uint64]valueOp) (err lib.ErrorI) { // with its own Txn copy to avoid lock contention, then the results are merged back into // the main tree. func (s *SMT) CommitParallel(unsortedOps map[uint64]valueOp) (err lib.ErrorI) { - // if there are too few operations, fall back to sequential processing - // if len(unsortedOps) < NumSubtrees*2 { - // return s.Commit(unsortedOps) - // } + // fall back to sequential processing when there are fewer than 2*NumSubtrees operations; + // addSyntheticBorders + cleanup overhead dominates for small batches + if len(unsortedOps) < NumSubtrees*2 { + return s.Commit(unsortedOps) + } parentTxn, ok := s.store.(*Txn) if !ok { @@ -660,6 +661,10 @@ func (s *SMT) getNode(key []byte) (n *node, err lib.ErrorI) { } // set the key in the node for convenience n.Key.fromBytes(key) + // cache the read result to avoid repeated PebbleDB lookups for the same node + if len(s.nodeCache) < MaxCacheSize { + s.nodeCache[string(key)] = n + } return } diff --git a/store/store.go b/store/store.go index c6155e799c..899b6aa266 100644 --- a/store/store.go +++ b/store/store.go @@ -66,20 +66,21 @@ iteration over stored data. 
*/ type Store struct { - version uint64 // version of the store - db *pebble.DB // underlying database - writer *pebble.Batch // the shared batch writer that allows committing it all at once - ss *Txn // reference to the state store - sc *SMT // reference to the state commitment store - *Indexer // reference to the indexer store - metrics *lib.Metrics // telemetry - syncing atomic.Bool // when true, skip compaction to avoid write stalls during sync - log lib.LoggerI // logger - config lib.Config // config - mu *sync.Mutex // mutex for concurrent commits - compaction atomic.Bool // atomic boolean for compaction status - backup atomic.Bool // atomic boolean for backup status - isTxn bool // flag indicating if the store is in transaction mode + version uint64 // version of the store + db *pebble.DB // underlying database + writer *pebble.Batch // the shared batch writer that allows committing it all at once + ss *Txn // reference to the state store + sc *SMT // reference to the state commitment store + smtNodeCache map[string]*node // persistent SMT node cache surviving across blocks + *Indexer // reference to the indexer store + metrics *lib.Metrics // telemetry + syncing atomic.Bool // when true, skip compaction to avoid write stalls during sync + log lib.LoggerI // logger + config lib.Config // config + mu *sync.Mutex // mutex for concurrent commits + compaction atomic.Bool // atomic boolean for compaction status + backup atomic.Bool // atomic boolean for backup status + isTxn bool // flag indicating if the store is in transaction mode } // New() creates a new instance of a StoreI either in memory or an actual disk DB @@ -535,10 +536,16 @@ func (s *Store) Root() (root []byte, err lib.ErrorI) { nextVersion := s.version + 1 // set up the state commit store s.sc = NewDefaultSMT(NewTxn(s.ss.reader, s.ss.writer, stateCommitIDPrefix, false, false, true, nextVersion)) + // warm the SMT with the persistent node cache from the previous block + if s.smtNodeCache != nil { + 
s.sc.nodeCache = s.smtNodeCache + } // commit the SMT directly using the txn ops if err = s.sc.CommitParallel(s.ss.txn.ops); err != nil { return nil, err } + // save the cache for the next block (nodes have correct post-commit values) + s.smtNodeCache = s.sc.nodeCache } // return the root return s.sc.Root(), nil From 4d74896478bb20b3c736aa0bc75ddefb538db6ce Mon Sep 17 00:00:00 2001 From: Pablo Date: Sat, 6 Jun 2026 00:11:41 -0400 Subject: [PATCH 25/25] fix: restart panic --- store/store.go | 44 +++++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/store/store.go b/store/store.go index 899b6aa266..0e74514d87 100644 --- a/store/store.go +++ b/store/store.go @@ -66,21 +66,20 @@ iteration over stored data. */ type Store struct { - version uint64 // version of the store - db *pebble.DB // underlying database - writer *pebble.Batch // the shared batch writer that allows committing it all at once - ss *Txn // reference to the state store - sc *SMT // reference to the state commitment store - smtNodeCache map[string]*node // persistent SMT node cache surviving across blocks - *Indexer // reference to the indexer store - metrics *lib.Metrics // telemetry - syncing atomic.Bool // when true, skip compaction to avoid write stalls during sync - log lib.LoggerI // logger - config lib.Config // config - mu *sync.Mutex // mutex for concurrent commits - compaction atomic.Bool // atomic boolean for compaction status - backup atomic.Bool // atomic boolean for backup status - isTxn bool // flag indicating if the store is in transaction mode + version uint64 // version of the store + db *pebble.DB // underlying database + writer *pebble.Batch // the shared batch writer that allows committing it all at once + ss *Txn // reference to the state store + sc *SMT // reference to the state commitment store + *Indexer // reference to the indexer store + metrics *lib.Metrics // telemetry + syncing atomic.Bool // when true, skip compaction to 
avoid write stalls during sync + log lib.LoggerI // logger + config lib.Config // config + mu *sync.Mutex // mutex for concurrent commits + compaction atomic.Bool // atomic boolean for compaction status + backup atomic.Bool // atomic boolean for backup status + isTxn bool // flag indicating if the store is in transaction mode } // New() creates a new instance of a StoreI either in memory or an actual disk DB @@ -536,16 +535,19 @@ func (s *Store) Root() (root []byte, err lib.ErrorI) { nextVersion := s.version + 1 // set up the state commit store s.sc = NewDefaultSMT(NewTxn(s.ss.reader, s.ss.writer, stateCommitIDPrefix, false, false, true, nextVersion)) - // warm the SMT with the persistent node cache from the previous block - if s.smtNodeCache != nil { - s.sc.nodeCache = s.smtNodeCache - } // commit the SMT directly using the txn ops + // + // NOTE: the SMT node cache MUST NOT be persisted across blocks. `node.copy()` is a + // no-op alias (`&(*x)` returns the same pointer), so the parallel commit mutates the + // cached `*node` objects in place (synthetic borders, subtree roots). Reusing those + // pointers in a later block also risks serving nodes from a speculative `Root()` call + // (e.g. ApplyAndValidateBlock) whose state was reset and never committed, which makes + // the cache diverge from the on-disk snapshot and panics with "no child node was + // replaced" during traversal. A fresh per-block cache (created by NewSMT and filled by + // getNode/setNode) still provides full caching within the commit. if err = s.sc.CommitParallel(s.ss.txn.ops); err != nil { return nil, err } - // save the cache for the next block (nodes have correct post-commit values) - s.smtNodeCache = s.sc.nodeCache } // return the root return s.sc.Root(), nil