From d5b96c446d03fc86758fafe6afdfa096de2061c7 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 18 Feb 2020 22:31:16 +0100 Subject: [PATCH 1/9] eth: improve shutdown synchronization Most goroutines started by eth.Ethereum didn't have any shutdown sync at all, which lead to weird error messages when quitting the client. This change improves the clean shutdown path by stopping all internal components in dependency order and waiting for them to actually be stopped before shutdown is considered done. In particular, we now stop everything related to peers before stopping 'resident' parts such as core.BlockChain. --- eth/backend.go | 44 ++++++++++++++++---------------- eth/bloombits.go | 2 +- eth/handler.go | 65 +++++++++++++++++++++++------------------------- eth/sync.go | 47 ++++++++++++++++++++-------------- 4 files changed, 82 insertions(+), 76 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index ed79340f59b4..da2e338ced29 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -67,9 +67,6 @@ type LesServer interface { type Ethereum struct { config *Config - // Channel for shutting down the service - shutdownChan chan bool - // Handlers txPool *core.TxPool blockchain *core.BlockChain @@ -84,8 +81,9 @@ type Ethereum struct { engine consensus.Engine accountManager *accounts.Manager - bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests - bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports + bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests + bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports + closeBloomHandler chan bool APIBackend *EthAPIBackend @@ -145,17 +143,17 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { log.Info("Initialised chain configuration", "config", chainConfig) eth := &Ethereum{ - config: config, - chainDb: chainDb, - eventMux: ctx.EventMux, - accountManager: ctx.AccountManager, - engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb), - shutdownChan: make(chan bool), - networkID: config.NetworkId, - gasPrice: config.Miner.GasPrice, - etherbase: config.Miner.Etherbase, - bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), + config: config, + chainDb: chainDb, + eventMux: ctx.EventMux, + accountManager: ctx.AccountManager, + engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb), + closeBloomHandler: make(chan bool), + networkID: config.NetworkId, + gasPrice: config.Miner.GasPrice, + etherbase: config.Miner.Etherbase, + bloomRequests: make(chan chan *bloombits.Retrieval), + bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), } bcVersion := rawdb.ReadDatabaseVersion(chainDb) @@ -557,18 +555,20 @@ func (s *Ethereum) Start(srvr *p2p.Server) error { // Stop implements node.Service, terminating all internal goroutines used by the // Ethereum protocol. func (s *Ethereum) Stop() error { - s.bloomIndexer.Close() - s.blockchain.Stop() - s.engine.Close() + // Stop all the peer-related stuff first. s.protocolManager.Stop() if s.lesServer != nil { s.lesServer.Stop() } + + // Then stop everything else. + s.bloomIndexer.Close() + close(s.closeBloomHandler) s.txPool.Stop() s.miner.Stop() - s.eventMux.Stop() - + s.blockchain.Stop() + s.engine.Close() s.chainDb.Close() - close(s.shutdownChan) + s.eventMux.Stop() return nil } diff --git a/eth/bloombits.go b/eth/bloombits.go index 9a31997d6002..35522b9bfa92 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -54,7 +54,7 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) { go func() { for { select { - case <-eth.shutdownChan: + case <-eth.closeBloomHandler: return case request := <-eth.bloomRequests: diff --git a/eth/handler.go b/eth/handler.go index 236e50729c83..9d2dce6e90cf 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -87,14 +87,11 @@ type ProtocolManager struct { whitelist map[uint64]common.Hash // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - txsyncCh chan *txsync - quitSync chan struct{} - noMorePeers chan struct{} - - // wait group is used for graceful shutdowns during downloading - // and processing - wg sync.WaitGroup + newPeerCh chan *peer + txsyncCh chan *txsync + quitSync chan struct{} + wg sync.WaitGroup + peerWG sync.WaitGroup // Test fields or hooks broadcastTxAnnouncesOnly bool // Testing field, disable transaction propagation @@ -105,17 +102,16 @@ type ProtocolManager struct { func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - forkFilter: forkid.NewFilter(blockchain), - eventMux: mux, - txpool: txpool, - blockchain: blockchain, - peers: newPeerSet(), - whitelist: whitelist, - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), + networkID: networkID, + forkFilter: forkid.NewFilter(blockchain), + eventMux: mux, + txpool: txpool, + blockchain: blockchain, + peers: newPeerSet(), + whitelist: whitelist, + newPeerCh: make(chan *peer), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), } if mode == downloader.FullSync { // The database seems empty as the current block is the genesis. Yet the fast @@ -216,8 +212,8 @@ func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol { peer := pm.newPeer(int(version), p, rw, pm.txpool.Get) select { case pm.newPeerCh <- peer: - pm.wg.Add(1) - defer pm.wg.Done() + pm.peerWG.Add(1) + defer pm.peerWG.Done() return pm.handle(peer) case <-pm.quitSync: return p2p.DiscQuitting @@ -260,40 +256,38 @@ func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers // broadcast transactions + pm.wg.Add(1) pm.txsCh = make(chan core.NewTxsEvent, txChanSize) pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() // broadcast mined blocks + pm.wg.Add(1) pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() // start sync handlers + pm.wg.Add(2) go pm.syncer() go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64. } func (pm *ProtocolManager) Stop() { - log.Info("Stopping Ethereum protocol") - pm.txsSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop - // Quit the sync loop. - // After this send has completed, no new peers will be accepted. - pm.noMorePeers <- struct{}{} - - // Quit fetcher, txsyncLoop. + // Quit syncer and txsync64. + // After this is done, no new peers will be accepted. close(pm.quitSync) + pm.downloader.Cancel() + pm.wg.Wait() // Disconnect existing sessions. // This also closes the gate for any new registrations on the peer set. // sessions which are already established but not added to pm.peers yet // will exit when they try to register. pm.peers.Close() - - // Wait for all peer handler goroutines and the loops to come down. - pm.wg.Wait() + pm.peerWG.Wait() log.Info("Ethereum protocol stopped") } @@ -883,9 +877,10 @@ func (pm *ProtocolManager) BroadcastTransactions(txs types.Transactions, propaga } } -// Mined broadcast loop +// minedBroadcastLoop sends mined blocks to connected peers. func (pm *ProtocolManager) minedBroadcastLoop() { - // automatically stops if unsubscribe + defer pm.wg.Done() + for obj := range pm.minedBlockSub.Chan() { if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { pm.BroadcastBlock(ev.Block, true) // First propagate block to peers @@ -894,7 +889,10 @@ func (pm *ProtocolManager) minedBroadcastLoop() { } } +// txBroadcastLoop announces new transactions to connected peers. func (pm *ProtocolManager) txBroadcastLoop() { + defer pm.wg.Done() + for { select { case event := <-pm.txsCh: @@ -906,7 +904,6 @@ func (pm *ProtocolManager) txBroadcastLoop() { pm.BroadcastTransactions(event.Txs, true) // First propagate transactions to peers pm.BroadcastTransactions(event.Txs, false) // Only then announce to the rest - // Err() channel will be closed when unsubscribing. case <-pm.txsSub.Err(): return } diff --git a/eth/sync.go b/eth/sync.go index 0709706c9188..72f9960e83db 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -32,7 +32,7 @@ const ( forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available minDesiredPeerCount = 5 // Amount of peers desired to start syncing - // This is the target size for the packs of transactions sent by txsyncLoop. + // This is the target size for the packs of transactions sent by txsyncLoop64. // A pack can get larger than this if a single transactions exceeds this size. txsyncPackSize = 100 * 1024 ) @@ -81,12 +81,15 @@ func (pm *ProtocolManager) syncTransactions(p *peer) { // transactions. In order to minimise egress bandwidth usage, we send // the transactions in small packs to one peer at a time. func (pm *ProtocolManager) txsyncLoop64() { + defer pm.wg.Done() + var ( pending = make(map[enode.ID]*txsync) sending = false // whether a send is active pack = new(txsync) // the pack that is being sent done = make(chan error, 1) // result of the send ) + // send starts a sending a pack of transactions from the sync. send := func(s *txsync) { if s.p.version >= eth65 { @@ -149,35 +152,41 @@ func (pm *ProtocolManager) txsyncLoop64() { } } -// syncer is responsible for periodically synchronising with the network, both -// downloading hashes and blocks as well as handling the announcement handler. +// syncer runs in its own goroutine and coordinates sync-related components. func (pm *ProtocolManager) syncer() { - // Start and ensure cleanup of sync mechanisms + defer pm.wg.Done() + pm.blockFetcher.Start() pm.txFetcher.Start() defer pm.blockFetcher.Stop() defer pm.txFetcher.Stop() defer pm.downloader.Terminate() - // Wait for different events to fire synchronisation operations - forceSync := time.NewTicker(forceSyncCycle) - defer forceSync.Stop() + for pm.syncTriggerWait() { + pm.synchronise(pm.peers.BestPeer()) + } +} +// syncTriggerWait waits for sync start conditions to be met. +func (pm *ProtocolManager) syncTriggerWait() bool { + force := time.NewTimer(forceSyncCycle) + defer force.Stop() for { select { - case <-pm.newPeerCh: - // Make sure we have peers to select from, then sync - if pm.peers.Len() < minDesiredPeerCount { - break + case <-pm.quitSync: + return false + default: + if pm.peers.Len() >= minDesiredPeerCount { + return true + } + select { + case <-pm.newPeerCh: + // Go round and check the peer count again. + case <-force.C: + return true + case <-pm.quitSync: + return false } - go pm.synchronise(pm.peers.BestPeer()) - - case <-forceSync.C: - // Force a sync even if not enough peers are present - go pm.synchronise(pm.peers.BestPeer()) - - case <-pm.noMorePeers: - return } } } From a4995184011fadae94a0ca7739e3763ce4356b1e Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 6 Mar 2020 12:56:34 +0100 Subject: [PATCH 2/9] eth: rewrite sync controller --- eth/handler.go | 45 ++++++----- eth/helper_test.go | 15 +--- eth/protocol_test.go | 2 +- eth/sync.go | 175 ++++++++++++++++++++++++++++++------------- eth/sync_test.go | 9 ++- 5 files changed, 156 insertions(+), 90 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 9d2dce6e90cf..eb1943031dde 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -87,9 +87,10 @@ type ProtocolManager struct { whitelist map[uint64]common.Hash // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - txsyncCh chan *txsync - quitSync chan struct{} + txsyncCh chan *txsync + quitSync chan struct{} + + chainSync *chainSyncer wg sync.WaitGroup peerWG sync.WaitGroup @@ -109,10 +110,10 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh blockchain: blockchain, peers: newPeerSet(), whitelist: whitelist, - newPeerCh: make(chan *peer), txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } + if mode == downloader.FullSync { // The database seems empty as the current block is the genesis. Yet the fast // block is ahead, so fast sync was enabled for this node at a certain point. @@ -136,6 +137,7 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh manager.fastSync = uint32(1) } } + // If we have trusted checkpoints, enforce them on the chain if checkpoint != nil { manager.checkpointNumber = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1 @@ -195,6 +197,8 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh } manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx) + manager.chainSync = newChainSyncer(manager) + return manager, nil } @@ -209,15 +213,7 @@ func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol { Version: version, Length: length, Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - peer := pm.newPeer(int(version), p, rw, pm.txpool.Get) - select { - case pm.newPeerCh <- peer: - pm.peerWG.Add(1) - defer pm.peerWG.Done() - return pm.handle(peer) - case <-pm.quitSync: - return p2p.DiscQuitting - } + return pm.runPeer(pm.newPeer(int(version), p, rw, pm.txpool.Get)) }, NodeInfo: func() interface{} { return pm.NodeInfo() @@ -268,7 +264,7 @@ func (pm *ProtocolManager) Start(maxPeers int) { // start sync handlers pm.wg.Add(2) - go pm.syncer() + go pm.chainSync.loop() go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64. } @@ -276,7 +272,7 @@ func (pm *ProtocolManager) Stop() { pm.txsSub.Unsubscribe() // quits txBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop - // Quit syncer and txsync64. + // Quit chainSync and txsync64. // After this is done, no new peers will be accepted. close(pm.quitSync) pm.downloader.Cancel() @@ -296,6 +292,15 @@ func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter, ge return newPeer(pv, p, rw, getPooledTx) } +func (pm *ProtocolManager) runPeer(p *peer) error { + if !pm.chainSync.handlePeerEvent(p) { + return p2p.DiscQuitting + } + pm.peerWG.Add(1) + defer pm.peerWG.Done() + return pm.handle(p) +} + // handle is the callback invoked to manage the life cycle of an eth peer. When // this function terminates, the peer is disconnected. func (pm *ProtocolManager) handle(p *peer) error { @@ -317,6 +322,7 @@ func (pm *ProtocolManager) handle(p *peer) error { p.Log().Debug("Ethereum handshake failed", "err", err) return err } + // Register the peer locally if err := pm.peers.Register(p); err != nil { p.Log().Error("Ethereum peer registration failed", "err", err) @@ -717,14 +723,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Update the peer's total difficulty if better than the previous if _, td := p.Head(); trueTD.Cmp(td) > 0 { p.SetHead(trueHead, trueTD) - - // Schedule a sync if above ours. Note, this will not fire a sync for a gap of - // a single block (as the true TD is below the propagated block), however this - // scenario should easily be covered by the fetcher. - currentHeader := pm.blockchain.CurrentHeader() - if trueTD.Cmp(pm.blockchain.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())) > 0 { - go pm.synchronise(p) - } + pm.chainSync.handlePeerEvent(p) } case msg.Code == NewPooledTransactionHashesMsg && p.version >= eth65: diff --git a/eth/helper_test.go b/eth/helper_test.go index bec37e16cbb4..3338af71d5c8 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -170,23 +170,14 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te // Create a message pipe to communicate through app, net := p2p.MsgPipe() - // Generate a random id and create the peer + // Start the peer on a new thread var id enode.ID rand.Read(id[:]) - peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net, pm.txpool.Get) - - // Start the peer on a new thread errc := make(chan error, 1) - go func() { - select { - case pm.newPeerCh <- peer: - errc <- pm.handle(peer) - case <-pm.quitSync: - errc <- p2p.DiscQuitting - } - }() + go func() { errc <- pm.runPeer(peer) }() tp := &testPeer{app: app, net: net, peer: peer} + // Execute any implicitly requested handshakes and return if shake { var ( diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 4bbfe9bd3c09..a313e4e6cf94 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -385,7 +385,7 @@ func testSyncTransaction(t *testing.T, propagtion bool) { go pmFetcher.handle(pmFetcher.newPeer(65, p2p.NewPeer(enode.ID{}, "fetcher", nil), io1, pmFetcher.txpool.Get)) time.Sleep(250 * time.Millisecond) - pmFetcher.synchronise(pmFetcher.peers.BestPeer()) + pmFetcher.doSync(peerToSyncOp(downloader.FullSync, pmFetcher.peers.BestPeer())) atomic.StoreUint32(&pmFetcher.acceptTxs, 1) newTxs := make(chan core.NewTxsEvent, 1024) diff --git a/eth/sync.go b/eth/sync.go index 72f9960e83db..6e7793092115 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -17,6 +17,7 @@ package eth import ( + "math/big" "math/rand" "sync/atomic" "time" @@ -30,7 +31,7 @@ import ( const ( forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available - minDesiredPeerCount = 5 // Amount of peers desired to start syncing + defaultMinSyncPeers = 5 // Amount of peers desired to start syncing // This is the target size for the packs of transactions sent by txsyncLoop64. // A pack can get larger than this if a single transactions exceeds this size. @@ -152,79 +153,146 @@ func (pm *ProtocolManager) txsyncLoop64() { } } -// syncer runs in its own goroutine and coordinates sync-related components. -func (pm *ProtocolManager) syncer() { - defer pm.wg.Done() +// chainSyncer coordinates blockchain sync components. +type chainSyncer struct { + pm *ProtocolManager + force *time.Timer + forced bool // true when force timer fired + peerEventCh chan struct{} + doneCh chan error // non-nil when sync is running +} - pm.blockFetcher.Start() - pm.txFetcher.Start() - defer pm.blockFetcher.Stop() - defer pm.txFetcher.Stop() - defer pm.downloader.Terminate() +type chainSyncOp struct { + mode downloader.SyncMode + peer *peer + td *big.Int + head common.Hash +} + +func newChainSyncer(pm *ProtocolManager) *chainSyncer { + return &chainSyncer{ + pm: pm, + peerEventCh: make(chan struct{}), + } +} - for pm.syncTriggerWait() { - pm.synchronise(pm.peers.BestPeer()) +// handlePeerEvent notifies the syncer about a change in the peer set. +// This is called for new peers and every time a peer announces a new +// chain head. +func (cs *chainSyncer) handlePeerEvent(p *peer) bool { + select { + case cs.peerEventCh <- struct{}{}: + return true + case <-cs.pm.quitSync: + return false } } -// syncTriggerWait waits for sync start conditions to be met. -func (pm *ProtocolManager) syncTriggerWait() bool { - force := time.NewTimer(forceSyncCycle) - defer force.Stop() +// loop runs in its own goroutine and launches the sync when necessary. +func (cs *chainSyncer) loop() { + defer cs.pm.wg.Done() + + cs.pm.blockFetcher.Start() + cs.pm.txFetcher.Start() + defer cs.pm.blockFetcher.Stop() + defer cs.pm.txFetcher.Stop() + defer cs.pm.downloader.Terminate() + + // The force timer lowers the peer count threshold down to one when it fires. + // This ensures we'll always start sync even if there aren't enough peers. + cs.force = time.NewTimer(forceSyncCycle) + defer cs.force.Stop() + for { + if op := cs.nextSyncOp(); op != nil { + log.Trace("Starting chain sync", "mode", op.mode, "peercount", cs.pm.peers.Len(), "id", op.peer.id) + cs.startSync(op) + } + select { - case <-pm.quitSync: - return false - default: - if pm.peers.Len() >= minDesiredPeerCount { - return true - } - select { - case <-pm.newPeerCh: - // Go round and check the peer count again. - case <-force.C: - return true - case <-pm.quitSync: - return false + case <-cs.peerEventCh: + // Peer information changed, recheck. + case <-cs.doneCh: + cs.doneCh = nil + cs.force.Reset(forceSyncCycle) + cs.forced = false + case <-cs.force.C: + cs.forced = true + + case <-cs.pm.quitSync: + if cs.doneCh != nil { + <-cs.doneCh } + return } } } -// synchronise tries to sync up our local block chain with a remote peer. -func (pm *ProtocolManager) synchronise(peer *peer) { - // Short circuit if no peers are available - if peer == nil { - return +// nextSyncOp determines whether sync is required at this time. +func (cs *chainSyncer) nextSyncOp() *chainSyncOp { + if cs.doneCh != nil { + return nil // Sync already running. } - // Make sure the peer's TD is higher than our own - currentHeader := pm.blockchain.CurrentHeader() - td := pm.blockchain.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64()) - pHead, pTd := peer.Head() - if pTd.Cmp(td) <= 0 { - return + // Ensure we're at mininum peer count. + minPeers := defaultMinSyncPeers + if cs.forced { + minPeers = 1 + } else if minPeers > cs.pm.maxPeers { + minPeers = cs.pm.maxPeers } - // Otherwise try to sync with the downloader - mode := downloader.FullSync - if atomic.LoadUint32(&pm.fastSync) == 1 { - // Fast sync was explicitly requested, and explicitly granted - mode = downloader.FastSync + if cs.pm.peers.Len() < minPeers { + return nil } - if mode == downloader.FastSync { - // Make sure the peer's total difficulty we are synchronizing is higher. - if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 { - return - } + + // We have enough peers, check TD. + peer := cs.pm.peers.BestPeer() + if peer == nil { + return nil } - // Run the sync cycle, and disable fast sync if we've went past the pivot block - if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { - return + mode, ourTD := cs.modeAndLocalHead() + op := peerToSyncOp(mode, peer) + if op.td.Cmp(ourTD) <= 0 { + return nil // We're in sync. + } + return op +} + +func peerToSyncOp(mode downloader.SyncMode, p *peer) *chainSyncOp { + peerHead, peerTD := p.Head() + return &chainSyncOp{mode: mode, peer: p, td: peerTD, head: peerHead} +} + +func (cs *chainSyncer) modeAndLocalHead() (downloader.SyncMode, *big.Int) { + if atomic.LoadUint32(&cs.pm.fastSync) == 1 { + block := cs.pm.blockchain.CurrentFastBlock() + td := cs.pm.blockchain.GetTdByHash(block.Hash()) + return downloader.FastSync, td + } else { + head := cs.pm.blockchain.CurrentHeader() + td := cs.pm.blockchain.GetTd(head.Hash(), head.Number.Uint64()) + return downloader.FullSync, td + } +} + +// startSync launches doSync in a new goroutine. +func (cs *chainSyncer) startSync(op *chainSyncOp) { + cs.doneCh = make(chan error, 1) + go func() { cs.doneCh <- cs.pm.doSync(op) }() +} + +// doSync synchronizes the local blockchain with a remote peer. +func (pm *ProtocolManager) doSync(op *chainSyncOp) error { + // Run the sync cycle, and disable fast sync if we're past the pivot block + err := pm.downloader.Synchronise(op.peer.id, op.head, op.td, op.mode) + if err != nil { + return err } if atomic.LoadUint32(&pm.fastSync) == 1 { log.Info("Fast sync complete, auto disabling") atomic.StoreUint32(&pm.fastSync, 0) } + // If we've successfully finished a sync cycle and passed any required checkpoint, // enable accepting transactions from the network. head := pm.blockchain.CurrentBlock() @@ -235,6 +303,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { atomic.StoreUint32(&pm.acceptTxs, 1) } } + if head.NumberU64() > 0 { // We've completed a sync cycle, notify all peers of new state. This path is // essential in star-topology networks where a gateway node needs to notify @@ -244,4 +313,6 @@ func (pm *ProtocolManager) synchronise(peer *peer) { // more reliably update peers or the local TD state. go pm.BroadcastBlock(head, false) } + + return nil } diff --git a/eth/sync_test.go b/eth/sync_test.go index d02bc5710813..ac1e5fad1bf1 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -33,6 +33,8 @@ func TestFastSyncDisabling65(t *testing.T) { testFastSyncDisabling(t, 65) } // Tests that fast sync gets disabled as soon as a real block is successfully // imported into the blockchain. func testFastSyncDisabling(t *testing.T, protocol int) { + t.Parallel() + // Create a pristine protocol manager, check that fast sync is left enabled pmEmpty, _ := newTestProtocolManagerMust(t, downloader.FastSync, 0, nil, nil) if atomic.LoadUint32(&pmEmpty.fastSync) == 0 { @@ -43,14 +45,17 @@ func testFastSyncDisabling(t *testing.T, protocol int) { if atomic.LoadUint32(&pmFull.fastSync) == 1 { t.Fatalf("fast sync not disabled on non-empty blockchain") } + // Sync up the two peers io1, io2 := p2p.MsgPipe() - go pmFull.handle(pmFull.newPeer(protocol, p2p.NewPeer(enode.ID{}, "empty", nil), io2, pmFull.txpool.Get)) go pmEmpty.handle(pmEmpty.newPeer(protocol, p2p.NewPeer(enode.ID{}, "full", nil), io1, pmEmpty.txpool.Get)) time.Sleep(250 * time.Millisecond) - pmEmpty.synchronise(pmEmpty.peers.BestPeer()) + op := peerToSyncOp(downloader.FastSync, pmEmpty.peers.BestPeer()) + if err := pmEmpty.doSync(op); err != nil { + t.Fatal("sync failed:", err) + } // Check that fast sync was disabled if atomic.LoadUint32(&pmEmpty.fastSync) == 1 { From b556602e4809b203f1bde0f2189825e0dd8f3f74 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 25 Mar 2020 17:36:43 +0100 Subject: [PATCH 3/9] eth: remove sync start debug message --- eth/sync.go | 1 - 1 file changed, 1 deletion(-) diff --git a/eth/sync.go b/eth/sync.go index 6e7793092115..0b9506c720ec 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -205,7 +205,6 @@ func (cs *chainSyncer) loop() { for { if op := cs.nextSyncOp(); op != nil { - log.Trace("Starting chain sync", "mode", op.mode, "peercount", cs.pm.peers.Len(), "id", op.peer.id) cs.startSync(op) } From c3d032d7a8e33dc500637cadac673c134d1c0b8b Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 25 Mar 2020 17:45:44 +0100 Subject: [PATCH 4/9] eth: notify chainSyncer about new peers after handshake --- eth/handler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/eth/handler.go b/eth/handler.go index eb1943031dde..0ef98e454ff0 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -334,6 +334,8 @@ func (pm *ProtocolManager) handle(p *peer) error { if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil { return err } + pm.chainSync.handlePeerEvent(p) + // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) From 15fd80203726aaee682853870723739bcdf4ee25 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 25 Mar 2020 18:46:28 +0100 Subject: [PATCH 5/9] eth: move downloader.Cancel call into chainSyncer --- eth/handler.go | 1 - eth/sync.go | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/handler.go b/eth/handler.go index 0ef98e454ff0..9a02f1f20faa 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -275,7 +275,6 @@ func (pm *ProtocolManager) Stop() { // Quit chainSync and txsync64. // After this is done, no new peers will be accepted. close(pm.quitSync) - pm.downloader.Cancel() pm.wg.Wait() // Disconnect existing sessions. diff --git a/eth/sync.go b/eth/sync.go index 0b9506c720ec..1fecba4fd33c 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -220,6 +220,7 @@ func (cs *chainSyncer) loop() { case <-cs.pm.quitSync: if cs.doneCh != nil { + cs.pm.downloader.Cancel() <-cs.doneCh } return From 68bc8982e74ea72d6746ebd346e0b4f68e712d68 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 25 Mar 2020 18:46:55 +0100 Subject: [PATCH 6/9] eth: make post-sync block broadcast synchronous --- eth/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/sync.go b/eth/sync.go index 1fecba4fd33c..148df0485502 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -311,7 +311,7 @@ func (pm *ProtocolManager) doSync(op *chainSyncOp) error { // scenario will most often crop up in private and hackathon networks with // degenerate connectivity, but it should be healthy for the mainnet too to // more reliably update peers or the local TD state. - go pm.BroadcastBlock(head, false) + pm.BroadcastBlock(head, false) } return nil From 59acca53592fea99faf020ce2701194af5690b6d Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 26 Mar 2020 15:07:17 +0100 Subject: [PATCH 7/9] eth: add comments --- eth/sync.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/eth/sync.go b/eth/sync.go index 148df0485502..d689200dcead 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -162,6 +162,7 @@ type chainSyncer struct { doneCh chan error // non-nil when sync is running } +// chainSyncOp is a scheduled sync operation. type chainSyncOp struct { mode downloader.SyncMode peer *peer @@ -169,6 +170,7 @@ type chainSyncOp struct { head common.Hash } +// newChainSyncer creates a chainSyncer. func newChainSyncer(pm *ProtocolManager) *chainSyncer { return &chainSyncer{ pm: pm, From b5c7169feaf119c540b482687086b02bac4a2e16 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 26 Mar 2020 15:07:49 +0100 Subject: [PATCH 8/9] core: change blockchain stop message --- core/blockchain.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/blockchain.go b/core/blockchain.go index 1da899ab4325..d6f732194ef7 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -897,7 +897,7 @@ func (bc *BlockChain) Stop() { log.Error("Dangling trie nodes after full cleanup") } } - log.Info("Blockchain manager stopped") + log.Info("Blockchain stopped") } func (bc *BlockChain) procFutureBlocks() { From cdd1c3db43b2959d55e11d74eae968b316ac1e65 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 26 Mar 2020 15:33:54 +0100 Subject: [PATCH 9/9] eth: change closeBloomHandler channel type --- eth/backend.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index da2e338ced29..589773f81a99 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -83,7 +83,7 @@ type Ethereum struct { bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports - closeBloomHandler chan bool + closeBloomHandler chan struct{} APIBackend *EthAPIBackend @@ -148,7 +148,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { eventMux: ctx.EventMux, accountManager: ctx.AccountManager, engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.Miner.Notify, config.Miner.Noverify, chainDb), - closeBloomHandler: make(chan bool), + closeBloomHandler: make(chan struct{}), networkID: config.NetworkId, gasPrice: config.Miner.GasPrice, etherbase: config.Miner.Etherbase,