Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,6 @@ jobs:
script:
- go run build/ci.go lint

- stage: build
os: linux
dist: xenial
go: 1.11.x
env:
- GO111MODULE=on
script:
- go run build/ci.go install
- go run build/ci.go test -coverage $TEST_PACKAGES

- stage: build
os: linux
dist: xenial
go: 1.12.x
env:
- GO111MODULE=on
script:
- go run build/ci.go install
- go run build/ci.go test -coverage $TEST_PACKAGES

- stage: build
os: linux
dist: xenial
Expand Down
35 changes: 25 additions & 10 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,28 @@ func (d *Downloader) UnregisterPeer(id string) error {
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
err := d.synchronise(id, head, td, mode)

switch err {
case nil:
case errBusy, errCanceled:
case nil, errBusy, errCanceled:
return err
}

if errors.Is(err, errInvalidChain) {
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
} else {
d.dropPeer(id)
}
return err
}

switch err {
case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer,
errEmptyHeaderSet, errPeersUnavailable, errTooOld,
errInvalidAncestor, errInvalidChain:
errInvalidAncestor:
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
Expand Down Expand Up @@ -774,7 +789,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
expectNumber := from + int64(i)*int64(skip+1)
if number := header.Number.Int64(); number != expectNumber {
p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number)
return 0, errInvalidChain
return 0, fmt.Errorf("%w: %v", errInvalidChain, errors.New("head headers broke chain ordering"))
}
}
// Check if a common ancestor was found
Expand Down Expand Up @@ -988,7 +1003,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
filled, proced, err := d.fillHeaderSkeleton(from, headers)
if err != nil {
p.log.Debug("Skeleton chain invalid", "err", err)
return errInvalidChain
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
headers = filled[proced:]
from += uint64(proced)
Expand Down Expand Up @@ -1207,13 +1222,13 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
// Deliver the received chunk of data and check chain validity
accepted, err := deliver(packet)
if err == errInvalidChain {
if errors.Is(err, errInvalidChain) {
return err
}
// Unless a peer delivered something completely else than requested (usually
// caused by a timed out request which came through in the end), set it to
// idle. If the delivery's stale, the peer should have already been idled.
if err != errStaleDelivery {
if !errors.Is(err, errStaleDelivery) {
setIdle(peer, accepted)
}
// Issue a log to the user to see what's going on
Expand Down Expand Up @@ -1473,7 +1488,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
rollback = append(rollback, chunk[:n]...)
}
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
return errInvalidChain
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
// All verifications passed, store newly found uncertain headers
rollback = append(rollback, unknown...)
Expand Down Expand Up @@ -1565,7 +1580,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
// of the blocks delivered from the downloader, and the indexing will be off.
log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err)
}
return errInvalidChain
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
return nil
}
Expand Down Expand Up @@ -1706,7 +1721,7 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state
}
if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
return nil
}
Expand Down
27 changes: 16 additions & 11 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,27 +242,32 @@ func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int {
func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) {
dl.lock.Lock()
defer dl.lock.Unlock()

// Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors
if _, ok := dl.ownHeaders[headers[0].ParentHash]; !ok {
return 0, errors.New("unknown parent")
return 0, errors.New("InsertHeaderChain: unknown parent at first position")
}
var hashes []common.Hash
for i := 1; i < len(headers); i++ {
hash := headers[i-1].Hash()
if headers[i].ParentHash != headers[i-1].Hash() {
return i, errors.New("unknown parent")
return i, fmt.Errorf("non-contiguous import at position %d", i)
}
hashes = append(hashes, hash)
}
hashes = append(hashes, headers[len(headers)-1].Hash())
// Do a full insert if pre-checks passed
for i, header := range headers {
if _, ok := dl.ownHeaders[header.Hash()]; ok {
hash := hashes[i]
if _, ok := dl.ownHeaders[hash]; ok {
continue
}
if _, ok := dl.ownHeaders[header.ParentHash]; !ok {
return i, errors.New("unknown parent")
// This _should_ be impossible, due to precheck and induction
return i, fmt.Errorf("InsertHeaderChain: unknown parent at position %d", i)
}
dl.ownHashes = append(dl.ownHashes, header.Hash())
dl.ownHeaders[header.Hash()] = header
dl.ownChainTd[header.Hash()] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty)
dl.ownHashes = append(dl.ownHashes, hash)
dl.ownHeaders[hash] = header
dl.ownChainTd[hash] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty)
}
return len(headers), nil
}
Expand All @@ -274,9 +279,9 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) {

for i, block := range blocks {
if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok {
return i, errors.New("unknown parent")
return i, fmt.Errorf("InsertChain: unknown parent at position %d / %d", i, len(blocks))
} else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil {
return i, fmt.Errorf("unknown parent state %x: %v", parent.Root(), err)
return i, fmt.Errorf("InsertChain: unknown parent state %x: %v", parent.Root(), err)
}
if _, ok := dl.ownHeaders[block.Hash()]; !ok {
dl.ownHashes = append(dl.ownHashes, block.Hash())
Expand All @@ -301,7 +306,7 @@ func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []typ
}
if _, ok := dl.ancientBlocks[blocks[i].ParentHash()]; !ok {
if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
return i, errors.New("unknown parent")
return i, errors.New("InsertReceiptChain: unknown parent")
}
}
if blocks[i].NumberU64() <= ancientLimit {
Expand Down
14 changes: 8 additions & 6 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 {
common.Report("index allocation went beyond available resultCache space")
return nil, false, errInvalidChain
return nil, false, fmt.Errorf("%w: index allocation went beyond available resultCache space", errInvalidChain)
}
if q.resultCache[index] == nil {
components := 1
Expand Down Expand Up @@ -863,14 +863,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
q.active.Signal()
}
// If none of the data was good, it's a stale delivery
switch {
case failure == nil || failure == errInvalidChain:
if failure == nil {
return accepted, nil
}
if errors.Is(failure, errInvalidChain) {
return accepted, failure
case useful:
}
if useful {
return accepted, fmt.Errorf("partial failure: %v", failure)
default:
return accepted, errStaleDelivery
}
return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery)
}

// Prepare configures the result cache to allow accepting and caching inbound
Expand Down