diff --git a/catchup/networkFetcher.go b/catchup/networkFetcher.go deleted file mode 100644 index ef5ef111dc..0000000000 --- a/catchup/networkFetcher.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright (C) 2019-2023 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see . - -package catchup - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/algorand/go-algorand/agreement" - "github.com/algorand/go-algorand/config" - "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/go-algorand/logging" - "github.com/algorand/go-algorand/network" -) - -// NetworkFetcher is the struct used to export fetchBlock function from universalFetcher -type NetworkFetcher struct { - log logging.Logger - cfg config.Local - auth BlockAuthenticator - peerSelector *peerSelector - fetcher *universalBlockFetcher -} - -// MakeNetworkFetcher initializes a NetworkFetcher service -func MakeNetworkFetcher(log logging.Logger, net network.GossipNode, cfg config.Local, auth BlockAuthenticator, pipelineFetch bool) *NetworkFetcher { - netFetcher := &NetworkFetcher{ - log: log, - cfg: cfg, - auth: auth, - peerSelector: createPeerSelector(net, cfg, pipelineFetch), - fetcher: makeUniversalBlockFetcher(log, net, cfg), - } - return netFetcher -} - -func (netFetcher *NetworkFetcher) getHTTPPeer() (network.HTTPPeer, *peerSelectorPeer, error) { - for retryCount := 0; retryCount < netFetcher.cfg.CatchupBlockDownloadRetryAttempts; retryCount++ { - psp, err := netFetcher.peerSelector.getNextPeer() - if err != nil { - if err != errPeerSelectorNoPeerPoolsAvailable { - err = fmt.Errorf("FetchBlock: unable to obtain a list of peers to download the block from : %w", err) - return nil, nil, err - } - // this is a possible on startup, since the network package might have yet to retrieve the list of peers. - netFetcher.log.Infof("FetchBlock: unable to obtain a list of peers to download the block from; will retry shortly.") - time.Sleep(noPeersAvailableSleepInterval) - continue - } - peer := psp.Peer - httpPeer, ok := peer.(network.HTTPPeer) - if ok { - return httpPeer, psp, nil - } - netFetcher.log.Warnf("FetchBlock: non-HTTP peer was provided by the peer selector") - netFetcher.peerSelector.rankPeer(psp, peerRankInvalidDownload) - } - return nil, nil, errors.New("FetchBlock: recurring non-HTTP peer was provided by the peer selector") -} - -// FetchBlock function given a round number returns a block from a http peer -func (netFetcher *NetworkFetcher) FetchBlock(ctx context.Context, round basics.Round) (*bookkeeping.Block, - *agreement.Certificate, time.Duration, error) { - // internal retry attempt to fetch the block - for retryCount := 0; retryCount < netFetcher.cfg.CatchupBlockDownloadRetryAttempts; retryCount++ { - httpPeer, psp, err := netFetcher.getHTTPPeer() - if err != nil { - return nil, nil, time.Duration(0), err - } - - blk, cert, downloadDuration, err := netFetcher.fetcher.fetchBlock(ctx, round, httpPeer) - if err != nil { - if ctx.Err() != nil { - // caller of the function decided to cancel the download - return nil, nil, time.Duration(0), err - } - netFetcher.log.Infof("FetchBlock: failed to download block %d on attempt %d out of %d. %v", - round, retryCount+1, netFetcher.cfg.CatchupBlockDownloadRetryAttempts, err) - netFetcher.peerSelector.rankPeer(psp, peerRankDownloadFailed) - continue // retry the fetch - } - - // Check that the block's contents match the block header - if !blk.ContentsMatchHeader() && blk.Round() > 0 { - netFetcher.peerSelector.rankPeer(psp, peerRankInvalidDownload) - // Check if this mismatch is due to an unsupported protocol version - if _, ok := config.Consensus[blk.BlockHeader.CurrentProtocol]; !ok { - netFetcher.log.Errorf("FetchBlock: downloaded block(%v) unsupported protocol version detected: '%v'", - round, blk.BlockHeader.CurrentProtocol) - } - netFetcher.log.Warnf("FetchBlock: downloaded block(%v) contents do not match header", round) - netFetcher.log.Infof("FetchBlock: failed to download block %d on attempt %d out of %d. %v", - round, retryCount+1, netFetcher.cfg.CatchupBlockDownloadRetryAttempts, err) - continue // retry the fetch - } - - // Authenticate the block. for correct execution, caller should call FetchBlock only when the lookback block is available - if netFetcher.cfg.CatchupVerifyCertificate() { - err = netFetcher.auth.Authenticate(blk, cert) - if err != nil { - netFetcher.log.Warnf("FetchBlock: cert authenticatation failed for block %d on attempt %d out of %d. %v", - round, retryCount+1, netFetcher.cfg.CatchupBlockDownloadRetryAttempts, err) - netFetcher.peerSelector.rankPeer(psp, peerRankInvalidDownload) - continue // retry the fetch - } - } - - // upon successful download rank the peer according to the download speed - peerRank := netFetcher.peerSelector.peerDownloadDurationToRank(psp, downloadDuration) - netFetcher.peerSelector.rankPeer(psp, peerRank) - return blk, cert, downloadDuration, err - - } - err := fmt.Errorf("FetchBlock failed after multiple blocks download attempts: %v unsuccessful attempts", - netFetcher.cfg.CatchupBlockDownloadRetryAttempts) - return nil, nil, time.Duration(0), err -} diff --git a/catchup/networkFetcher_test.go b/catchup/networkFetcher_test.go deleted file mode 100644 index 89e12d9b34..0000000000 --- a/catchup/networkFetcher_test.go +++ /dev/null @@ -1,190 +0,0 @@ -// Copyright (C) 2019-2023 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see . - -package catchup - -import ( - "context" - "sync" - "testing" - - "github.com/algorand/go-algorand/config" - "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/go-algorand/logging" - "github.com/algorand/go-algorand/rpcs" - "github.com/algorand/go-algorand/test/partitiontest" - "github.com/stretchr/testify/require" -) - -func TestFetchBlock(t *testing.T) { - partitiontest.PartitionTest(t) - - ledger, next, b, err := buildTestLedger(t, bookkeeping.Block{}) - if err != nil { - t.Fatal(err) - return - } - - blockServiceConfig := config.GetDefaultLocal() - blockServiceConfig.EnableBlockService = true - blockServiceConfig.EnableBlockServiceFallbackToArchiver = false - - net := &httpTestPeerSource{} - ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID") - - node := basicRPCNode{} - node.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls) - node.start() - defer node.stop() - rootURL := node.rootURL() - - net.addPeer(rootURL) - - // Disable block authentication - cfg := config.GetDefaultLocal() - cfg.CatchupBlockValidateMode = 1 - fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, nil, false) - - block, _, duration, err := fetcher.FetchBlock(context.Background(), next) - - require.NoError(t, err) - require.Equal(t, &b, block) - require.GreaterOrEqual(t, int64(duration), int64(0)) - - block, cert, duration, err := fetcher.FetchBlock(context.Background(), next+1) - - require.Error(t, errNoBlockForRound, err) - require.Contains(t, err.Error(), "FetchBlock failed after multiple blocks download attempts") - require.Nil(t, block) - require.Nil(t, cert) - require.Equal(t, int64(duration), int64(0)) -} - -func TestConcurrentAttemptsToFetchBlockSuccess(t *testing.T) { - partitiontest.PartitionTest(t) - - ledger, next, b, err := buildTestLedger(t, bookkeeping.Block{}) - if err != nil { - t.Fatal(err) - return - } - - blockServiceConfig := config.GetDefaultLocal() - blockServiceConfig.EnableBlockService = true - blockServiceConfig.EnableBlockServiceFallbackToArchiver = false - - net := &httpTestPeerSource{} - ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID") - - node := basicRPCNode{} - node.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls) - node.start() - defer node.stop() - rootURL := node.rootURL() - - net.addPeer(rootURL) - - // Disable block authentication - cfg := config.GetDefaultLocal() - cfg.CatchupBlockValidateMode = 1 - fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, nil, false) - - // start is used to synchronize concurrent fetchBlock attempts - // parallelRequests represents number of concurrent attempts - start := make(chan struct{}) - parallelRequests := int(cfg.CatchupParallelBlocks) - var wg sync.WaitGroup - wg.Add(parallelRequests) - for i := 0; i < parallelRequests; i++ { - go func() { - <-start - block, _, duration, err := fetcher.FetchBlock(context.Background(), next) - require.NoError(t, err) - require.Equal(t, &b, block) - require.GreaterOrEqual(t, int64(duration), int64(0)) - wg.Done() - }() - } - close(start) - wg.Wait() -} - -func TestHTTPPeerNotAvailable(t *testing.T) { - partitiontest.PartitionTest(t) - - net := &httpTestPeerSource{} - - // Disable block authentication - cfg := config.GetDefaultLocal() - cfg.CatchupBlockValidateMode = 1 - cfg.CatchupBlockDownloadRetryAttempts = 1 - - fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, nil, false) - - _, _, _, err := fetcher.FetchBlock(context.Background(), 1) - require.Contains(t, err.Error(), "recurring non-HTTP peer was provided by the peer selector") -} - -func TestFetchBlockFailed(t *testing.T) { - partitiontest.PartitionTest(t) - - net := &httpTestPeerSource{} - wsPeer := makeTestUnicastPeer(net, t) - net.addPeer(wsPeer.GetAddress()) - - // Disable block authentication - cfg := config.GetDefaultLocal() - cfg.CatchupBlockValidateMode = 1 - cfg.CatchupBlockDownloadRetryAttempts = 1 - - fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, nil, false) - - _, _, _, err := fetcher.FetchBlock(context.Background(), 1) - require.Contains(t, err.Error(), "FetchBlock failed after multiple blocks download attempts") -} - -func TestFetchBlockAuthenticationFailed(t *testing.T) { - partitiontest.PartitionTest(t) - - ledger, next, _, err := buildTestLedger(t, bookkeeping.Block{}) - if err != nil { - t.Fatal(err) - return - } - - blockServiceConfig := config.GetDefaultLocal() - blockServiceConfig.EnableBlockService = true - blockServiceConfig.EnableBlockServiceFallbackToArchiver = false - - net := &httpTestPeerSource{} - ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID") - - node := basicRPCNode{} - node.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls) - node.start() - defer node.stop() - rootURL := node.rootURL() - - net.addPeer(rootURL) - - cfg := config.GetDefaultLocal() - cfg.CatchupBlockDownloadRetryAttempts = 1 - - fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, &mockedAuthenticator{errorRound: int(next)}, false) - - _, _, _, err = fetcher.FetchBlock(context.Background(), next) - require.Contains(t, err.Error(), "FetchBlock failed after multiple blocks download attempts") -}