diff --git a/catchup/classBasedPeerSelector.go b/catchup/classBasedPeerSelector.go index 2eca3bef2c..360228269b 100644 --- a/catchup/classBasedPeerSelector.go +++ b/catchup/classBasedPeerSelector.go @@ -18,9 +18,10 @@ package catchup import ( "errors" + "time" + "github.com/algorand/go-algorand/network" "github.com/algorand/go-deadlock" - "time" ) // classBasedPeerSelector is a rankPooledPeerSelector that tracks and ranks classes of peers based on their response behavior. @@ -56,8 +57,13 @@ func (c *classBasedPeerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, } // Peer was in this class, if there was any kind of download issue, we increment the failure count - if rank >= peerRankNoBlockForRound { + failure := rank >= peerRankNoBlockForRound + if failure { wp.downloadFailures++ + } else { + // A class usually has multiple peers, and we do not want one peer's failure to punish the entire class, + // so a successful download decrements the class's downloadFailures. + wp.downloadFailures = max(wp.downloadFailures-1, 0) } break diff --git a/catchup/classBasedPeerSelector_test.go b/catchup/classBasedPeerSelector_test.go index 40c708fe13..bfaa2100cf 100644 --- a/catchup/classBasedPeerSelector_test.go +++ b/catchup/classBasedPeerSelector_test.go @@ -17,11 +17,12 @@ package catchup import ( + "testing" + "time" + "github.com/algorand/go-algorand/network" "github.com/algorand/go-algorand/test/partitiontest" "github.com/stretchr/testify/require" - "testing" - "time" ) // Use to mock the wrapped peer selectors where warranted @@ -481,6 +482,7 @@ func TestClassBasedPeerSelector_integration(t *testing.T) { require.Nil(t, err) require.Equal(t, mockP1WrappedPeer, peerResult) cps.rankPeer(mockP1WrappedPeer, peerRankNoBlockForRound) + cps.rankPeer(mockP1WrappedPeer, peerRankNoBlockForRound) peerResult, err = cps.getNextPeer() require.Nil(t, err) @@ -495,4 +497,8 @@ func TestClassBasedPeerSelector_integration(t *testing.T) { require.Equal(t, 4, cps.peerSelectors[0].downloadFailures) require.Equal(t, 0, 
cps.peerSelectors[1].downloadFailures) + + // make sure successful download decreases download failures + cps.rankPeer(mockP1WrappedPeer, durationRank) + require.Equal(t, 3, cps.peerSelectors[0].downloadFailures) } diff --git a/catchup/peerSelector.go b/catchup/peerSelector.go index 851e60771c..f534fec5c5 100644 --- a/catchup/peerSelector.go +++ b/catchup/peerSelector.go @@ -17,6 +17,7 @@ package catchup import ( + "container/list" "errors" "math" "sort" @@ -142,7 +143,7 @@ type rankPooledPeerSelector struct { // selection gaps, and the count of peerRankDownloadFailed incidents. type historicStats struct { windowSize int - rankSamples []int + rankSamples *list.List rankSum uint64 requestGaps []uint64 gapSum float64 @@ -157,12 +158,12 @@ func makeHistoricStatus(windowSize int, class peerClass) *historicStats { // that will determine the rank of the peer. hs := historicStats{ windowSize: windowSize, - rankSamples: make([]int, windowSize), + rankSamples: list.New(), requestGaps: make([]uint64, 0, windowSize), rankSum: uint64(class.initialRank) * uint64(windowSize), gapSum: 0.0} for i := 0; i < windowSize; i++ { - hs.rankSamples[i] = class.initialRank + hs.rankSamples.PushBack(class.initialRank) } return &hs } @@ -209,7 +210,7 @@ func (hs *historicStats) resetRequestPenalty(steps int, initialRank int, class p if steps == 0 { hs.requestGaps = make([]uint64, 0, hs.windowSize) hs.gapSum = 0.0 - return int(float64(hs.rankSum) / float64(len(hs.rankSamples))) + return int(float64(hs.rankSum) / float64(hs.rankSamples.Len())) } if steps > len(hs.requestGaps) { @@ -219,7 +220,7 @@ func (hs *historicStats) resetRequestPenalty(steps int, initialRank int, class p hs.gapSum -= 1.0 / float64(hs.requestGaps[s]) } hs.requestGaps = hs.requestGaps[steps:] - return boundRankByClass(int(hs.computerPenalty()*(float64(hs.rankSum)/float64(len(hs.rankSamples)))), class) + return boundRankByClass(int(hs.computerPenalty()*(float64(hs.rankSum)/float64(hs.rankSamples.Len()))), class) } // push 
pushes a new rank to the historicStats, and returns the new @@ -233,12 +234,6 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera return value } - // This is a moving window. Remore the least recent value once the window is full - if len(hs.rankSamples) == hs.windowSize { - hs.rankSum -= uint64(hs.rankSamples[0]) - hs.rankSamples = hs.rankSamples[1:] - } - initialRank := value // Download may fail for various reasons. Give it additional tries @@ -266,11 +261,17 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera } } - hs.rankSamples = append(hs.rankSamples, value) - hs.rankSum += uint64(value) + // This is a moving window of windowSize size so an old value is removed and a new value is added. + oldFrontElem := hs.rankSamples.Front() + oldRank := oldFrontElem.Value.(int) + // Update rankSum (remove old value, add new value) + hs.rankSum = hs.rankSum - uint64(oldRank) + uint64(value) + // Update node value and move it to the back of the list by reusing the node + oldFrontElem.Value = value + hs.rankSamples.MoveToBack(oldFrontElem) // The average performance of the peer - average := float64(hs.rankSum) / float64(len(hs.rankSamples)) + average := float64(hs.rankSum) / float64(hs.rankSamples.Len()) if int(average) > upperBound(class) && (initialRank == peerRankDownloadFailed || initialRank == peerRankNoBlockForRound) { // peerRankDownloadFailed will be delayed, to give the peer @@ -280,10 +281,10 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera return initialRank } - // A penalty is added relative to how freequently the peer is used + // A penalty is added relative to how frequently the peer is used penalty := hs.updateRequestPenalty(counter) - // The rank based on the performance and the freequency + // The rank based on the performance and the frequency avgWithPenalty := int(penalty * average) // Keep the peer in the same class. 
The value passed will be @@ -303,7 +304,7 @@ func makeRankPooledPeerSelector(net peersRetriever, initialPeersClasses []peerCl return selector } -// getNextPeer returns the next peer. It randomally selects a peer from a pool that has +// getNextPeer returns the next peer. It randomly selects a peer from a pool that has // the lowest rank value. Given that the peers are grouped by their ranks, allow us to // prioritize peers based on their class and/or performance. func (ps *rankPooledPeerSelector) getNextPeer() (psp *peerSelectorPeer, err error) { diff --git a/catchup/service.go b/catchup/service.go index dfd0361960..60b2c5ffb2 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -281,7 +281,7 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo i++ select { case <-ctx.Done(): - s.log.Debugf("fetchAndWrite(%v): Aborted", r) + s.log.Debugf("fetchAndWrite(%d): Aborted", r) return false default: } @@ -309,10 +309,11 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo psp, getPeerErr := peerSelector.getNextPeer() if getPeerErr != nil { - s.log.Debugf("fetchAndWrite: was unable to obtain a peer to retrieve the block from") + s.log.Debugf("fetchAndWrite(%d): was unable to obtain a peer to retrieve the block from: %v", r, getPeerErr) return false } peer := psp.Peer + s.log.Debugf("fetchAndWrite(%d): got %s peer: %s", r, psp.peerClass, peerAddress(peer)) // Try to fetch, timing out after retryInterval block, cert, blockDownloadDuration, err := s.innerFetch(ctx, r, peer) @@ -343,15 +344,16 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo peerErrors[peer]++ } } - s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i) - peerSelector.rankPeer(psp, failureRank) + s.log.Debugf("fetchAndWrite(%d): Could not fetch: %v (attempt %d), peer %s", r, err, i, peerAddress(psp.Peer)) + o, n := peerSelector.rankPeer(psp, failureRank) + 
s.log.Debugf("fetchAndWrite(%d): Could not fetch: ranked peer %s with %d from %d to %d", r, peerAddress(psp.Peer), failureRank, o, n) // we've just failed to retrieve a block; wait until the previous block is fetched before trying again // to avoid the usecase where the first block doesn't exist, and we're making many requests down the chain // for no reason. select { case <-ctx.Done(): - s.log.Infof("fetchAndWrite(%v): Aborted while waiting for lookback block to ledger", r) + s.log.Infof("fetchAndWrite(%d): Aborted while waiting for lookback block to ledger", r) return false case <-lookbackComplete: } @@ -360,7 +362,7 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo // someone already wrote the block to the ledger, we should stop syncing return false } - s.log.Debugf("fetchAndWrite(%v): Got block and cert contents: %v %v", r, block, cert) + s.log.Debugf("fetchAndWrite(%d): Got block and cert contents: %v %v", r, block, cert) // Check that the block's contents match the block header (necessary with an untrusted block because b.Hash() only hashes the header) if s.cfg.CatchupVerifyPaysetHash() { @@ -368,11 +370,11 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo peerSelector.rankPeer(psp, peerRankInvalidDownload) // Check if this mismatch is due to an unsupported protocol version if _, ok := config.Consensus[block.BlockHeader.CurrentProtocol]; !ok { - s.log.Errorf("fetchAndWrite(%v): unsupported protocol version detected: '%v'", r, block.BlockHeader.CurrentProtocol) + s.log.Errorf("fetchAndWrite(%d): unsupported protocol version detected: '%v'", r, block.BlockHeader.CurrentProtocol) return false } - s.log.Warnf("fetchAndWrite(%v): block contents do not match header (attempt %d)", r, i) + s.log.Warnf("fetchAndWrite(%d): block contents do not match header (attempt %d)", r, i) continue // retry the fetch } } @@ -380,7 +382,7 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, 
prevFetchCo // make sure that we have the lookBack block that's required for authenticating this block select { case <-ctx.Done(): - s.log.Debugf("fetchAndWrite(%v): Aborted while waiting for lookback block to ledger", r) + s.log.Debugf("fetchAndWrite(%d): Aborted while waiting for lookback block to ledger", r) return false case <-lookbackComplete: } @@ -388,7 +390,7 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo if s.cfg.CatchupVerifyCertificate() { err = s.auth.Authenticate(block, cert) if err != nil { - s.log.Warnf("fetchAndWrite(%v): cert did not authenticate block (attempt %d): %v", r, i, err) + s.log.Warnf("fetchAndWrite(%d): cert did not authenticate block (attempt %d): %v", r, i, err) peerSelector.rankPeer(psp, peerRankInvalidDownload) continue // retry the fetch } @@ -396,12 +398,12 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo peerRank := peerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration) r1, r2 := peerSelector.rankPeer(psp, peerRank) - s.log.Debugf("fetchAndWrite(%d): ranked peer with %d from %d to %d", r, peerRank, r1, r2) + s.log.Debugf("fetchAndWrite(%d): ranked peer %s with %d from %d to %d", r, peerAddress(psp.Peer), peerRank, r1, r2) // Write to ledger, noting that ledger writes must be in order select { case <-ctx.Done(): - s.log.Debugf("fetchAndWrite(%v): Aborted while waiting to write to ledger", r) + s.log.Debugf("fetchAndWrite(%d): Aborted while waiting to write to ledger", r) return false case <-prevFetchCompleteChan: // make sure the ledger wrote enough of the account data to disk, since we don't want the ledger to hold a large amount of data in memory. 
@@ -457,16 +459,16 @@ func (s *Service) fetchAndWrite(ctx context.Context, r basics.Round, prevFetchCo return false case errors.As(err, &protocolErr): if !s.protocolErrorLogged { - logging.Base().Errorf("fetchAndWrite(%v): unrecoverable protocol error detected: %v", r, err) + logging.Base().Errorf("fetchAndWrite(%d): unrecoverable protocol error detected: %v", r, err) s.protocolErrorLogged = true } default: - s.log.Errorf("fetchAndWrite(%v): ledger write failed: %v", r, err) + s.log.Errorf("fetchAndWrite(%d): ledger write failed: %v", r, err) } return false } - s.log.Debugf("fetchAndWrite(%v): Wrote block to ledger", r) + s.log.Debugf("fetchAndWrite(%d): Wrote block to ledger", r) return true } } @@ -497,7 +499,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) { ps := createPeerSelector(s.net) if _, err := ps.getNextPeer(); err != nil { - s.log.Debugf("pipelinedFetch: was unable to obtain a peer to retrieve the block from") + s.log.Debugf("pipelinedFetch: was unable to obtain a peer to retrieve the block from: %v", err) return } @@ -546,6 +548,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) { if !completedOK { // there was an error; defer will cancel the pipeline + s.log.Debugf("pipelinedFetch: quitting on fetchAndWrite error (firstRound=%d, nextRound=%d)", firstRound-1, nextRound) return } @@ -582,6 +585,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) { } case <-s.ctx.Done(): + s.log.Debugf("pipelinedFetch: Aborted (firstRound=%d, nextRound=%d)", firstRound, nextRound) return } } @@ -759,7 +763,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy for s.ledger.LastRound() < cert.Round { psp, getPeerErr := ps.getNextPeer() if getPeerErr != nil { - s.log.Debugf("fetchRound: was unable to obtain a peer to retrieve the block from") + s.log.Debugf("fetchRound: was unable to obtain a peer to retrieve the block from: %s", getPeerErr) select { case <-s.ctx.Done(): logging.Base().Debugf("fetchRound was asked to 
quit while collecting peers") @@ -788,7 +792,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy failureRank = peerRankNoBlockForRound // If a peer does not have the block after few attempts it probably has not persisted the block yet. // Give it some time to persist the block and try again. - // None, there is no exit condition on too many retries as per the function contract. + // Note, there is no exit condition on too many retries as per the function contract. if count, ok := peerErrors[peer]; ok { if count > errNoBlockForRoundThreshold { time.Sleep(50 * time.Millisecond) @@ -797,7 +801,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy // for the low number of connected peers (like 2) the following scenario is possible: // - both peers do not have the block // - peer selector punishes one of the peers more than the other - // - the punished peer gets the block, and the less punished peer stucks. + // - the punished peer gets the block, and the less punished peer gets stuck. // It this case reset the peer selector to let it re-learn priorities. 
ps = createPeerSelector(s.net) } @@ -877,7 +881,7 @@ func (s *Service) roundIsNotSupported(nextRound basics.Round) bool { return true } -func createPeerSelector(net network.GossipNode) peerSelector { +func createPeerSelector(net peersRetriever) peerSelector { wrappedPeerSelectors := []*wrappedPeerSelector{ { peerClass: network.PeersConnectedOut, diff --git a/network/gossipNode.go b/network/gossipNode.go index e82b5fc0fe..2f103e4930 100644 --- a/network/gossipNode.go +++ b/network/gossipNode.go @@ -60,6 +60,21 @@ const ( PeersPhonebookArchivalNodes PeerOption = iota ) +func (po PeerOption) String() string { + switch po { + case PeersConnectedOut: + return "ConnectedOut" + case PeersConnectedIn: + return "ConnectedIn" + case PeersPhonebookRelays: + return "PhonebookRelays" + case PeersPhonebookArchivalNodes: + return "PhonebookArchivalNodes" + default: + return "Unknown PeerOption" + } +} + // GossipNode represents a node in the gossip network type GossipNode interface { Address() (string, bool)