Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 34 additions & 35 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,11 @@ var (
errPeersUnavailable = errors.New("no peers available or all tried for download")
errInvalidAncestor = errors.New("retrieved ancestor is invalid")
errInvalidChain = errors.New("retrieved hash chain is invalid")
errInvalidBlock = errors.New("retrieved block is invalid")
errInvalidBody = errors.New("retrieved block body is invalid")
errInvalidReceipt = errors.New("retrieved receipt is invalid")
errCancelBlockFetch = errors.New("block download canceled (requested)")
errCancelHeaderFetch = errors.New("block header download canceled (requested)")
errCancelBodyFetch = errors.New("block body download canceled (requested)")
errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
errCancelStateFetch = errors.New("state data download canceled (requested)")
errCancelHeaderProcessing = errors.New("header processing canceled (requested)")
errCancelContentProcessing = errors.New("content processing canceled (requested)")
errCanceled = errors.New("syncing canceled (requested)")
errNoSyncActive = errors.New("no sync active")
errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
)
Expand Down Expand Up @@ -307,14 +302,6 @@ func (d *Downloader) UnregisterPeer(id string) error {
}
d.queue.Revoke(id)

// If this peer was the master peer, abort sync immediately
d.cancelLock.RLock()
master := id == d.cancelPeer
d.cancelLock.RUnlock()

if master {
d.cancel()
}
return nil
}

Expand All @@ -324,7 +311,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
err := d.synchronise(id, head, td, mode)
switch err {
case nil:
case errBusy:
case errBusy, errCanceled:

case errTimeout, errBadPeer, errStallingPeer,
errEmptyHeaderSet, errPeersUnavailable, errTooOld,
Expand Down Expand Up @@ -500,7 +487,7 @@ func (d *Downloader) spawnSync(fetchers []func() error) error {
// it has processed the queue.
d.queue.Close()
}
if err = <-errc; err != nil {
if err = <-errc; err != nil && err != errCanceled {
break
}
}
Expand Down Expand Up @@ -563,7 +550,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
for {
select {
case <-d.cancelCh:
return nil, errCancelBlockFetch
return nil, errCanceled

case packet := <-d.headerCh:
// Discard anything not from the origin peer
Expand Down Expand Up @@ -705,7 +692,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
for finished := false; !finished; {
select {
case <-d.cancelCh:
return 0, errCancelHeaderFetch
return 0, errCanceled

case packet := <-d.headerCh:
// Discard anything not from the origin peer
Expand Down Expand Up @@ -791,7 +778,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
for arrived := false; !arrived; {
select {
case <-d.cancelCh:
return 0, errCancelHeaderFetch
return 0, errCanceled

case packer := <-d.headerCh:
// Discard anything not from the origin peer
Expand Down Expand Up @@ -891,7 +878,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
for {
select {
case <-d.cancelCh:
return errCancelHeaderFetch
return errCanceled

case packet := <-d.headerCh:
// Make sure the active peer is giving us the skeleton headers
Expand All @@ -918,7 +905,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
getHeaders(from)
continue
case <-d.cancelCh:
return errCancelHeaderFetch
return errCanceled
}
}
// Pivot done (or not in fast sync) and no more headers, terminate the process
Expand All @@ -927,7 +914,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
case d.headerProcCh <- nil:
return nil
case <-d.cancelCh:
return errCancelHeaderFetch
return errCanceled
}
}
headers := packet.(*headerPack).headers
Expand Down Expand Up @@ -972,7 +959,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
select {
case d.headerProcCh <- headers:
case <-d.cancelCh:
return errCancelHeaderFetch
return errCanceled
}
from += uint64(len(headers))
getHeaders(from)
Expand All @@ -984,7 +971,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
getHeaders(from)
continue
case <-d.cancelCh:
return errCancelHeaderFetch
return errCanceled
}
}

Expand Down Expand Up @@ -1043,7 +1030,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
)
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire,
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")

Expand All @@ -1069,7 +1056,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
)
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

Expand All @@ -1093,7 +1080,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
)
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

Expand Down Expand Up @@ -1126,7 +1113,7 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - idle: network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
// - kind: textual label of the type being downloaded to display in log mesages
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
Expand All @@ -1142,7 +1129,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
for {
select {
case <-d.cancelCh:
return errCancel
return errCanceled

case packet := <-deliveryCh:
// If the peer was previously banned and failed to deliver its pack
Expand Down Expand Up @@ -1213,12 +1200,23 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
setIdle(peer, 0)
} else {
peer.log.Debug("Stalling delivery, dropping", "type", kind)

if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
} else {
d.dropPeer(pid)

// If this peer was the master peer, abort sync immediately
d.cancelLock.RLock()
master := pid == d.cancelPeer
d.cancelLock.RUnlock()

if master {
d.cancel()
return errTimeout
}
}
}
}
Expand Down Expand Up @@ -1323,7 +1321,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
for {
select {
case <-d.cancelCh:
return errCancelHeaderProcessing
return errCanceled

case headers := <-d.headerProcCh:
// Terminate header processing if we synced up
Expand Down Expand Up @@ -1376,7 +1374,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// Terminate if something failed in between processing chunks
select {
case <-d.cancelCh:
return errCancelHeaderProcessing
return errCanceled
default:
}
// Select the next chunk of headers to import
Expand Down Expand Up @@ -1420,7 +1418,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
case <-d.cancelCh:
return errCancelHeaderProcessing
return errCanceled
case <-time.After(time.Second):
}
}
Expand Down Expand Up @@ -1511,7 +1509,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
stateSync := d.syncState(latest.Root)
defer stateSync.Cancel()
go func() {
if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
if err := stateSync.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled {
d.queue.Close() // wake up Results
}
}()
Expand Down Expand Up @@ -1539,7 +1537,8 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
// If sync failed, stop
select {
case <-d.cancelCh:
return stateSync.Cancel()
stateSync.Cancel()
return errCanceled
default:
}
}
Expand Down Expand Up @@ -1569,7 +1568,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
stateSync = d.syncState(P.Header.Root)
defer stateSync.Cancel()
go func() {
if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
if err := stateSync.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled {
d.queue.Close() // wake up Results
}
}()
Expand Down
6 changes: 0 additions & 6 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,14 +1057,8 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
{errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
{errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter
{errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
{errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
{errInvalidBody, false}, // A bad peer was detected, but not the sync origin
{errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
{errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelReceiptFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelHeaderProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelContentProcessing, false}, // Synchronisation was canceled, origin may be innocent, don't drop
}
// Run the tests and check disconnection status
Expand Down
12 changes: 11 additions & 1 deletion eth/downloader/statesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (s *stateSync) loop() (err error) {
return errCancelStateFetch

case <-s.d.cancelCh:
return errCancelStateFetch
return errCanceled

case req := <-s.deliver:
// Response, disconnect or timeout triggered, drop the peer if stalling
Expand All @@ -316,6 +316,16 @@ func (s *stateSync) loop() (err error) {
req.peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", req.peer.id)
} else {
s.d.dropPeer(req.peer.id)

// If this peer was the master peer, abort sync immediately
s.d.cancelLock.RLock()
master := req.peer.id == s.d.cancelPeer
s.d.cancelLock.RUnlock()

if master {
s.d.cancel()
return errTimeout
}
}
}
// Process all the received blobs and check for stale delivery
Expand Down