Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 35 additions & 25 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type Downloader struct {
rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

mode SyncMode // Synchronisation mode defining the strategy used (per sync cycle)
mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mux *event.TypeMux // Event multiplexer to announce sync operation events

checkpoint uint64 // Checkpoint block number to enforce head against (e.g. fast sync)
Expand Down Expand Up @@ -258,15 +258,16 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
defer d.syncStatsLock.RUnlock()

current := uint64(0)
mode := d.getMode()
switch {
case d.blockchain != nil && d.mode == FullSync:
case d.blockchain != nil && mode == FullSync:
current = d.blockchain.CurrentBlock().NumberU64()
case d.blockchain != nil && d.mode == FastSync:
case d.blockchain != nil && mode == FastSync:
current = d.blockchain.CurrentFastBlock().NumberU64()
case d.lightchain != nil:
current = d.lightchain.CurrentHeader().Number.Uint64()
default:
log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", d.mode)
log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", mode)
}
return ethereum.SyncProgress{
StartingBlock: d.syncStatsChainOrigin,
Expand Down Expand Up @@ -415,8 +416,8 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode

defer d.Cancel() // No matter what, we can't leave the cancel channel open

// Set the requested sync mode, unless it's forbidden
d.mode = mode
// Atomically set the requested sync mode
atomic.StoreUint32(&d.mode, uint32(mode))

// Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id)
Expand All @@ -426,6 +427,10 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
return d.syncWithPeer(p, hash, td)
}

func (d *Downloader) getMode() SyncMode {
return SyncMode(atomic.LoadUint32(&d.mode))
}

// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
Expand All @@ -442,8 +447,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
if p.version < 62 {
return errTooOld
}
mode := d.getMode()

log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
defer func(start time.Time) {
log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
}(time.Now())
Expand All @@ -468,7 +474,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I

// Ensure our origin point is below any fast sync pivot point
pivot := uint64(0)
if d.mode == FastSync {
if mode == FastSync {
if height <= uint64(fsMinFullBlocks) {
origin = 0
} else {
Expand All @@ -479,10 +485,10 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
}
}
d.committed = 1
if d.mode == FastSync && pivot != 0 {
if mode == FastSync && pivot != 0 {
d.committed = 0
}
if d.mode == FastSync {
if mode == FastSync {
// Set the ancient data limitation.
// If we are running fast sync, all block data older than ancientLimit will be
// written to the ancient store. More recent data will be written to the active
Expand Down Expand Up @@ -521,7 +527,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
}
}
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, d.mode)
d.queue.Prepare(origin+1, mode)
if d.syncInitHook != nil {
d.syncInitHook(origin, height)
}
Expand All @@ -531,9 +537,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.processHeaders(origin+1, pivot, td) },
}
if d.mode == FastSync {
if mode == FastSync {
fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
} else if d.mode == FullSync {
} else if mode == FullSync {
fetchers = append(fetchers, d.processFullSyncContent)
}
return d.spawnSync(fetchers)
Expand Down Expand Up @@ -621,6 +627,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {

ttl := d.requestTTL()
timeout := time.After(ttl)
mode := d.getMode()
for {
select {
case <-d.cancelCh:
Expand All @@ -639,7 +646,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
return nil, errBadPeer
}
head := headers[0]
if (d.mode == FastSync || d.mode == LightSync) && head.Number.Uint64() < d.checkpoint {
if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
p.log.Warn("Remote head below checkpoint", "number", head.Number, "hash", head.Hash())
return nil, errUnsyncedPeer
}
Expand Down Expand Up @@ -721,7 +728,8 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
localHeight uint64
remoteHeight = remoteHeader.Number.Uint64()
)
switch d.mode {
mode := d.getMode()
switch mode {
case FullSync:
localHeight = d.blockchain.CurrentBlock().NumberU64()
case FastSync:
Expand All @@ -738,7 +746,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
}
// If we're doing a light sync, ensure the floor doesn't go below the CHT, as
// all headers before that point will be missing.
if d.mode == LightSync {
if mode == LightSync {
// If we don't know the current CHT position, find it
if d.genesis == 0 {
header := d.lightchain.CurrentHeader()
Expand Down Expand Up @@ -804,7 +812,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
n := headers[i].Number.Uint64()

var known bool
switch d.mode {
switch mode {
case FullSync:
known = d.blockchain.HasBlock(h, n)
case FastSync:
Expand Down Expand Up @@ -877,7 +885,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
n := headers[0].Number.Uint64()

var known bool
switch d.mode {
switch mode {
case FullSync:
known = d.blockchain.HasBlock(h, n)
case FastSync:
Expand Down Expand Up @@ -954,6 +962,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
ancestor := from
getHeaders(from)

mode := d.getMode()
for {
select {
case <-d.cancelCh:
Expand Down Expand Up @@ -1014,7 +1023,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
if n := len(headers); n > 0 {
// Retrieve the current head we're at
var head uint64
if d.mode == LightSync {
if mode == LightSync {
head = d.lightchain.CurrentHeader().Number.Uint64()
} else {
head = d.blockchain.CurrentFastBlock().NumberU64()
Expand Down Expand Up @@ -1375,6 +1384,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
var rollback []*types.Header
mode := d.getMode()
defer func() {
if len(rollback) > 0 {
// Flatten the headers and roll them back
Expand All @@ -1383,13 +1393,13 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
hashes[i] = header.Hash()
}
lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
if d.mode != LightSync {
if mode != LightSync {
lastFastBlock = d.blockchain.CurrentFastBlock().Number()
lastBlock = d.blockchain.CurrentBlock().Number()
}
d.lightchain.Rollback(hashes)
curFastBlock, curBlock := common.Big0, common.Big0
if d.mode != LightSync {
if mode != LightSync {
curFastBlock = d.blockchain.CurrentFastBlock().Number()
curBlock = d.blockchain.CurrentBlock().Number()
}
Expand Down Expand Up @@ -1430,7 +1440,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// L: Sync begins, and finds common ancestor at 11
// L: Request new headers up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if d.mode != LightSync {
if mode != LightSync {
head := d.blockchain.CurrentBlock()
if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
return errStallingPeer
Expand All @@ -1443,7 +1453,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// This check cannot be executed "as is" for full imports, since blocks may still be
// queued for processing when the header download completes. However, as long as the
// peer gave us something useful, we're already happy/progressed (above check).
if d.mode == FastSync || d.mode == LightSync {
if mode == FastSync || mode == LightSync {
head := d.lightchain.CurrentHeader()
if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
return errStallingPeer
Expand All @@ -1469,7 +1479,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
}
chunk := headers[:limit]
// In case of header only syncing, validate the chunk immediately
if d.mode == FastSync || d.mode == LightSync {
if mode == FastSync || mode == LightSync {
// Collect the yet unknown headers to mark them as uncertain
unknown := make([]*types.Header, 0, len(chunk))
for _, header := range chunk {
Expand Down Expand Up @@ -1497,7 +1507,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if d.mode == FullSync || d.mode == FastSync {
if mode == FullSync || mode == FastSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng
blocks += length - common
receipts += length - common
}
if tester.downloader.mode == LightSync {
if tester.downloader.getMode() == LightSync {
blocks, receipts = 1, 1
}
if hs := len(tester.ownHeaders) + len(tester.ancientHeaders) - 1; hs != headers {
Expand Down
3 changes: 2 additions & 1 deletion eth/downloader/modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package downloader
import "fmt"

// SyncMode represents the synchronisation mode of the downloader.
type SyncMode int
// It is a uint32 as it is used with atomic operations.
type SyncMode uint32
Comment thread
MariusVanDerWijden marked this conversation as resolved.

const (
FullSync SyncMode = iota // Synchronise the entire blockchain history from full blocks
Expand Down