Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions catchup/classBasedPeerSelector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package catchup

import (
"errors"
"time"

"github.com/algorand/go-algorand/network"
"github.com/algorand/go-deadlock"
"time"
)

// classBasedPeerSelector is a rankPooledPeerSelector that tracks and ranks classes of peers based on their response behavior.
Expand Down Expand Up @@ -56,8 +57,13 @@ func (c *classBasedPeerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int,
}

// Peer was in this class, if there was any kind of download issue, we increment the failure count
if rank >= peerRankNoBlockForRound {
failure := rank >= peerRankNoBlockForRound
if failure {
wp.downloadFailures++
} else {
// a class usually has multiple peers and we do not want to punish the entire class for one peer's failure
// by decrementing the downloadFailures
wp.downloadFailures = max(wp.downloadFailures-1, 0)
}

break
Expand Down
10 changes: 8 additions & 2 deletions catchup/classBasedPeerSelector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package catchup

import (
"testing"
"time"

"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/stretchr/testify/require"
"testing"
"time"
)

// Use to mock the wrapped peer selectors where warranted
Expand Down Expand Up @@ -481,6 +482,7 @@ func TestClassBasedPeerSelector_integration(t *testing.T) {
require.Nil(t, err)
require.Equal(t, mockP1WrappedPeer, peerResult)
cps.rankPeer(mockP1WrappedPeer, peerRankNoBlockForRound)
cps.rankPeer(mockP1WrappedPeer, peerRankNoBlockForRound)
Comment thread
gmalouf marked this conversation as resolved.

peerResult, err = cps.getNextPeer()
require.Nil(t, err)
Expand All @@ -495,4 +497,8 @@ func TestClassBasedPeerSelector_integration(t *testing.T) {

require.Equal(t, 4, cps.peerSelectors[0].downloadFailures)
require.Equal(t, 0, cps.peerSelectors[1].downloadFailures)

// make sure successful download decreases download failures
cps.rankPeer(mockP1WrappedPeer, durationRank)
require.Equal(t, 3, cps.peerSelectors[0].downloadFailures)
}
35 changes: 18 additions & 17 deletions catchup/peerSelector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package catchup

import (
"container/list"
"errors"
"math"
"sort"
Expand Down Expand Up @@ -142,7 +143,7 @@ type rankPooledPeerSelector struct {
// selection gaps, and the count of peerRankDownloadFailed incidents.
type historicStats struct {
windowSize int
rankSamples []int
rankSamples *list.List
rankSum uint64
requestGaps []uint64
gapSum float64
Expand All @@ -157,12 +158,12 @@ func makeHistoricStatus(windowSize int, class peerClass) *historicStats {
// that will determine the rank of the peer.
hs := historicStats{
windowSize: windowSize,
rankSamples: make([]int, windowSize),
rankSamples: list.New(),
requestGaps: make([]uint64, 0, windowSize),
rankSum: uint64(class.initialRank) * uint64(windowSize),
gapSum: 0.0}
for i := 0; i < windowSize; i++ {
hs.rankSamples[i] = class.initialRank
hs.rankSamples.PushBack(class.initialRank)
}
return &hs
}
Expand Down Expand Up @@ -209,7 +210,7 @@ func (hs *historicStats) resetRequestPenalty(steps int, initialRank int, class p
if steps == 0 {
hs.requestGaps = make([]uint64, 0, hs.windowSize)
hs.gapSum = 0.0
return int(float64(hs.rankSum) / float64(len(hs.rankSamples)))
return int(float64(hs.rankSum) / float64(hs.rankSamples.Len()))
}

if steps > len(hs.requestGaps) {
Expand All @@ -219,7 +220,7 @@ func (hs *historicStats) resetRequestPenalty(steps int, initialRank int, class p
hs.gapSum -= 1.0 / float64(hs.requestGaps[s])
}
hs.requestGaps = hs.requestGaps[steps:]
return boundRankByClass(int(hs.computerPenalty()*(float64(hs.rankSum)/float64(len(hs.rankSamples)))), class)
return boundRankByClass(int(hs.computerPenalty()*(float64(hs.rankSum)/float64(hs.rankSamples.Len()))), class)
}

// push pushes a new rank to the historicStats, and returns the new
Expand All @@ -233,12 +234,6 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera
return value
}

// This is a moving window. Remove the least recent value once the window is full
Comment thread
gmalouf marked this conversation as resolved.
if len(hs.rankSamples) == hs.windowSize {
hs.rankSum -= uint64(hs.rankSamples[0])
hs.rankSamples = hs.rankSamples[1:]
}

initialRank := value

// Download may fail for various reasons. Give it additional tries
Expand Down Expand Up @@ -266,11 +261,17 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera
}
}

hs.rankSamples = append(hs.rankSamples, value)
hs.rankSum += uint64(value)
// This is a moving window of windowSize size so an old value is removed and a new value is added.
oldFrontElem := hs.rankSamples.Front()
oldRank := oldFrontElem.Value.(int)
// Update rankSum (remove old value, add new value)
hs.rankSum = hs.rankSum - uint64(oldRank) + uint64(value)
// Update node value and move it to the back of the list by reusing the node
oldFrontElem.Value = value
hs.rankSamples.MoveToBack(oldFrontElem)

// The average performance of the peer
average := float64(hs.rankSum) / float64(len(hs.rankSamples))
average := float64(hs.rankSum) / float64(hs.rankSamples.Len())

if int(average) > upperBound(class) && (initialRank == peerRankDownloadFailed || initialRank == peerRankNoBlockForRound) {
// peerRankDownloadFailed will be delayed, to give the peer
Expand All @@ -280,10 +281,10 @@ func (hs *historicStats) push(value int, counter uint64, class peerClass) (avera
return initialRank
}

// A penalty is added relative to how freequently the peer is used
// A penalty is added relative to how frequently the peer is used
penalty := hs.updateRequestPenalty(counter)

// The rank based on the performance and the freequency
// The rank based on the performance and the frequency
avgWithPenalty := int(penalty * average)

// Keep the peer in the same class. The value passed will be
Expand All @@ -303,7 +304,7 @@ func makeRankPooledPeerSelector(net peersRetriever, initialPeersClasses []peerCl
return selector
}

// getNextPeer returns the next peer. It randomally selects a peer from a pool that has
// getNextPeer returns the next peer. It randomly selects a peer from a pool that has
// the lowest rank value. Given that the peers are grouped by their ranks, allow us to
// prioritize peers based on their class and/or performance.
func (ps *rankPooledPeerSelector) getNextPeer() (psp *peerSelectorPeer, err error) {
Expand Down
44 changes: 24 additions & 20 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@
i++
select {
case <-ctx.Done():
s.log.Debugf("fetchAndWrite(%v): Aborted", r)
s.log.Debugf("fetchAndWrite(%d): Aborted", r)
return false
default:
}
Expand Down Expand Up @@ -309,10 +309,11 @@

psp, getPeerErr := peerSelector.getNextPeer()
if getPeerErr != nil {
s.log.Debugf("fetchAndWrite: was unable to obtain a peer to retrieve the block from")
s.log.Debugf("fetchAndWrite(%d): was unable to obtain a peer to retrieve the block from: %v", r, getPeerErr)

Check warning on line 312 in catchup/service.go

View check run for this annotation

Codecov / codecov/patch

catchup/service.go#L312

Added line #L312 was not covered by tests
return false
}
peer := psp.Peer
s.log.Debugf("fetchAndWrite(%d): got %s peer: %s", r, psp.peerClass, peerAddress(peer))

// Try to fetch, timing out after retryInterval
block, cert, blockDownloadDuration, err := s.innerFetch(ctx, r, peer)
Expand Down Expand Up @@ -343,15 +344,16 @@
peerErrors[peer]++
}
}
s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i)
peerSelector.rankPeer(psp, failureRank)
s.log.Debugf("fetchAndWrite(%d): Could not fetch: %v (attempt %d), peer %s", r, err, i, peerAddress(psp.Peer))
o, n := peerSelector.rankPeer(psp, failureRank)
s.log.Debugf("fetchAndWrite(%d): Could not fetch: ranked peer %s with %d from %d to %d", r, peerAddress(psp.Peer), failureRank, o, n)

// we've just failed to retrieve a block; wait until the previous block is fetched before trying again
// to avoid the usecase where the first block doesn't exist, and we're making many requests down the chain
// for no reason.
select {
case <-ctx.Done():
s.log.Infof("fetchAndWrite(%v): Aborted while waiting for lookback block to ledger", r)
s.log.Infof("fetchAndWrite(%d): Aborted while waiting for lookback block to ledger", r)
return false
case <-lookbackComplete:
}
Expand All @@ -360,48 +362,48 @@
// someone already wrote the block to the ledger, we should stop syncing
return false
}
s.log.Debugf("fetchAndWrite(%v): Got block and cert contents: %v %v", r, block, cert)
s.log.Debugf("fetchAndWrite(%d): Got block and cert contents: %v %v", r, block, cert)

// Check that the block's contents match the block header (necessary with an untrusted block because b.Hash() only hashes the header)
if s.cfg.CatchupVerifyPaysetHash() {
if !block.ContentsMatchHeader() {
peerSelector.rankPeer(psp, peerRankInvalidDownload)
// Check if this mismatch is due to an unsupported protocol version
if _, ok := config.Consensus[block.BlockHeader.CurrentProtocol]; !ok {
s.log.Errorf("fetchAndWrite(%v): unsupported protocol version detected: '%v'", r, block.BlockHeader.CurrentProtocol)
s.log.Errorf("fetchAndWrite(%d): unsupported protocol version detected: '%v'", r, block.BlockHeader.CurrentProtocol)

Check warning on line 373 in catchup/service.go

View check run for this annotation

Codecov / codecov/patch

catchup/service.go#L373

Added line #L373 was not covered by tests
return false
}

s.log.Warnf("fetchAndWrite(%v): block contents do not match header (attempt %d)", r, i)
s.log.Warnf("fetchAndWrite(%d): block contents do not match header (attempt %d)", r, i)

Check warning on line 377 in catchup/service.go

View check run for this annotation

Codecov / codecov/patch

catchup/service.go#L377

Added line #L377 was not covered by tests
continue // retry the fetch
}
}

// make sure that we have the lookBack block that's required for authenticating this block
select {
case <-ctx.Done():
s.log.Debugf("fetchAndWrite(%v): Aborted while waiting for lookback block to ledger", r)
s.log.Debugf("fetchAndWrite(%d): Aborted while waiting for lookback block to ledger", r)
return false
case <-lookbackComplete:
}

if s.cfg.CatchupVerifyCertificate() {
err = s.auth.Authenticate(block, cert)
if err != nil {
s.log.Warnf("fetchAndWrite(%v): cert did not authenticate block (attempt %d): %v", r, i, err)
s.log.Warnf("fetchAndWrite(%d): cert did not authenticate block (attempt %d): %v", r, i, err)
peerSelector.rankPeer(psp, peerRankInvalidDownload)
continue // retry the fetch
}
}

peerRank := peerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration)
r1, r2 := peerSelector.rankPeer(psp, peerRank)
s.log.Debugf("fetchAndWrite(%d): ranked peer with %d from %d to %d", r, peerRank, r1, r2)
s.log.Debugf("fetchAndWrite(%d): ranked peer %s with %d from %d to %d", r, peerAddress(psp.Peer), peerRank, r1, r2)

// Write to ledger, noting that ledger writes must be in order
select {
case <-ctx.Done():
s.log.Debugf("fetchAndWrite(%v): Aborted while waiting to write to ledger", r)
s.log.Debugf("fetchAndWrite(%d): Aborted while waiting to write to ledger", r)
return false
case <-prevFetchCompleteChan:
// make sure the ledger wrote enough of the account data to disk, since we don't want the ledger to hold a large amount of data in memory.
Expand Down Expand Up @@ -457,16 +459,16 @@
return false
case errors.As(err, &protocolErr):
if !s.protocolErrorLogged {
logging.Base().Errorf("fetchAndWrite(%v): unrecoverable protocol error detected: %v", r, err)
logging.Base().Errorf("fetchAndWrite(%d): unrecoverable protocol error detected: %v", r, err)

Check warning on line 462 in catchup/service.go

View check run for this annotation

Codecov / codecov/patch

catchup/service.go#L462

Added line #L462 was not covered by tests
s.protocolErrorLogged = true
}
default:
s.log.Errorf("fetchAndWrite(%v): ledger write failed: %v", r, err)
s.log.Errorf("fetchAndWrite(%d): ledger write failed: %v", r, err)

Check warning on line 466 in catchup/service.go

View check run for this annotation

Codecov / codecov/patch

catchup/service.go#L466

Added line #L466 was not covered by tests
}

return false
}
s.log.Debugf("fetchAndWrite(%v): Wrote block to ledger", r)
s.log.Debugf("fetchAndWrite(%d): Wrote block to ledger", r)
return true
}
}
Expand Down Expand Up @@ -497,7 +499,7 @@

ps := createPeerSelector(s.net)
if _, err := ps.getNextPeer(); err != nil {
s.log.Debugf("pipelinedFetch: was unable to obtain a peer to retrieve the block from")
s.log.Debugf("pipelinedFetch: was unable to obtain a peer to retrieve the block from: %v", err)
return
}

Expand Down Expand Up @@ -546,6 +548,7 @@

if !completedOK {
// there was an error; defer will cancel the pipeline
s.log.Debugf("pipelinedFetch: quitting on fetchAndWrite error (firstRound=%d, nextRound=%d)", firstRound-1, nextRound)
return
}

Expand Down Expand Up @@ -582,6 +585,7 @@
}

case <-s.ctx.Done():
s.log.Debugf("pipelinedFetch: Aborted (firstRound=%d, nextRound=%d)", firstRound, nextRound)
return
}
}
Expand Down Expand Up @@ -759,7 +763,7 @@
for s.ledger.LastRound() < cert.Round {
psp, getPeerErr := ps.getNextPeer()
if getPeerErr != nil {
s.log.Debugf("fetchRound: was unable to obtain a peer to retrieve the block from")
s.log.Debugf("fetchRound: was unable to obtain a peer to retrieve the block from: %s", getPeerErr)

Check warning on line 766 in catchup/service.go

View check run for this annotation

Codecov / codecov/patch

catchup/service.go#L766

Added line #L766 was not covered by tests
select {
case <-s.ctx.Done():
logging.Base().Debugf("fetchRound was asked to quit while collecting peers")
Expand Down Expand Up @@ -788,7 +792,7 @@
failureRank = peerRankNoBlockForRound
// If a peer does not have the block after few attempts it probably has not persisted the block yet.
// Give it some time to persist the block and try again.
// None, there is no exit condition on too many retries as per the function contract.
// Note, there is no exit condition on too many retries as per the function contract.
if count, ok := peerErrors[peer]; ok {
if count > errNoBlockForRoundThreshold {
time.Sleep(50 * time.Millisecond)
Expand All @@ -797,7 +801,7 @@
// for the low number of connected peers (like 2) the following scenario is possible:
// - both peers do not have the block
// - peer selector punishes one of the peers more than the other
// - the punished peer gets the block, and the less punished peer stucks.
// - the punished peer gets the block, and the less punished peer gets stuck.
// In this case, reset the peer selector to let it re-learn priorities.
ps = createPeerSelector(s.net)
}
Expand Down Expand Up @@ -877,7 +881,7 @@
return true
}

func createPeerSelector(net network.GossipNode) peerSelector {
func createPeerSelector(net peersRetriever) peerSelector {
wrappedPeerSelectors := []*wrappedPeerSelector{
{
peerClass: network.PeersConnectedOut,
Expand Down
15 changes: 15 additions & 0 deletions network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@
PeersPhonebookArchivalNodes PeerOption = iota
)

func (po PeerOption) String() string {
switch po {
case PeersConnectedOut:
return "ConnectedOut"
case PeersConnectedIn:
return "ConnectedIn"
case PeersPhonebookRelays:
return "PhonebookRelays"
case PeersPhonebookArchivalNodes:
return "PhonebookArchivalNodes"
default:
return "Unknown PeerOption"

Check warning on line 74 in network/gossipNode.go

View check run for this annotation

Codecov / codecov/patch

network/gossipNode.go#L63-L74

Added lines #L63 - L74 were not covered by tests
}
}

// GossipNode represents a node in the gossip network
type GossipNode interface {
Address() (string, bool)
Expand Down