diff --git a/catchup/pref_test.go b/catchup/pref_test.go index 978fae2463..8214a81646 100644 --- a/catchup/pref_test.go +++ b/catchup/pref_test.go @@ -54,11 +54,11 @@ func BenchmarkServiceFetchBlocks(b *testing.B) { require.NoError(b, err) // Make Service - syncer := MakeService(logging.Base(), defaultConfig, net, local, nil, new(mockedAuthenticator)) + syncer := MakeService(logging.Base(), defaultConfig, net, local, nil, new(mockedAuthenticator), nil) syncer.fetcherFactory = makeMockFactory(&MockedFetcher{ledger: remote, timeout: false, tries: make(map[basics.Round]int), latency: 100 * time.Millisecond, predictable: true}) b.StartTimer() - syncer.sync() + syncer.sync(nil) b.StopTimer() local.Close() require.Equal(b, remote.LastRound(), local.LastRound()) diff --git a/catchup/service.go b/catchup/service.go index 5ff44f1173..d906f6b3b1 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -18,6 +18,7 @@ package catchup import ( "context" + "fmt" "sync" "sync/atomic" "time" @@ -36,21 +37,25 @@ import ( ) const catchupPeersForSync = 10 +const blockQueryPeerLimit = 10 // this should be at least the number of relays const catchupRetryLimit = 500 +// PendingUnmatchedCertificate is a single certificate that is being waited upon to have its corresponding block fetched. +type PendingUnmatchedCertificate struct { + Cert agreement.Certificate + VoteVerifier *agreement.AsyncVoteVerifier +} + // Ledger represents the interface of a block database which the // catchup server should interact with. type Ledger interface { - NextRound() basics.Round - LastRound() basics.Round - Wait(basics.Round) chan struct{} + agreement.LedgerReader AddBlock(bookkeeping.Block, agreement.Certificate) error - ConsensusParams(basics.Round) (config.ConsensusParams, error) - + EnsureBlock(block *bookkeeping.Block, c agreement.Certificate) + LastRound() basics.Round Block(basics.Round) (bookkeeping.Block, error) - BlockCert(basics.Round) (bookkeeping.Block, agreement.Certificate, error) } // Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up to date with network. @@ -70,10 +75,13 @@ type Service struct { // The channel gets closed when the initial sync is complete. This allows for other services to avoid // the overhead of starting prematurely (before this node is caught-up and can validate messages for example). - InitialSyncDone chan struct{} - initialSyncNotified uint32 - protocolErrorLogged bool - lastSupportedRound basics.Round + InitialSyncDone chan struct{} + initialSyncNotified uint32 + protocolErrorLogged bool + lastSupportedRound basics.Round + unmatchedPendingCertificates <-chan PendingUnmatchedCertificate + + latestRoundFetcherFactory rpcs.FetcherFactory } // A BlockAuthenticator authenticates blocks given a certificate. @@ -90,7 +98,7 @@ type BlockAuthenticator interface { // MakeService creates a catchup service instance from its constituent components // If wsf is nil, then fetch over gossip is disabled. -func MakeService(log logging.Logger, config config.Local, net network.GossipNode, ledger Ledger, wsf *rpcs.WsFetcherService, auth BlockAuthenticator) (s *Service) { +func MakeService(log logging.Logger, config config.Local, net network.GossipNode, ledger Ledger, wsf *rpcs.WsFetcherService, auth BlockAuthenticator, unmatchedPendingCertificates <-chan PendingUnmatchedCertificate) (s *Service) { s = &Service{} s.ctx, s.cancel = context.WithCancel(context.Background()) s.cfg = config @@ -98,6 +106,9 @@ func MakeService(log logging.Logger, config config.Local, net network.GossipNode s.ledger = ledger s.net = net s.auth = auth + s.unmatchedPendingCertificates = unmatchedPendingCertificates + + s.latestRoundFetcherFactory = rpcs.MakeNetworkFetcherFactory(net, blockQueryPeerLimit, wsf) s.log = log.With("Context", "sync") s.InitialSyncDone = make(chan struct{}) @@ -408,7 +419,7 @@ func (s *Service) periodicSync() { case <-s.ctx.Done(): return } - s.sync() + s.sync(nil) stuckInARow := 0 sleepDuration := s.deadlineTimeout for { @@ -429,7 +440,10 @@ func (s *Service) periodicSync() { continue } s.log.Info("It's been too long since our ledger advanced; resyncing") - s.sync() + s.sync(nil) + case cert := <-s.unmatchedPendingCertificates: + // the agreement service has a valid certificate for a block, but not the block itself. + s.sync(&cert) } if currBlock == s.ledger.LastRound() { @@ -446,8 +460,9 @@ func (s *Service) periodicSync() { } // Syncs the client with the network. sync asks the network for last known block and tries to sync the system -// up the to the highest number it gets -func (s *Service) sync() { +// up the to the highest number it gets. When a certificate is provided, the sync function attempts to keep trying +// to fetch the matching block or abort when the catchup service exits. +func (s *Service) sync(cert *PendingUnmatchedCertificate) { // Only run sync once at a time // Store start time of sync - in NS so we can compute time.Duration (which is based on NS) start := time.Now() @@ -464,14 +479,19 @@ func (s *Service) sync() { StartRound: uint64(pr), }) - seedLookback := uint64(2) - proto, err := s.ledger.ConsensusParams(pr) - if err != nil { - s.log.Errorf("catchup: could not get consensus parameters for round %v: $%v", pr, err) + if cert == nil { + seedLookback := uint64(2) + proto, err := s.ledger.ConsensusParams(pr) + if err != nil { + s.log.Errorf("catchup: could not get consensus parameters for round %v: $%v", pr, err) + } else { + seedLookback = proto.SeedLookback + } + s.pipelinedFetch(seedLookback) } else { - seedLookback = proto.SeedLookback + // we want to fetch a single round. no need to be concerned about lookback. + s.fetchRound(cert.Cert, cert.VoteVerifier) } - s.pipelinedFetch(seedLookback) initSync := false @@ -492,6 +512,65 @@ func (s *Service) sync() { s.log.Infof("Catchup Service: finished catching up, now at round %v (previously %v). Total time catching up %v.", s.ledger.LastRound(), pr, elapsedTime) } +// TODO this doesn't actually use the digest from cert! +func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.AsyncVoteVerifier) { + blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest + fetcher := s.latestRoundFetcherFactory.NewOverGossip(protocol.UniEnsBlockReqTag) + defer func() { + fetcher.Close() + }() + for s.ledger.LastRound() < cert.Round { + if fetcher.OutOfPeers(cert.Round) { + fetcher.Close() + // refresh peers and try again + logging.Base().Warn("fetchRound found no outgoing peers") + s.net.RequestConnectOutgoing(true, s.ctx.Done()) + fetcher = s.latestRoundFetcherFactory.NewOverGossip(protocol.UniEnsBlockReqTag) + } + // Ask the fetcher to get the block somehow + block, fetchedCert, rpcc, err := s.innerFetch(fetcher, cert.Round) + + if err != nil { + select { + case <-s.ctx.Done(): + logging.Base().Debugf("fetchRound was asked to quit before we could acquire the block") + return + default: + } + logging.Base().Warnf("fetchRound could not acquire block, fetcher errored out: %v", err) + continue + } + rpcc.Close() + + if block.Hash() == blockHash && block.ContentsMatchHeader() { + s.ledger.EnsureBlock(block, cert) + return + } + // Otherwise, fetcher gave us the wrong block + logging.Base().Warnf("fetcher gave us bad/wrong block (for round %d): fetched hash %v; want hash %v", cert.Round, block.Hash(), blockHash) + + // As a failsafe, if the cert we fetched is valid but for the wrong block, panic as loudly as possible + if cert.Round == fetchedCert.Round && + cert.Proposal.BlockDigest != fetchedCert.Proposal.BlockDigest && + fetchedCert.Authenticate(*block, s.ledger, verifier) == nil { + s := "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" + s += "!!!!!!!!!! FORK DETECTED !!!!!!!!!!!\n" + s += "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" + s += "fetchRound called with a cert authenticating block with hash %v.\n" + s += "We fetched a valid cert authenticating a different block, %v. This indicates a fork.\n\n" + s += "Cert from our agreement service:\n%#v\n\n" + s += "Cert from the fetcher:\n%#v\n\n" + s += "Block from the fetcher:\n%#v\n\n" + s += "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" + s += "!!!!!!!!!! FORK DETECTED !!!!!!!!!!!\n" + s += "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" + s = fmt.Sprintf(s, cert.Proposal.BlockDigest, fetchedCert.Proposal.BlockDigest, cert, fetchedCert, block) + fmt.Println(s) + logging.Base().Error(s) + } + } +} + // nextRoundIsNotSupported returns true if the next round upgrades to a protocol version // which is not supported. // In case of an error, it returns false diff --git a/catchup/service_test.go b/catchup/service_test.go index 62db567632..d12d313550 100644 --- a/catchup/service_test.go +++ b/catchup/service_test.go @@ -30,8 +30,10 @@ import ( "github.com/algorand/go-algorand/agreement" "github.com/algorand/go-algorand/components/mocks" "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/data/committee" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/rpcs" @@ -115,14 +117,15 @@ func (m *MockedFetcher) FetchBlock(ctx context.Context, round basics.Round) (*bo // Add random delay to get it out of sync time.Sleep(time.Duration(rand.Int()%50) * time.Millisecond) } - - block, cert, err := m.ledger.BlockCert(round) + block, err := m.ledger.Block(round) if round > m.ledger.LastRound() { return nil, nil, nil, errors.New("no block") } else if err != nil { panic(err) } + var cert agreement.Certificate + cert.Proposal.BlockDigest = block.Digest() return &block, &cert, &m.client, nil } @@ -182,10 +185,10 @@ func TestServiceFetchBlocksSameRange(t *testing.T) { net := &mocks.MockNetwork{} // Make Service - syncer := MakeService(logging.Base(), defaultConfig, net, local, nil, &mockedAuthenticator{errorRound: -1}) + syncer := MakeService(logging.Base(), defaultConfig, net, local, nil, &mockedAuthenticator{errorRound: -1}, nil) syncer.fetcherFactory = makeMockFactory(&MockedFetcher{ledger: remote, timeout: false, tries: make(map[basics.Round]int)}) - syncer.sync() + syncer.sync(nil) require.Equal(t, remote.LastRound(), local.LastRound()) } @@ -197,7 +200,7 @@ func TestPeriodicSync(t *testing.T) { initialLocalRound := local.LastRound() // Make Service - s := MakeService(logging.Base(), defaultConfig, &mocks.MockNetwork{}, local, nil, auth) + s := MakeService(logging.Base(), defaultConfig, &mocks.MockNetwork{}, local, nil, auth, nil) s.deadlineTimeout = 2 * time.Second factory := MockedFetcherFactory{fetcher: &MockedFetcher{ledger: remote, timeout: false, tries: make(map[basics.Round]int)}} @@ -232,7 +235,7 @@ func TestServiceFetchBlocksOneBlock(t *testing.T) { net := &mocks.MockNetwork{} // Make Service - s := MakeService(logging.Base(), defaultConfig, net, local, nil, &mockedAuthenticator{errorRound: -1}) + s := MakeService(logging.Base(), defaultConfig, net, local, nil, &mockedAuthenticator{errorRound: -1}, nil) factory := MockedFetcherFactory{fetcher: &MockedFetcher{ledger: remote, timeout: false, tries: make(map[basics.Round]int)}} s.fetcherFactory = &factory @@ -240,7 +243,7 @@ func TestServiceFetchBlocksOneBlock(t *testing.T) { require.False(t, factory.fetcher.client.closed) // Fetch blocks - s.sync() + s.sync(nil) // Asserts that the last block is the one we expect require.Equal(t, lastRoundAtStart+basics.Round(numBlocks), local.LastRound()) @@ -270,7 +273,7 @@ func TestAbruptWrites(t *testing.T) { lastRound := local.LastRound() // Make Service - s := MakeService(logging.Base(), defaultConfig, &mocks.MockNetwork{}, local, nil, &mockedAuthenticator{errorRound: -1}) + s := MakeService(logging.Base(), defaultConfig, &mocks.MockNetwork{}, local, nil, &mockedAuthenticator{errorRound: -1}, nil) factory := MockedFetcherFactory{fetcher: &MockedFetcher{ledger: remote, timeout: false, tries: make(map[basics.Round]int)}} s.fetcherFactory = &factory @@ -281,14 +284,16 @@ func TestAbruptWrites(t *testing.T) { defer wg.Done() for i := basics.Round(lastRound + 1); i <= basics.Round(numberOfBlocks); i++ { time.Sleep(time.Duration(rand.Uint32()%5) * time.Millisecond) - blk, cert, err := remote.BlockCert(i) + blk, err := remote.Block(i) require.NoError(t, err) + var cert agreement.Certificate + cert.Proposal.BlockDigest = blk.Digest() err = local.AddBlock(blk, cert) require.NoError(t, err) } }() - s.sync() + s.sync(nil) require.Equal(t, remote.LastRound(), local.LastRound()) } @@ -302,11 +307,11 @@ func TestServiceFetchBlocksMultiBlocks(t *testing.T) { lastRoundAtStart := local.LastRound() // Make Service - syncer := MakeService(logging.Base(), defaultConfig, &mocks.MockNetwork{}, local, nil, &mockedAuthenticator{errorRound: -1}) + syncer := MakeService(logging.Base(), defaultConfig, &mocks.MockNetwork{}, local, nil, &mockedAuthenticator{errorRound: -1}, nil) syncer.fetcherFactory = &MockedFetcherFactory{fetcher: &MockedFetcher{ledger: remote, timeout: false, tries: make(map[basics.Round]int)}} // Fetch blocks - syncer.sync() + syncer.sync(nil) // Asserts that the last block is the one we expect require.Equal(t, lastRoundAtStart+numberOfBlocks, local.LastRound()) @@ -331,10 +336,10 @@ func TestServiceFetchBlocksMalformed(t *testing.T) { lastRoundAtStart := local.LastRound() // Make Service - s := MakeService(logging.Base(), defaultConfig, &mocks.MockNetwork{}, local, nil, &mockedAuthenticator{errorRound: int(lastRoundAtStart + 1)}) + s := MakeService(logging.Base(), defaultConfig, &mocks.MockNetwork{}, local, nil, &mockedAuthenticator{errorRound: int(lastRoundAtStart + 1)}, nil) s.fetcherFactory = &MockedFetcherFactory{fetcher: &MockedFetcher{ledger: remote, timeout: false, tries: make(map[basics.Round]int)}} - s.sync() + s.sync(nil) require.Equal(t, lastRoundAtStart, local.LastRound()) require.True(t, s.fetcherFactory.(*MockedFetcherFactory).fetcher.client.closed) } @@ -446,7 +451,7 @@ func helperTestOnSwitchToUnSupportedProtocol( remote = Ledger(mRemote) // Make Service - s := MakeService(logging.Base(), defaultConfig, &mocks.MockNetwork{}, local, nil, &mockedAuthenticator{errorRound: -1}) + s := MakeService(logging.Base(), defaultConfig, &mocks.MockNetwork{}, local, nil, &mockedAuthenticator{errorRound: -1}, nil) s.deadlineTimeout = 2 * time.Second s.fetcherFactory = &MockedFetcherFactory{fetcher: &MockedFetcher{ledger: remote, timeout: false, tries: make(map[basics.Round]int)}} @@ -529,15 +534,6 @@ func (m *mockedLedger) Wait(r basics.Round) chan struct{} { return m.chans[r] } -func (m *mockedLedger) BlockCert(r basics.Round) (bookkeeping.Block, agreement.Certificate, error) { - m.mu.Lock() - defer m.mu.Unlock() - if r > m.lastRound() { - return bookkeeping.Block{}, agreement.Certificate{}, errors.New("mockedLedger.BlockCert: round too high") - } - return m.blocks[r], agreement.Certificate{}, nil -} - func (m *mockedLedger) Block(r basics.Round) (bookkeeping.Block, error) { m.mu.Lock() defer m.mu.Unlock() @@ -547,6 +543,26 @@ func (m *mockedLedger) Block(r basics.Round) (bookkeeping.Block, error) { return m.blocks[r], nil } +func (m *mockedLedger) BalanceRecord(basics.Round, basics.Address) (basics.BalanceRecord, error) { + return basics.BalanceRecord{}, errors.New("not needed for mockedLedger") +} +func (m *mockedLedger) Circulation(basics.Round) (basics.MicroAlgos, error) { + return basics.MicroAlgos{}, errors.New("not needed for mockedLedger") +} +func (m *mockedLedger) ConsensusVersion(basics.Round) (protocol.ConsensusVersion, error) { + return protocol.ConsensusCurrentVersion, nil +} +func (m *mockedLedger) EnsureBlock(block *bookkeeping.Block, c agreement.Certificate) { + m.AddBlock(*block, c) +} +func (m *mockedLedger) Seed(basics.Round) (committee.Seed, error) { + return committee.Seed{}, errors.New("not needed for mockedLedger") +} + +func (m *mockedLedger) LookupDigest(basics.Round) (crypto.Digest, error) { + return crypto.Digest{}, errors.New("not needed for mockedLedger") +} + func testingenv(t testing.TB, numBlocks int) (ledger, emptyLedger Ledger) { mLedger := new(mockedLedger) mEmptyLedger := new(mockedLedger) @@ -595,3 +611,34 @@ func testingenvWithUpgrade( return mLedger, mEmptyLedger } + +type MockVoteVerifier struct{} + +func (avv *MockVoteVerifier) Quit() { +} +func (avv *MockVoteVerifier) Parallelism() int { + return 1 +} + +func TestCatchupUnmatchedCertificate(t *testing.T) { + // Make Ledger + remote, local := testingenv(t, 10) + + lastRoundAtStart := local.LastRound() + + // Make Service + s := MakeService(logging.Base(), defaultConfig, &mocks.MockNetwork{}, local, nil, &mockedAuthenticator{errorRound: int(lastRoundAtStart + 1)}, nil) + s.latestRoundFetcherFactory = &MockedFetcherFactory{fetcher: &MockedFetcher{ledger: remote, timeout: false, tries: make(map[basics.Round]int)}} + for roundNumber := 2; roundNumber < 10; roundNumber += 3 { + pc := &PendingUnmatchedCertificate{ + Cert: agreement.Certificate{ + Round: basics.Round(roundNumber), + }, + VoteVerifier: agreement.MakeAsyncVoteVerifier(nil), + } + block, _ := remote.Block(basics.Round(roundNumber)) + pc.Cert.Proposal.BlockDigest = block.Digest() + s.sync(pc) + require.True(t, s.latestRoundFetcherFactory.(*MockedFetcherFactory).fetcher.client.closed) + } +} diff --git a/node/impls.go b/node/impls.go index 136c5d1068..92176bf68c 100644 --- a/node/impls.go +++ b/node/impls.go @@ -22,6 +22,7 @@ import ( "time" "github.com/algorand/go-algorand/agreement" + "github.com/algorand/go-algorand/catchup" "github.com/algorand/go-algorand/data" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" @@ -30,16 +31,11 @@ import ( "github.com/algorand/go-algorand/ledger" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/logging/telemetryspec" - "github.com/algorand/go-algorand/network" - "github.com/algorand/go-algorand/protocol" - "github.com/algorand/go-algorand/rpcs" "github.com/algorand/go-algorand/util/execpool" ) // TODO these implementations should be pushed down into the corresponding structs or alternatively turned into new structs in the correct subpackages -const blockQueryPeerLimit = 10 - type blockAuthenticatorImpl struct { *data.Ledger *agreement.AsyncVoteVerifier @@ -145,9 +141,14 @@ func (vb validatedBlock) Block() bookkeeping.Block { // agreementLedger implements the agreement.Ledger interface. type agreementLedger struct { *data.Ledger + UnmatchedPendingCertificates chan catchup.PendingUnmatchedCertificate +} - ff rpcs.FetcherFactory - n network.GossipNode +func makeAgreementLedger(ledger *data.Ledger) agreementLedger { + return agreementLedger{ + Ledger: ledger, + UnmatchedPendingCertificates: make(chan catchup.PendingUnmatchedCertificate, 1), + } } // EnsureBlock implements agreement.LedgerWriter.EnsureBlock. @@ -161,108 +162,44 @@ func (l agreementLedger) EnsureValidatedBlock(ve agreement.ValidatedBlock, c agr } // EnsureDigest implements agreement.LedgerWriter.EnsureDigest. -// TODO: Get rid of EnsureDigest -- instead the ledger should expose what blocks it's waiting on, and a separate service should fetch them and call EnsureBlock -// should "retry until cert matches" logic live here or in the abstract fetcher? func (l agreementLedger) EnsureDigest(cert agreement.Certificate, quit chan struct{}, verifier *agreement.AsyncVoteVerifier) { - round := cert.Round - blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest - logging.Base().Debug("consensus was reached on a block we don't have yet: ", blockHash) - for { - // Ask the fetcher to get the block somehow - block, fetchedCert, err := l.FetchBlockByDigest(round, quit) - if err != nil { - select { - case <-quit: - logging.Base().Debugf("EnsureDigest was asked to quit before we could acquire the block") - return - default: - } - logging.Base().Panicf("EnsureDigest could not acquire block, fetcher errored out: %v", err) - } - - if block.Hash() == blockHash && block.ContentsMatchHeader() { - l.EnsureBlock(block, cert) - return - } - // Otherwise, fetcher gave us the wrong block - logging.Base().Warnf("fetcher gave us bad/wrong block (for round %d): fetched hash %v; want hash %v", round, block.Hash(), blockHash) - - // As a failsafe, if the cert we fetched is valid but for the wrong block, panic as loudly as possible - if cert.Round == fetchedCert.Round && - cert.Proposal.BlockDigest != fetchedCert.Proposal.BlockDigest && - fetchedCert.Authenticate(block, l, verifier) == nil { - s := "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" - s += "!!!!!!!!!! FORK DETECTED !!!!!!!!!!!\n" - s += "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" - s += "EnsureDigest called with a cert authenticating block with hash %v.\n" - s += "We fetched a valid cert authenticating a different block, %v. This indicates a fork.\n\n" - s += "Cert from our agreement service:\n%#v\n\n" - s += "Cert from the fetcher:\n%#v\n\n" - s += "Block from the fetcher:\n%#v\n\n" - s += "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" - s += "!!!!!!!!!! FORK DETECTED !!!!!!!!!!!\n" - s += "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" - s = fmt.Sprintf(s, cert.Proposal.BlockDigest, fetchedCert.Proposal.BlockDigest, cert, fetchedCert, block) - fmt.Println(s) - logging.Base().Error(s) - } + certRoundReachedCh := l.Wait(cert.Round) + // clear out the pending certificates ( if any ) + select { + case pendingCert := <-l.UnmatchedPendingCertificates: + logging.Base().Debugf("agreementLedger.EnsureDigest has flushed out pending request for certificate for round %d in favor of recent certificate for round %d", pendingCert.Cert.Round, cert.Round) + default: } -} -func (l agreementLedger) innerFetch(fetcher rpcs.Fetcher, round basics.Round, quit chan struct{}) (*bookkeeping.Block, *agreement.Certificate, error) { - ctx, cancel := context.WithTimeout(context.Background(), rpcs.DefaultFetchTimeout) - defer cancel() - type fbreturn struct { - block *bookkeeping.Block - cert *agreement.Certificate - err error - } - localdone := make(chan fbreturn, 1) - go func() { - block, cert, _, err := fetcher.FetchBlock(ctx, round) - localdone <- fbreturn{block, cert, err} - }() + // if the quit channel is closed, we want to exit here before placing the request on the UnmatchedPendingCertificates + // channel. select { - case ret := <-localdone: - return ret.block, ret.cert, ret.err case <-quit: - return nil, nil, nil - case <-l.Wait(round): - return nil, nil, nil + logging.Base().Debugf("EnsureDigest was asked to quit before we enqueue the certificate request") + return + default: } -} -// FetchBlockByDigest is a helper for EnsureDigest. -// TODO This is a kludge. Instead we should have a service that sees what the ledger is waiting on, fetches it, and calls EnsureBlock on it. -// TODO this doesn't actually use the digest from cert! -func (l agreementLedger) FetchBlockByDigest(round basics.Round, quit chan struct{}) (bookkeeping.Block, agreement.Certificate, error) { - fetcher := l.ff.NewOverGossip(protocol.UniEnsBlockReqTag) + // The channel send to UnmatchedPendingCertificates is guaranteed to be non-blocking since due to the fact that - + // 1. the channel capacity is 1 + // 2. we just cleared a single item off this channel ( if there was any ) + // 3. the EnsureDigest method is being called with the agreeement service guarantee + // 4. no other senders to this channel exists + // we want to have this as a select statement to check if we neeed to exit before enqueueing the task to the catchup service. + l.UnmatchedPendingCertificates <- catchup.PendingUnmatchedCertificate{Cert: cert, VoteVerifier: verifier} + defer func() { - fetcher.Close() - }() - for { - if fetcher.OutOfPeers(round) { - fetcher.Close() - // refresh peers and try again - logging.Base().Warn("fetchBlockByDigest found no outgoing peers") - l.n.RequestConnectOutgoing(true, quit) - fetcher = l.ff.NewOverGossip(protocol.UniEnsBlockReqTag) - } - block, cert, err := l.innerFetch(fetcher, round, quit) - if err == nil { - if block == nil || cert == nil { - // nil error, nil block = async write - logging.Base().Debugf("async write of block from round %v to ledger (or quit)", round) - return l.BlockCert(round) // err is nil because ledger.Wait returned - } - return *block, *cert, nil - } + // clear out the content of the UnmatchedPendingCertificates channel if we somehow managed to get this round aquired by a different method ( i.e. regular catchup ) select { - case <-quit: - return bookkeeping.Block{}, agreement.Certificate{}, fmt.Errorf("asked to abort") + case <-l.UnmatchedPendingCertificates: default: - logging.Base().Debugf("error fetching block (%v), trying again", err) - // todo: consider rate-limiting here if a node is completely offline. } + }() + + select { + case <-quit: + logging.Base().Debugf("EnsureDigest was asked to quit before we could acquire the block") + case <-certRoundReachedCh: + // great! we've reached the desired round. } } diff --git a/node/node.go b/node/node.go index 083c9fe6ca..8b7bbd2a20 100644 --- a/node/node.go +++ b/node/node.go @@ -230,7 +230,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookDir blockFactory := makeBlockFactory(node.ledger, node.transactionPool, node.config.EnableProcessBlockStats, node.highPriorityCryptoVerificationPool) blockValidator := blockValidatorImpl{l: node.ledger, tp: node.transactionPool, verificationPool: node.highPriorityCryptoVerificationPool} - agreementLedger := agreementLedger{Ledger: node.ledger, ff: rpcs.MakeNetworkFetcherFactory(node.net, blockQueryPeerLimit, node.wsFetcherService), n: node.net} + agreementLedger := makeAgreementLedger(node.ledger) agreementParameters := agreement.Parameters{ Logger: log, @@ -247,7 +247,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookDir } node.algorandService = agreement.MakeService(agreementParameters) - node.syncer = catchup.MakeService(node.log, node.config, p2pNode, node.ledger, node.wsFetcherService, blockAuthenticatorImpl{Ledger: node.ledger, AsyncVoteVerifier: agreement.MakeAsyncVoteVerifier(node.lowPriorityCryptoVerificationPool)}) + node.syncer = catchup.MakeService(node.log, node.config, p2pNode, node.ledger, node.wsFetcherService, blockAuthenticatorImpl{Ledger: node.ledger, AsyncVoteVerifier: agreement.MakeAsyncVoteVerifier(node.lowPriorityCryptoVerificationPool)}, agreementLedger.UnmatchedPendingCertificates) node.txPoolSyncer = rpcs.MakeTxSyncer(node.transactionPool, node.net, node.txHandler.SolicitedTxHandler(), time.Duration(cfg.TxSyncIntervalSeconds)*time.Second, time.Duration(cfg.TxSyncTimeoutSeconds)*time.Second, cfg.TxSyncServeResponseSize) err = node.loadParticipationKeys() diff --git a/test/e2e-go/features/catchup/basicCatchup_test.go b/test/e2e-go/features/catchup/basicCatchup_test.go index fb7575eec5..bc81235cd3 100644 --- a/test/e2e-go/features/catchup/basicCatchup_test.go +++ b/test/e2e-go/features/catchup/basicCatchup_test.go @@ -103,7 +103,7 @@ func TestCatchupOverGossip(t *testing.T) { // Let the network make some progress - waitForRound := uint64(5) + waitForRound := uint64(3) err = fixture.ClientWaitForRoundWithTimeout(fixture.GetAlgodClientForController(nc), waitForRound) a.NoError(err) @@ -123,6 +123,22 @@ func TestCatchupOverGossip(t *testing.T) { // Now, catch up err = fixture.LibGoalFixture.ClientWaitForRoundWithTimeout(lg, waitForRound) a.NoError(err) + + // wait until the round number on the secondary node matches the round number on the primary node. + for { + nodeLibGoalClient := fixture.LibGoalFixture.GetLibGoalClientFromDataDir(nc.GetDataDir()) + nodeStatus, err := nodeLibGoalClient.Status() + a.NoError(err) + + primaryStatus, err := lg.Status() + a.NoError(err) + a.True(nodeStatus.LastRound >= primaryStatus.LastRound) + if nodeStatus.LastRound == primaryStatus.LastRound && waitForRound < nodeStatus.LastRound { + //t.Logf("Both nodes reached round %d\n", primaryStatus.LastRound) + break + } + time.Sleep(50 * time.Millisecond) + } } func TestStoppedCatchupOnUnsupported(t *testing.T) { @@ -132,7 +148,7 @@ func TestStoppedCatchupOnUnsupported(t *testing.T) { t.Parallel() a := require.New(t) - defer os.Unsetenv("ALGORAND_TEST_UNUPGRADEDPROTOCOL_DELETE_UPGRADE") + defer os.Unsetenv("ALGORAND_TEST_UNUPGRADEDPROTOCOL_DELETE_UPGRADE") os.Setenv("ALGORAND_TEST_UNUPGRADEDPROTOCOL_DELETE_UPGRADE", "0") // Overview of this test: @@ -166,7 +182,7 @@ func TestStoppedCatchupOnUnsupported(t *testing.T) { a.NoError(err) cloneClient, err := fixture.StartNode(cloneDataDir) a.NoError(err) - defer shutdownClonedNode(cloneDataDir, &fixture, t) + defer shutdownClonedNode(cloneDataDir, &fixture, t) // Now, catch up err = fixture.LibGoalFixture.ClientWaitForRoundWithTimeout(cloneClient, waitForRound) @@ -219,8 +235,8 @@ func TestStoppedCatchupOnUnsupported(t *testing.T) { } // shutdownClonedNode replicates the behavior of fixture.Shutdown() for network nodes on cloned node -// It deletes the directory if the test passes, otherwise it preserves it -func shutdownClonedNode(nodeDataDir string, f * fixtures.RestClientFixture, t *testing.T) { +// It deletes the directory if the test passes, otherwise it preserves it +func shutdownClonedNode(nodeDataDir string, f *fixtures.RestClientFixture, t *testing.T) { nc := f.LibGoalFixture.GetNodeControllerForDataDir(nodeDataDir) nc.FullStop() if !t.Failed() {