Skip to content

Commit a3e238c

Browse files
committed
Node sync updates
1 parent 9b42b50 commit a3e238c

1 file changed

Lines changed: 235 additions & 62 deletions

File tree

controller/consensus.go

Lines changed: 235 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,50 @@ import (
1313
"github.com/canopy-network/canopy/p2p"
1414
)
1515

16+
const (
17+
// Maximum size of the block sync request queue
18+
blockSyncQueueSize = uint64(80)
19+
// How often the queue is checked and more block requests sent
20+
blockRequestInterval = 100 * time.Millisecond
21+
blockRequestTimeout = 1 * time.Second
22+
// Increase or decrease the block request rate relative to the default
23+
// Increase this to try to sync faster at the risk of hitting rate limits
24+
rateScaleFactor = 1.00
25+
)
26+
1627
/* This file contains the high level functionality of the continued agreement on the blocks of the chain */
1728

29+
// blockSyncRequest tracks each block request that has been sent
30+
type blockSyncRequest struct {
31+
timestamp time.Time
32+
height uint64
33+
peerPublicKey []byte
34+
message *lib.MessageAndMetadata
35+
blockMessage *lib.BlockMessage
36+
}
37+
38+
// getRandomAllowedPeer randomizes the provided peer list and chooses the first one that is not rate-limited
39+
func getRandomAllowedPeer(peers []string, limiter *lib.SimpleLimiter) string {
40+
// Create a copy of the peer list
41+
copy := make([]string, 0, len(peers))
42+
for i := range peers {
43+
copy = append(copy, peers[i])
44+
}
45+
// Shuffle the list in order to try all peers in a random order
46+
rand.Shuffle(len(copy), func(i, j int) {
47+
copy[i], copy[j] = copy[j], copy[i]
48+
})
49+
// Find a peer that is not rate limited
50+
for _, peer := range copy {
51+
blocked, allBlocked := limiter.NewRequest(peer)
52+
if !blocked && !allBlocked {
53+
return peer
54+
}
55+
}
56+
// No peers were allowed to send
57+
return ""
58+
}
59+
1860
// Sync() downloads the blockchain from peers until 'synced' to the latest 'height'
1961
// 1) Get the height and begin block params from the state_machine
2062
// 2) Get peer max_height from P2P
@@ -35,81 +77,212 @@ func (c *Controller) Sync() {
3577
// exit
3678
return
3779
}
38-
// poll max height of all peers
39-
maxHeight, minVDFIterations, syncingPeers := c.pollMaxHeight(1)
40-
// while still below the latest height
80+
// Find the height the FSM is expecting to receive next
81+
fsmHeight := c.FSM.Height()
82+
// queue contains block requests either in-flight or completed
83+
queue := map[uint64]blockSyncRequest{}
84+
// How often to send block requests to maintain the queue
85+
requestTicker := time.NewTicker(blockRequestInterval)
86+
defer requestTicker.Stop()
87+
// Get an initial max height, min vdf iterations and syncing peers
88+
maxHeight, minVDFIterations, _ := c.pollMaxHeight(1)
89+
c.log.Infof("Starting sync 🔄 at height %d", fsmHeight)
90+
// Create a limiter to prevent peers from disconnecting and slashing rep
91+
limiter := lib.NewLimiter(p2p.MaxBlockReqPerWindow*rateScaleFactor, c.P2P.MaxPossiblePeers()*p2p.MaxBlockReqPerWindow, p2p.BlockReqWindowS)
92+
93+
// Loop until the sync is complete
94+
// The purpose is to keep the queue full and hand the next block to the FSM
95+
// - List of current peers queried from P2P module
96+
// - Block requests are sent to a peer if there is room available in the queue
97+
// - Peers are chosen randomly from ones which are not rate-limited
98+
// - Block responses are verified and given to the FSM in the expected order
4199
for !c.syncingDone(maxHeight, minVDFIterations) {
42-
// get a random peer to send a 'block request' to
43-
requested, _ := lib.StringToBytes(syncingPeers[rand.Intn(len(syncingPeers))])
44-
// log the initialization of the block request
45-
c.log.Infof("Syncing height %d 🔄 from %s", c.FSM.Height(), lib.BytesToTruncatedString(requested))
46-
// send the request to the
47-
go c.RequestBlock(false, requested)
48-
// block until one of the two cases happens
49100
select {
50-
// a) got a block in the inbox
51-
case msg := <-c.P2P.Inbox(Block):
52-
// if the responder does not equal the requester
53-
responder := msg.Sender.Address.PublicKey
54-
// log the receipt of a 'block response'
55-
c.log.Debugf("Received a block response msg from %s", lib.BytesToTruncatedString(responder))
56-
// check to see if the 'responder' is who was 'requested'
57-
if !bytes.Equal(responder, requested) {
58-
// log this unexpected behavior
59-
c.log.Warn("unexpected sender")
60-
// slash the reputation of the unexpected responder
61-
c.P2P.ChangeReputation(responder, p2p.UnexpectedBlockRep)
62-
// exit the select to re-poll
63-
break
64-
}
65-
// cast the message to a block message
66-
blockMessage, ok := msg.Message.(*lib.BlockMessage)
67-
// if the cast fails
68-
if !ok {
69-
// log this unexpected behavior
70-
c.log.Warn("Not a block response msg")
71-
// slash the reputation of the peer
72-
c.P2P.ChangeReputation(msg.Sender.Address.PublicKey, p2p.InvalidBlockRep)
73-
// exit the select to re-poll
74-
break
101+
case <-limiter.TimeToReset():
102+
limiter.Reset()
103+
case <-requestTicker.C:
104+
// Get current chain height
105+
fsmHeight := c.FSM.Height()
106+
// Get an updated list of available peers
107+
peers, _, _ := c.P2P.PeerSet.GetAllInfos()
108+
// Update syncing peers list
109+
syncingPeers := make([]string, len(peers))
110+
for _, peer := range peers {
111+
syncingPeers = append(syncingPeers, lib.BytesToString(peer.Address.PublicKey))
75112
}
76-
// process the block message received from the peer
77-
if _, err := c.HandlePeerBlock(blockMessage, true); err != nil {
78-
// log this unexpected behavior
79-
c.log.Warnf("Syncing peer block invalid:\n%s", err.Error())
80-
// slash the reputation of the peer
81-
c.P2P.ChangeReputation(msg.Sender.Address.PublicKey, p2p.InvalidBlockRep)
82-
// exit the select to re-poll
83-
break
113+
// Remove requests that have timed out
114+
c.applyTimeouts(queue)
115+
// Calculate the height to stop at when updating queue
116+
stopHeight := min(fsmHeight+blockSyncQueueSize, maxHeight)
117+
// Send block requests for any missing heights in the queue
118+
c.sendBlockRequests(fsmHeight, stopHeight, queue, limiter, syncingPeers)
119+
case msg := <-c.P2P.Inbox(Block):
120+
// verify the response
121+
blockMsg, height := c.verifyResponse(msg, queue)
122+
// Update queued request with this response
123+
if blockMsg != nil {
124+
c.log.Debugf("Received height %d from %s", height, lib.BytesToTruncatedString(msg.Sender.Address.PublicKey))
125+
// find the queued request for this height
126+
// verifyResponse() confirms this height is present in the queue
127+
req := queue[height]
128+
// update request with the response data
129+
req.blockMessage = blockMsg
130+
req.message = msg
131+
// add to queue waiting for handing to the FSM
132+
queue[height] = req
84133
}
85-
// each peer is individually polled for 'max height' in each request
86-
// if the max height has grown, we accept that as the new max height
87-
if blockMessage.MaxHeight > maxHeight && blockMessage.TotalVdfIterations >= minVDFIterations {
88-
// log the update
89-
c.log.Debugf("Updated chain %d with max height: %d and iterations %d", c.Config.ChainId, maxHeight, minVDFIterations)
134+
// process queued response messages
135+
m, v := c.processQueue(c.FSM.Height(), maxHeight, queue)
136+
// check if max height and min vdf needs updating
137+
if m >= maxHeight && v >= minVDFIterations {
90138
// update the max height and vdf iterations
91-
maxHeight, minVDFIterations = blockMessage.MaxHeight, blockMessage.TotalVdfIterations
139+
maxHeight, minVDFIterations = m, v
140+
c.log.Debugf("Updated chain %d with max height: %d and iterations %d", c.Config.ChainId, maxHeight, minVDFIterations)
92141
}
93-
// success, increase the peer reputation
94-
c.P2P.ChangeReputation(responder, p2p.GoodBlockRep)
95-
// execute another iteration without polling peers
96-
continue
97-
// b) a timeout occurred before a block landed in the inbox
98-
case <-time.After(p2p.SyncTimeoutS * time.Second):
99-
// log the timeout
100-
c.log.Warnf("Timeout waiting for sync block")
101-
// slash the peer reputation
102-
c.P2P.ChangeReputation(requested, p2p.TimeoutRep)
103142
}
104-
// update the syncing peers and poll the peers for their max height + minimum vdf iterations
105-
maxHeight, minVDFIterations, syncingPeers = c.pollMaxHeight(1)
106143
}
107-
// log 'sync complete'
144+
// Syncing complete
108145
c.log.Info("Synced to top ✅")
109146
// signal that the node is synced to top
110147
c.finishSyncing()
111148
}
112149

150+
func (c *Controller) processQueue(startHeight, stopHeight uint64, queue map[uint64]blockSyncRequest) (maxReceivedHeight, minVDFIterations uint64) {
151+
for height := startHeight; height < stopHeight; height++ {
152+
// Get the next height to be sent to FSM. This height is the required next height
153+
req, success := queue[height]
154+
// If required block not present, break and keep waiting
155+
if !success {
156+
c.log.Debugf("Height %d not found in queue, queue size: %d", height, len(queue))
157+
break
158+
}
159+
// Request has been sent but response yet to be received
160+
if req.blockMessage == nil {
161+
break
162+
}
163+
// remove request from queue
164+
delete(queue, height)
165+
// convenience variable
166+
blockMsg := req.blockMessage
167+
// start timing the HandlePeerBlock call
168+
start := time.Now()
169+
// process the block message received from the peer
170+
if _, err := c.HandlePeerBlock(blockMsg, true); err != nil {
171+
h := blockMsg.BlockAndCertificate.Header.Height
172+
// log this unexpected behavior
173+
c.log.Warnf("Syncing peer block height %d invalid:\n%s", h, err.Error())
174+
// slash the reputation of the peer
175+
c.P2P.ChangeReputation(req.message.Sender.Address.PublicKey, p2p.InvalidBlockRep)
176+
break
177+
}
178+
// calculate and log the elapsed time
179+
elapsed := time.Since(start)
180+
c.log.Infof("Block %d sync complete. HandlePeerBlock took %s", height, elapsed)
181+
// calculate and log the elapsed time
182+
// success, increase the peer reputation
183+
c.P2P.ChangeReputation(req.message.Sender.Address.PublicKey, p2p.GoodBlockRep)
184+
// check if max height and minimum vdf iterations should be updated
185+
if blockMsg.MaxHeight > maxReceivedHeight && blockMsg.TotalVdfIterations >= minVDFIterations {
186+
// update the max height and vdf iterations
187+
maxReceivedHeight, minVDFIterations = blockMsg.MaxHeight, blockMsg.TotalVdfIterations
188+
}
189+
}
190+
return
191+
}
192+
193+
// Send requests for heights missing in the queue
194+
// They can be missing because:
195+
// - Sync has just started and queue isn't full yet
196+
// - A previous request for a height timed out and was removed from queue
197+
// - A previous request for a height was removed from the queue for processing
198+
// Failure in processing leaves that height missing in the queue, triggering another request
199+
// - Block height has advanced and there's room at the end of the queue
200+
func (c *Controller) sendBlockRequests(start, stop uint64, queue map[uint64]blockSyncRequest, limiter *lib.SimpleLimiter, peers []string) {
201+
// Send requests to populate the queue
202+
for height := start; height < stop; height++ {
203+
// Reached the specified stop height
204+
if height >= stop {
205+
break
206+
}
207+
// A block request has already been sent for this height
208+
if _, ok := queue[height]; ok {
209+
continue
210+
}
211+
// Find a random peer that is not rate limited
212+
allowedPeer := getRandomAllowedPeer(peers, limiter)
213+
if allowedPeer == "" {
214+
// All peers rate-limited, cannot send any more requests
215+
return
216+
}
217+
peerPublicKey, _ := lib.StringToBytes(allowedPeer)
218+
219+
c.log.Debugf("Requesting block for height %d 🔄 from %s", height, lib.BytesToTruncatedString(peerPublicKey))
220+
221+
// Send block request to selected peer
222+
err := c.P2P.SendTo(peerPublicKey, BlockRequest, &lib.BlockRequestMessage{
223+
ChainId: c.Config.ChainId,
224+
Height: height,
225+
HeightOnly: false,
226+
})
227+
if err != nil {
228+
c.log.Errorf("Error requesting block for height %d 🔄 from %s", height, lib.BytesToTruncatedString(peerPublicKey))
229+
break
230+
}
231+
232+
// Add new request to queue
233+
queue[height] = blockSyncRequest{
234+
height: height,
235+
peerPublicKey: peerPublicKey,
236+
}
237+
}
238+
}
239+
240+
// applyTimeouts removes reuqests from the queue that have timed out
241+
func (c *Controller) applyTimeouts(queue map[uint64]blockSyncRequest) []blockSyncRequest {
242+
expired := make([]blockSyncRequest, 0)
243+
// Find expired requests
244+
for _, req := range queue {
245+
elapsed := time.Since(req.timestamp)
246+
if elapsed > blockRequestTimeout {
247+
c.log.Warnf("Request for height %d timed out: %s", req.height, elapsed)
248+
expired = append(expired, req)
249+
}
250+
}
251+
// Delete expired requests
252+
for _, req := range expired {
253+
delete(queue, req.height)
254+
}
255+
return expired
256+
}
257+
258+
// verifyResponse validates the block response is ready for the FSM
259+
// - Response is a proper blockResponse type
260+
// - A request for it exists in the queue
261+
// - Response is from the expected peer
262+
func (c *Controller) verifyResponse(msg *lib.MessageAndMetadata, queue map[uint64]blockSyncRequest) (blockMessage *lib.BlockMessage, height uint64) {
263+
blockMessage, ok := msg.Message.(*lib.BlockMessage)
264+
if !ok {
265+
c.log.Warn("Not a block response msg")
266+
c.P2P.ChangeReputation(msg.Sender.Address.PublicKey, p2p.InvalidBlockRep)
267+
return nil, 0
268+
}
269+
// Get the height in the message
270+
msgHeight := blockMessage.BlockAndCertificate.GetHeader().GetHeight()
271+
// Check for height in queue
272+
if _, ok := queue[msgHeight]; !ok {
273+
c.log.Warnf("Request not found for height %d, sent from %s", msgHeight, lib.BytesToTruncatedString(msg.Sender.Address.PublicKey))
274+
return nil, 0
275+
}
276+
// Get responder and verify proper sender
277+
responder := msg.Sender.Address.PublicKey
278+
if !bytes.Equal(responder, queue[msgHeight].peerPublicKey) {
279+
c.log.Warnf("unexpected sender %s for height %d", lib.BytesToTruncatedString(responder), msgHeight)
280+
c.P2P.ChangeReputation(responder, p2p.UnexpectedBlockRep)
281+
return nil, 0
282+
}
283+
return blockMessage, msgHeight
284+
}
285+
113286
// SUBSCRIBERS BELOW
114287

115288
// ListenForConsensus() listens and internally routes inbound consensus messages

0 commit comments

Comments
 (0)