Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,18 +373,19 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{
NodeID: eth.p2pServer.Self().ID(),
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Network: networkID,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
DirectBroadcast: config.DirectBroadcast,
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
PeerSet: peers,
NodeID: eth.p2pServer.Self().ID(),
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Network: networkID,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
DirectBroadcast: config.DirectBroadcast,
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
PeerSet: peers,
EnableQuickBlockFetching: stack.Config().EnableQuickBlockFetching,
}); err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
return errLaggingPeer
}
}

log.Debug("try sync chain from peer", "remote", p.id, "local", localHeight, "remote", remoteHeight)
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
d.syncStatsChainOrigin = origin
Expand Down
69 changes: 64 additions & 5 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type chainInsertFn func(types.Blocks) (int, error)
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

// fetchRangeBlocksFn is a callback type for fetching a range of blocks from a peer.
type fetchRangeBlocksFn func(peer string, startHeight uint64, startHash common.Hash, count uint64) ([]*types.Block, error)

// blockAnnounce is the hash notification of the availability of a new block in the
// network.
type blockAnnounce struct {
Expand Down Expand Up @@ -138,6 +141,14 @@ type blockOrHeaderInject struct {
block *types.Block // Used for normal mode fetcher which imports full block.
}

type BlockFetchingEntry struct {
announce *blockAnnounce

// results
blocks []*types.Block
err error
}

// number returns the block number of the injected object.
func (inject *blockOrHeaderInject) number() uint64 {
if inject.header != nil {
Expand All @@ -161,8 +172,9 @@ type BlockFetcher struct {
notify chan *blockAnnounce
inject chan *blockOrHeaderInject

headerFilter chan chan *headerFilterTask
bodyFilter chan chan *bodyFilterTask
headerFilter chan chan *headerFilterTask
bodyFilter chan chan *bodyFilterTask
quickBlockFetchingCh chan *BlockFetchingEntry

done chan common.Hash
quit chan struct{}
Expand All @@ -189,6 +201,7 @@ type BlockFetcher struct {
chainFinalizedHeight chainFinalizedHeightFn // Retrieves the current chain's finalized height
insertChain chainInsertFn // Injects a batch of blocks into the chain
dropPeer peerDropFn // Drops a peer for misbehaving
fetchRangeBlocks fetchRangeBlocksFn // Fetches a range of blocks from a peer

// Testing hooks
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list
Expand All @@ -200,12 +213,14 @@ type BlockFetcher struct {

// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
func NewBlockFetcher(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn,
chainHeight chainHeightFn, chainFinalizedHeight chainFinalizedHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
chainHeight chainHeightFn, chainFinalizedHeight chainFinalizedHeightFn, insertChain chainInsertFn, dropPeer peerDropFn,
fetchRangeBlocks fetchRangeBlocksFn) *BlockFetcher {
return &BlockFetcher{
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
quickBlockFetchingCh: make(chan *BlockFetchingEntry),
done: make(chan common.Hash),
quit: make(chan struct{}),
requeue: make(chan *blockOrHeaderInject),
Expand All @@ -224,6 +239,7 @@ func NewBlockFetcher(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, b
chainFinalizedHeight: chainFinalizedHeight,
insertChain: insertChain,
dropPeer: dropPeer,
fetchRangeBlocks: fetchRangeBlocks,
}
}

Expand Down Expand Up @@ -329,6 +345,21 @@ func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transac
}
}

func (f *BlockFetcher) asyncFetchRangeBlocks(announce *blockAnnounce) {
go func() {
if f.fetchRangeBlocks == nil {
return
}
log.Debug("Quick block fetching", "peer", announce.origin, "hash", announce.hash)
blocks, err := f.fetchRangeBlocks(announce.origin, announce.number, announce.hash, 1)
f.quickBlockFetchingCh <- &BlockFetchingEntry{
announce: announce,
blocks: blocks,
err: err,
}
}()
}

// Loop is the main fetcher loop, checking and processing various notification
// events.
func (f *BlockFetcher) loop() {
Expand Down Expand Up @@ -417,6 +448,12 @@ func (f *BlockFetcher) loop() {
if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
f.announceChangeHook(notification.hash, true)
}
// if there enable range fetching, just request the first announce and wait for response,
// and if it gets timeout and wait for later header & body fetching.
if f.fetchRangeBlocks != nil && len(f.announced[notification.hash]) == 1 {
f.asyncFetchRangeBlocks(notification)
}
// schedule the first arrive announce hash
if len(f.announced) == 1 {
f.rescheduleFetch(fetchTimer)
}
Expand Down Expand Up @@ -468,7 +505,7 @@ func (f *BlockFetcher) loop() {
}
// Send out all block header requests
for peer, hashes := range request {
log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
log.Debug("Fetching scheduled headers", "peer", peer, "list", hashes)

// Create a closure of the fetch and schedule in on a new thread
fetchHeader := f.fetching[hashes[0]].fetchHeader
Expand Down Expand Up @@ -527,7 +564,7 @@ func (f *BlockFetcher) loop() {
}
// Send out all block body requests
for peer, hashes := range request {
log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
log.Debug("Fetching scheduled bodies", "peer", peer, "list", hashes)

// Create a closure of the fetch and schedule in on a new thread
if f.completingHook != nil {
Expand Down Expand Up @@ -719,6 +756,28 @@ func (f *BlockFetcher) loop() {
f.enqueue(announce.origin, nil, block)
}
}
case entry := <-f.quickBlockFetchingCh:
annHash := entry.announce.hash
// if there is error or timeout, and the shcedule have not started, just retry the fetch
if entry.err != nil {
log.Debug("Quick block fetching err", "hash", annHash, "err", entry.err)
if _, ok := f.fetching[annHash]; !ok && len(f.announced[annHash]) > 1 {
// Pick the last peer to retrieve from, but ignore the current one
next := f.announced[annHash][len(f.announced[annHash])-1]
if next.origin != entry.announce.origin {
f.asyncFetchRangeBlocks(next)
}
}
continue
}
for _, block := range entry.blocks {
hash := block.Hash()
f.forgetHash(hash)
if f.getBlock(hash) != nil {
continue
}
f.enqueue(entry.announce.origin, nil, block)
}
}
}
}
Expand Down
Loading