From 57cabf02d92bedfdd610be6e1f0d453055a37a69 Mon Sep 17 00:00:00 2001 From: vilkris Date: Fri, 27 Dec 2024 14:45:04 +0200 Subject: [PATCH] Improve account pool performance --- chain/account_pool.go | 70 +++++++++++---- chain/tests/account_pool_test.go | 149 +++++++++++++++++++++++++++++++ 2 files changed, 204 insertions(+), 15 deletions(-) create mode 100644 chain/tests/account_pool_test.go diff --git a/chain/account_pool.go b/chain/account_pool.go index 05b377d..4be3bd7 100644 --- a/chain/account_pool.go +++ b/chain/account_pool.go @@ -20,6 +20,7 @@ var ( ErrFailedToAddAccountBlockTransaction = errors.Errorf("failed to insert account-block-transaction") ErrPlasmaRatioIsWorse = errors.Errorf("plasma ratio is smaller for current block") ErrHashTieBreak = errors.Errorf("hash tie-break is worse for current block") + ErrBlockHeightNotFound = errors.Errorf("block height does not exist in account manager") // MaxAccountBlocksInMomentum takes into account batched account-blocks MaxAccountBlocksInMomentum = 100 @@ -29,17 +30,53 @@ type Stable interface { GetStableAccountDB(address types.Address) db.DB } +type accountManager struct { + db db.Manager + blocks map[uint64]*nom.AccountBlock +} + +func (am *accountManager) Add(transaction *nom.AccountBlockTransaction) error { + if err := am.db.Add(transaction); err != nil { + return err + } + am.blocks[transaction.Block.Height] = transaction.Block + for _, d := range transaction.Block.DescendantBlocks { + am.blocks[d.Height] = d + } + return nil +} + +func (am *accountManager) Pop() error { + frontier := db.GetFrontierIdentifier(am.db.Frontier()).Height + if err := am.db.Pop(); err != nil { + return err + } + delete(am.blocks, frontier) + return nil +} + +func (am *accountManager) BlockByHeight(height uint64) (*nom.AccountBlock, error) { + block, ok := am.blocks[height] + if !ok { + return nil, ErrBlockHeightNotFound + } + return block, nil +} + type accountPool struct { log log15.Logger stable Stable - managers map[types.Address]db.Manager + managers map[types.Address]*accountManager changes sync.Mutex } -func (ap *accountPool) getAccountManager(address types.Address) db.Manager { +func (ap *accountPool) getAccountManager(address types.Address) *accountManager { manager := ap.managers[address] if manager == nil { - manager = db.NewMemDBManager(ap.stable.GetStableAccountDB(address)) + manager = &accountManager{ + db: db.NewMemDBManager(ap.stable.GetStableAccountDB(address)), + blocks: make(map[uint64]*nom.AccountBlock), + } ap.managers[address] = manager } return manager @@ -146,7 +183,7 @@ func (ap *accountPool) addAccountBlockTransaction(transaction *nom.AccountBlockT // rollback blocks and insert this one manager := ap.getAccountManager(address) for { - currentIdentifier := db.GetFrontierIdentifier(manager.Frontier()) + currentIdentifier := db.GetFrontierIdentifier(manager.db.Frontier()) if currentIdentifier == previous { break } @@ -166,7 +203,7 @@ func (ap *accountPool) GetPatch(address types.Address, identifier types.HashHeig ap.changes.Lock() defer ap.changes.Unlock() - return ap.getAccountManager(address).GetPatch(identifier) + return ap.getAccountManager(address).db.GetPatch(identifier) } func (ap *accountPool) GetAccountStore(address types.Address, identifier types.HashHeight) store.Account { ap.changes.Lock() @@ -182,9 +219,9 @@ func (ap *accountPool) GetAccountStore(address types.Address, identifier types.H } manager := ap.getAccountManager(address) - accountDb := manager.Get(identifier) + accountDb := manager.db.Get(identifier) if accountDb == nil { - frontier := db.GetFrontierIdentifier(manager.Frontier()) + frontier := db.GetFrontierIdentifier(manager.db.Frontier()) ap.log.Info("unable to get account store", "address", address, "frontier-identifier", frontier, "reason", "missing-db") return nil } @@ -201,7 +238,7 @@ func (ap *accountPool) getStableAccountStore(address types.Address) store.Accoun return account.NewAccountStore(address, db.NewMemDBManager(ap.stable.GetStableAccountDB(address)).Frontier()) } func (ap *accountPool) getFrontierAccountStore(address types.Address) store.Account { - return account.NewAccountStore(address, ap.getAccountManager(address).Frontier()) + return account.NewAccountStore(address, ap.getAccountManager(address).db.Frontier()) } func (ap *accountPool) InsertMomentum(detailed *nom.DetailedMomentum) { @@ -216,7 +253,7 @@ func (ap *accountPool) DeleteMomentum(*nom.DetailedMomentum) { ap.changes.Lock() defer ap.changes.Unlock() - ap.managers = make(map[types.Address]db.Manager) + ap.managers = make(map[types.Address]*accountManager) } func (ap *accountPool) rebuild(detailed *nom.DetailedMomentum) error { addresses := make([]types.Address, 0, len(ap.managers)) @@ -233,9 +270,9 @@ func (ap *accountPool) rebuild(detailed *nom.DetailedMomentum) error { oldManager := ap.managers[address] stable := account.NewAccountStore(address, ap.stable.GetStableAccountDB(address)) - uncommittedStore := account.NewAccountStore(address, oldManager.Frontier()) + uncommittedStore := account.NewAccountStore(address, oldManager.db.Frontier()) for i := stable.Identifier().Height + 1; i <= uncommittedStore.Identifier().Height; i += 1 { - block, err := uncommittedStore.ByHeight(i) + block, err := oldManager.BlockByHeight(i) common.DealWithErr(err) uncommitted = append(uncommitted, block) } @@ -248,9 +285,12 @@ func (ap *accountPool) rebuild(detailed *nom.DetailedMomentum) error { } log.Debug("staring applying blocks", "num-uncommitted", len(uncommitted)) - manager := db.NewMemDBManager(ap.stable.GetStableAccountDB(address)) + manager := &accountManager{ + db: db.NewMemDBManager(ap.stable.GetStableAccountDB(address)), + blocks: make(map[uint64]*nom.AccountBlock), + } for _, block := range uncommitted { - patch := oldManager.GetPatch(block.Identifier()) + patch := oldManager.db.GetPatch(block.Identifier()) err := manager.Add(&nom.AccountBlockTransaction{ Block: block, Changes: patch, @@ -310,7 +350,7 @@ func (ap *accountPool) getUncommittedAccountBlocksByAddress(address types.Addres stable := ap.getStableAccountStore(address) frontier := ap.getFrontierAccountStore(address) for i := stable.Identifier().Height + 1; i <= frontier.Identifier().Height; i += 1 { - block, err := frontier.ByHeight(i) + block, err := ap.getAccountManager(address).BlockByHeight(i) common.DealWithErr(err) blocks = append(blocks, block) } @@ -322,7 +362,7 @@ func newAccountPool(stable Stable) *accountPool { return &accountPool{ log: common.ChainLogger.New("module", "account-pool"), stable: stable, - managers: make(map[types.Address]db.Manager), + managers: make(map[types.Address]*accountManager), } } func NewAccountPool(stable Stable) AccountPool { diff --git a/chain/tests/account_pool_test.go b/chain/tests/account_pool_test.go new file mode 100644 index 0000000..cd9642b --- /dev/null +++ b/chain/tests/account_pool_test.go @@ -0,0 +1,149 @@ +package tests + +import ( + "math/big" + "testing" + + g "github.com/zenon-network/go-zenon/chain/genesis/mock" + "github.com/zenon-network/go-zenon/chain/nom" + "github.com/zenon-network/go-zenon/common" + "github.com/zenon-network/go-zenon/common/types" + "github.com/zenon-network/go-zenon/vm/constants" + "github.com/zenon-network/go-zenon/vm/embedded/definition" + "github.com/zenon-network/go-zenon/zenon/mock" +) + +func TestAccountPool_GetAllUncommittedAccountBlocks(t *testing.T) { + z := mock.NewMockZenon(t) + defer z.StopPanic() + + blocks := []*nom.AccountBlock{ + { + Address: g.User1.Address, + }, + { + Address: g.User2.Address, + }, + { + Address: g.User3.Address, + }, + { + Address: g.User1.Address, + ToAddress: types.TokenContract, + TokenStandard: types.ZnnTokenStandard, + Amount: constants.TokenIssueAmount, + Data: definition.ABIToken.PackMethodPanic(definition.IssueMethodName, + "test.tok3n_na-m3", //param.TokenName + "TEST", //param.TokenSymbol + "", //param.TokenDomain + big.NewInt(100), //param.TotalSupply + big.NewInt(1000), //param.MaxSupply + uint8(1), //param.Decimals + true, //param.IsMintable + true, //param.IsBurnable + false, //param.IsUtility + ), + }, + } + + for _, block := range blocks { + if types.IsEmbeddedAddress(block.ToAddress) { + z.CallContract(block) + } else { + z.InsertSendBlock(block, nil, mock.SkipVmChanges) + } + } + + uncommitted := z.Chain().GetAllUncommittedAccountBlocks() + common.Expect(t, len(uncommitted), 4) + + z.InsertNewMomentum() // generate contract receive and its descendant block + + uncommitted = z.Chain().GetAllUncommittedAccountBlocks() + common.Expect(t, len(uncommitted), 2) + + z.InsertNewMomentum() + + uncommitted = z.Chain().GetAllUncommittedAccountBlocks() + common.Expect(t, len(uncommitted), 0) +} + +func TestAccountPool_Rebuild(t *testing.T) { + z := mock.NewMockZenon(t) + defer z.StopPanic() + + for i := 0; i < 100; i++ { + z.InsertSendBlock(&nom.AccountBlock{ + Address: g.User1.Address, + }, nil, mock.SkipVmChanges) + z.InsertSendBlock(&nom.AccountBlock{ + Address: g.User2.Address, + }, nil, mock.SkipVmChanges) + } + + uncommitted := z.Chain().GetAllUncommittedAccountBlocks() + common.Expect(t, len(uncommitted), 200) + + z.InsertNewMomentum() // trigger rebuild + + uncommitted = z.Chain().GetAllUncommittedAccountBlocks() + common.Expect(t, len(uncommitted), 100) +} + +func TestAccountPool_Priority(t *testing.T) { + z := mock.NewMockZenon(t) + defer z.StopPanic() + + lowPriorityBlock := &nom.AccountBlock{ + Address: g.User1.Address, + FusedPlasma: 21000, + } + + z.InsertSendBlock(lowPriorityBlock, nil, mock.SkipVmChanges) + + uncommitted := z.Chain().GetAllUncommittedAccountBlocks() + common.Expect(t, len(uncommitted), 1) + + highPriorityBlock := uncommitted[0] + highPriorityBlock.FusedPlasma = 22000 + + z.InsertSendBlock(highPriorityBlock, nil, mock.SkipVmChanges) + + uncommitted = z.Chain().GetAllUncommittedAccountBlocks() + common.Expect(t, len(uncommitted), 1) + common.ExpectString(t, uncommitted[0].Hash.String(), highPriorityBlock.Hash.String()) + + z.InsertSendBlock(lowPriorityBlock, nil, mock.SkipVmChanges) + + uncommitted = z.Chain().GetAllUncommittedAccountBlocks() + common.Expect(t, len(uncommitted), 1) + common.ExpectString(t, uncommitted[0].Hash.String(), highPriorityBlock.Hash.String()) +} + +func BenchmarkAccountPool_GetAllUncommittedAccountBlocks(b *testing.B) { + z := mock.NewMockZenon(b) + defer z.StopPanic() + + for i := 0; i < 500; i++ { + z.InsertSendBlock(&nom.AccountBlock{ + Address: g.User1.Address, + }, nil, mock.SkipVmChanges) + z.InsertSendBlock(&nom.AccountBlock{ + Address: g.User2.Address, + }, nil, mock.SkipVmChanges) + z.InsertSendBlock(&nom.AccountBlock{ + Address: g.User3.Address, + }, nil, mock.SkipVmChanges) + z.InsertSendBlock(&nom.AccountBlock{ + Address: g.User4.Address, + }, nil, mock.SkipVmChanges) + z.InsertSendBlock(&nom.AccountBlock{ + Address: g.User5.Address, + }, nil, mock.SkipVmChanges) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + z.Chain().GetAllUncommittedAccountBlocks() + } +}