Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions catchup/pref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ func BenchmarkServiceFetchBlocks(b *testing.B) {
require.NoError(b, err)

// Make Service
syncer := MakeService(logging.Base(), defaultConfig, net, local, nil, new(mockedAuthenticator))
syncer := MakeService(logging.Base(), defaultConfig, net, local, nil, new(mockedAuthenticator), nil)
syncer.fetcherFactory = makeMockFactory(&MockedFetcher{ledger: remote, timeout: false, tries: make(map[basics.Round]int), latency: 100 * time.Millisecond, predictable: true})

b.StartTimer()
syncer.sync()
syncer.sync(nil)
b.StopTimer()
local.Close()
require.Equal(b, remote.LastRound(), local.LastRound())
Expand Down
121 changes: 100 additions & 21 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package catchup

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
Expand All @@ -36,21 +37,25 @@ import (
)

// catchupPeersForSync is the value passed to rpcs.MakeNetworkFetcherFactory when MakeService builds
// s.fetcherFactory. NOTE(review): presumably a cap on the number of peers used - confirm against rpcs.
const catchupPeersForSync = 10
// blockQueryPeerLimit is the value passed to rpcs.MakeNetworkFetcherFactory when MakeService builds
// s.latestRoundFetcherFactory, the factory fetchRound uses to fetch a single round.
// NOTE(review): presumably a cap on the number of peers used - confirm against rpcs.
const blockQueryPeerLimit = 10

// this should be at least the number of relays
const catchupRetryLimit = 500

// PendingUnmatchedCertificate is a single certificate that is being waited upon to have its corresponding block fetched.
// The catchup Service receives values of this type on its unmatchedPendingCertificates channel and
// hands them to sync, which fetches the single matching round.
type PendingUnmatchedCertificate struct {
// Cert is the certificate whose block is still missing; fetchRound uses its Round and
// Proposal.BlockDigest to decide which block to fetch and whether a fetched block matches.
Cert agreement.Certificate
// VoteVerifier is passed to the Authenticate call that fetchRound makes on a fetched certificate
// when the fetched block does not match Cert.
VoteVerifier *agreement.AsyncVoteVerifier
}

// Ledger represents the interface of a block database which the
// catchup server should interact with.
type Ledger interface {
NextRound() basics.Round
LastRound() basics.Round
Wait(basics.Round) chan struct{}
agreement.LedgerReader
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need the entire LedgerReader here as a dependency? (Or just Wait?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's needed for the Certificate.Authenticate() call; I'm not a big fan of having yet another Ledger interface.
I'd rather have an instance of data/Ledger directly, but I don't want to turn this into a bigger change in the context of this PR. (I guess it used to be important for preventing circular references?)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the point of this interface was to decouple the Ledger-related requirements of the catchup package from the rest of the codebase, and importing agreement.LedgerReader seems to work against that objective. I don't have strong opinions here, though.

AddBlock(bookkeeping.Block, agreement.Certificate) error
ConsensusParams(basics.Round) (config.ConsensusParams, error)

EnsureBlock(block *bookkeeping.Block, c agreement.Certificate)
LastRound() basics.Round
Block(basics.Round) (bookkeeping.Block, error)
BlockCert(basics.Round) (bookkeeping.Block, agreement.Certificate, error)
}

// Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up to date with network.
Expand All @@ -70,10 +75,13 @@ type Service struct {

// The channel gets closed when the initial sync is complete. This allows for other services to avoid
// the overhead of starting prematurely (before this node is caught-up and can validate messages for example).
InitialSyncDone chan struct{}
initialSyncNotified uint32
protocolErrorLogged bool
lastSupportedRound basics.Round
InitialSyncDone chan struct{}
initialSyncNotified uint32
protocolErrorLogged bool
lastSupportedRound basics.Round
unmatchedPendingCertificates <-chan PendingUnmatchedCertificate

latestRoundFetcherFactory rpcs.FetcherFactory
}

// A BlockAuthenticator authenticates blocks given a certificate.
Expand All @@ -90,14 +98,17 @@ type BlockAuthenticator interface {

// MakeService creates a catchup service instance from its constituent components
// If wsf is nil, then fetch over gossip is disabled.
func MakeService(log logging.Logger, config config.Local, net network.GossipNode, ledger Ledger, wsf *rpcs.WsFetcherService, auth BlockAuthenticator) (s *Service) {
func MakeService(log logging.Logger, config config.Local, net network.GossipNode, ledger Ledger, wsf *rpcs.WsFetcherService, auth BlockAuthenticator, unmatchedPendingCertificates <-chan PendingUnmatchedCertificate) (s *Service) {
s = &Service{}
s.ctx, s.cancel = context.WithCancel(context.Background())
s.cfg = config
s.fetcherFactory = rpcs.MakeNetworkFetcherFactory(net, catchupPeersForSync, wsf)
s.ledger = ledger
s.net = net
s.auth = auth
s.unmatchedPendingCertificates = unmatchedPendingCertificates

s.latestRoundFetcherFactory = rpcs.MakeNetworkFetcherFactory(net, blockQueryPeerLimit, wsf)
Comment thread
algonautshant marked this conversation as resolved.

s.log = log.With("Context", "sync")
s.InitialSyncDone = make(chan struct{})
Expand Down Expand Up @@ -408,7 +419,7 @@ func (s *Service) periodicSync() {
case <-s.ctx.Done():
return
}
s.sync()
s.sync(nil)
stuckInARow := 0
sleepDuration := s.deadlineTimeout
for {
Expand All @@ -429,7 +440,10 @@ func (s *Service) periodicSync() {
continue
}
s.log.Info("It's been too long since our ledger advanced; resyncing")
s.sync()
s.sync(nil)
case cert := <-s.unmatchedPendingCertificates:
// the agreement service has a valid certificate for a block, but not the block itself.
s.sync(&cert)
}

if currBlock == s.ledger.LastRound() {
Expand All @@ -446,8 +460,9 @@ func (s *Service) periodicSync() {
}

// Syncs the client with the network. sync asks the network for last known block and tries to sync the system
// up to the highest number it gets
func (s *Service) sync() {
// up to the highest number it gets. When a certificate is provided, the sync function keeps trying
// to fetch the matching block until it succeeds or the catchup service exits.
func (s *Service) sync(cert *PendingUnmatchedCertificate) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment to describe the purpose of cert, and that it can be nil.

// Only run sync once at a time
// Store start time of sync - in NS so we can compute time.Duration (which is based on NS)
start := time.Now()
Expand All @@ -464,14 +479,19 @@ func (s *Service) sync() {
StartRound: uint64(pr),
})

seedLookback := uint64(2)
proto, err := s.ledger.ConsensusParams(pr)
if err != nil {
s.log.Errorf("catchup: could not get consensus parameters for round %v: $%v", pr, err)
if cert == nil {
seedLookback := uint64(2)
proto, err := s.ledger.ConsensusParams(pr)
if err != nil {
s.log.Errorf("catchup: could not get consensus parameters for round %v: $%v", pr, err)
} else {
seedLookback = proto.SeedLookback
}
s.pipelinedFetch(seedLookback)
} else {
seedLookback = proto.SeedLookback
// we want to fetch a single round. no need to be concerned about lookback.
s.fetchRound(cert.Cert, cert.VoteVerifier)
}
s.pipelinedFetch(seedLookback)

initSync := false

Expand All @@ -492,6 +512,65 @@ func (s *Service) sync() {
s.log.Infof("Catchup Service: finished catching up, now at round %v (previously %v). Total time catching up %v.", s.ledger.LastRound(), pr, elapsedTime)
}

// TODO this doesn't actually use the digest from cert!
func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.AsyncVoteVerifier) {
Comment thread
derbear marked this conversation as resolved.
blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest
fetcher := s.latestRoundFetcherFactory.NewOverGossip(protocol.UniEnsBlockReqTag)
defer func() {
fetcher.Close()
}()
for s.ledger.LastRound() < cert.Round {
if fetcher.OutOfPeers(cert.Round) {
fetcher.Close()
// refresh peers and try again
logging.Base().Warn("fetchRound found no outgoing peers")
s.net.RequestConnectOutgoing(true, s.ctx.Done())
fetcher = s.latestRoundFetcherFactory.NewOverGossip(protocol.UniEnsBlockReqTag)
}
// Ask the fetcher to get the block somehow
block, fetchedCert, rpcc, err := s.innerFetch(fetcher, cert.Round)

if err != nil {
select {
case <-s.ctx.Done():
logging.Base().Debugf("fetchRound was asked to quit before we could acquire the block")
return
default:
}
logging.Base().Warnf("fetchRound could not acquire block, fetcher errored out: %v", err)
continue
}
rpcc.Close()

if block.Hash() == blockHash && block.ContentsMatchHeader() {
s.ledger.EnsureBlock(block, cert)
return
}
// Otherwise, fetcher gave us the wrong block
logging.Base().Warnf("fetcher gave us bad/wrong block (for round %d): fetched hash %v; want hash %v", cert.Round, block.Hash(), blockHash)

// As a failsafe, if the cert we fetched is valid but for the wrong block, panic as loudly as possible
if cert.Round == fetchedCert.Round &&
cert.Proposal.BlockDigest != fetchedCert.Proposal.BlockDigest &&
fetchedCert.Authenticate(*block, s.ledger, verifier) == nil {
s := "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
s += "!!!!!!!!!! FORK DETECTED !!!!!!!!!!!\n"
s += "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
s += "fetchRound called with a cert authenticating block with hash %v.\n"
s += "We fetched a valid cert authenticating a different block, %v. This indicates a fork.\n\n"
s += "Cert from our agreement service:\n%#v\n\n"
s += "Cert from the fetcher:\n%#v\n\n"
s += "Block from the fetcher:\n%#v\n\n"
s += "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
s += "!!!!!!!!!! FORK DETECTED !!!!!!!!!!!\n"
s += "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n"
s = fmt.Sprintf(s, cert.Proposal.BlockDigest, fetchedCert.Proposal.BlockDigest, cert, fetchedCert, block)
fmt.Println(s)
logging.Base().Error(s)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a Panic? We don't want to keep running after a fork

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current code doesn't panic, and I'm not sure that panicking would be the smartest idea.
I think that logging to telemetry and then sleeping forever here would be a better thing to do. Keep in mind that the agreement service is already in lock-step with this one, so if we go to sleep here, both agreement and catchup are disabled.
Either way, I would break that off into a separate change since it's a behavioral change.

}
}
}

// nextRoundIsNotSupported returns true if the next round upgrades to a protocol version
// which is not supported.
// In case of an error, it returns false
Expand Down
Loading