Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,17 @@ func rlpHash(x interface{}) (h common.Hash) {
return h
}

// EmptyBody returns true if there is no additional 'body' to complete the header
// that is: no transactions and no uncles.
func (h *Header) EmptyBody() bool {
return h.TxHash == EmptyRootHash && h.UncleHash == EmptyUncleHash
}

// EmptyReceipts returns true if there are no receipts for this header/block.
func (h *Header) EmptyReceipts() bool {
return h.ReceiptHash == EmptyRootHash
}

// Body is a simple (mutable, non-safe) data container for storing and moving
// a block's data contents (transactions and uncles) together.
type Body struct {
Expand Down
80 changes: 50 additions & 30 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
stateBloom: stateBloom,
mux: mux,
checkpoint: checkpoint,
queue: newQueue(),
queue: newQueue(blockCacheItems),
peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
Expand Down Expand Up @@ -367,7 +367,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
d.stateBloom.Close()
}
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset()
d.queue.Reset(blockCacheItems)
d.peers.Reset()

for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
Expand Down Expand Up @@ -588,8 +588,10 @@ func (d *Downloader) Terminate() {
default:
close(d.quitCh)
}
if d.stateBloom != nil {
d.stateBloom.Close()
}
d.quitLock.Unlock()

// Cancel any pending download requests
d.Cancel()
}
Expand Down Expand Up @@ -619,7 +621,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
// Make sure the peer actually gave something valid
headers := packet.(*headerPack).headers
if len(headers) != 1 {
p.log.Debug("Multiple headers for single request", "headers", len(headers))
p.log.Warn("Multiple headers for single request", "headers", len(headers))
return nil, errBadPeer
}
head := headers[0]
Expand Down Expand Up @@ -851,7 +853,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
// Make sure the peer actually gave something valid
headers := packer.(*headerPack).headers
if len(headers) != 1 {
p.log.Debug("Multiple headers for single request", "headers", len(headers))
p.log.Warn("Multiple headers for single request", "headers", len(headers))
return 0, errBadPeer
}
arrived = true
Expand All @@ -875,7 +877,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
}
header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
if header.Number.Uint64() != check {
p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
return 0, errBadPeer
}
start = check
Expand Down Expand Up @@ -1090,17 +1092,18 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
pack := packet.(*headerPack)
return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
}
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
throttle = func() bool { return false }
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveHeaders(p, count), false, nil
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool, error) {
return d.queue.ReserveHeaders(p, count), false, false, nil
}
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
p.SetHeadersIdle(accepted, deliveryTime)
}
)
err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire,
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
d.queue.PendingHeaders, d.queue.InFlightHeaders, reserve,
nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")

log.Debug("Skeleton fill terminated", "err", err)
Expand All @@ -1123,10 +1126,10 @@ func (d *Downloader) fetchBodies(from uint64) error {
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
)
err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

log.Debug("Block body download terminated", "err", err)
Expand All @@ -1147,10 +1150,12 @@ func (d *Downloader) fetchReceipts(from uint64) error {
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
p.SetReceiptsIdle(accepted, deliveryTime)
}
)
err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

log.Debug("Transaction receipt download terminated", "err", err)
Expand Down Expand Up @@ -1183,14 +1188,13 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
// - kind: textual label of the type being downloaded to display in log mesages
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool, error),
fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {

// Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

update := make(chan struct{}, 1)

// Prepare the queue and fetch block parts until the block header fetcher's done
Expand All @@ -1201,6 +1205,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
return errCanceled

case packet := <-deliveryCh:
deliveryTime := time.Now()
// If the peer was previously banned and failed to deliver its pack
// in a reasonable time frame, ignore its message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
Expand All @@ -1213,7 +1218,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
// caused by a timed out request which came through in the end), set it to
// idle. If the delivery's stale, the peer should have already been idled.
if err != errStaleDelivery {
setIdle(peer, accepted)
setIdle(peer, accepted, deliveryTime)
}
// Issue a log to the user to see what's going on
switch {
Expand Down Expand Up @@ -1266,7 +1271,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
if fails > 2 {
peer.log.Trace("Data delivery timed out", "type", kind)
setIdle(peer, 0)
setIdle(peer, 0, time.Now())
} else {
peer.log.Debug("Stalling delivery, dropping", "type", kind)

Expand Down Expand Up @@ -1301,27 +1306,31 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
// Send a download request to all idle peers, until throttled
progressed, throttled, running := false, false, inFlight()
idles, total := idle()

pendCount := pending()
for _, peer := range idles {
// Short circuit if throttling activated
if throttle() {
throttled = true
if throttled {
break
}
// Short circuit if there is no more available task.
if pending() == 0 {
if pendCount = pending(); pendCount == 0 {
break
}
// Reserve a chunk of fetches for a peer. A nil can mean either that
// no more headers are available, or that the peer is known not to
// have them.
request, progress, err := reserve(peer, capacity(peer))
request, progress, throttle, err := reserve(peer, capacity(peer))
if err != nil {
log.Info("Error reserving fetch", "err", err)
return err
}
if progress {
progressed = true
}
if throttle {
throttled = true
throttleCounter.Inc(1)
}
if request == nil {
continue
}
Expand All @@ -1346,7 +1355,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
return errPeersUnavailable
}
}
Expand All @@ -1359,6 +1368,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
// Keep a count of uncertain headers to roll back
var rollback []*types.Header
var rollbackErr error
defer func() {
if len(rollback) > 0 {
// Flatten the headers and roll them back
Expand All @@ -1380,7 +1390,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
log.Warn("Rolled back headers", "count", len(hashes),
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
}
}()

Expand Down Expand Up @@ -1469,9 +1479,11 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
// If some headers were inserted, add them too to the rollback list
if n > 0 {
rollbackErr = err
rollback = append(rollback, chunk[:n]...)
}
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
log.Info("Invalid header encountered", "number", chunk[n].Number,
"hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
return errInvalidChain
}
// All verifications passed, store newly found uncertain headers
Expand All @@ -1493,7 +1505,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunk, origin)
if len(inserts) != len(chunk) {
log.Debug("Stale headers")
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
return errBadPeer
}
}
Expand Down Expand Up @@ -1663,6 +1675,14 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error {
}

func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
if len(results) == 0 {
return nil, nil, nil
}
if lastNum := results[len(results)-1].Header.Number.Uint64(); lastNum < pivot {
// the pivot is somewhere in the future
return nil, results, nil
}
// This can also be optimized, but only happens very seldom
for _, result := range results {
num := result.Header.Number.Uint64()
switch {
Expand Down
Loading