Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 11 additions & 108 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,6 @@ var (
MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
MaxStateFetch = 384 // Amount of node state values to allow fetching per request

rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
rttMaxEstimate = 20 * time.Second // Maximum round-trip time to target for download requests
rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
ttlScaling = 3 // Constant scaling factor for RTT -> TTL conversion
ttlLimit = time.Minute // Maximum TTL allowance to prevent reaching crazy timeouts

qosTuningPeers = 5 // Number of peers to tune based on (best peers)
qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value

maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain
Expand Down Expand Up @@ -96,13 +86,6 @@ var (
)

type Downloader struct {
// WARNING: The `rttEstimate` and `rttConfidence` fields are accessed atomically.
// On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
// guaranteed to be so aligned, so take advantage of that. For more information,
// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
rttEstimate uint64 // Round trip time to target for download requests
rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mux *event.TypeMux // Event multiplexer to announce sync operation events

Expand Down Expand Up @@ -232,8 +215,6 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
checkpoint: checkpoint,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
Expand All @@ -252,7 +233,6 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
},
trackStateReq: make(chan *stateReq),
}
go dl.qosTuner()
go dl.stateFetcher()
return dl
}
Expand Down Expand Up @@ -310,8 +290,6 @@ func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error {
logger.Error("Failed to register sync peer", "err", err)
return err
}
d.qosReduceConfidence()

return nil
}

Expand Down Expand Up @@ -670,7 +648,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
}
go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)

ttl := d.requestTTL()
ttl := d.peers.rates.TargetTimeout()
timeout := time.After(ttl)
for {
select {
Expand Down Expand Up @@ -853,7 +831,7 @@ func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, re
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}

ttl := d.requestTTL()
ttl := d.peers.rates.TargetTimeout()
timeout := time.After(ttl)

for finished := false; !finished; {
Expand Down Expand Up @@ -942,7 +920,7 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode,
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2

ttl := d.requestTTL()
ttl := d.peers.rates.TargetTimeout()
timeout := time.After(ttl)

go p.peer.RequestHeadersByNumber(check, 1, 0, false)
Expand Down Expand Up @@ -1035,7 +1013,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
getHeaders := func(from uint64) {
request = time.Now()

ttl = d.requestTTL()
ttl = d.peers.rates.TargetTimeout()
timeout.Reset(ttl)

if skeleton {
Expand All @@ -1050,7 +1028,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
pivoting = true
request = time.Now()

ttl = d.requestTTL()
ttl = d.peers.rates.TargetTimeout()
timeout.Reset(ttl)

d.pivotLock.RLock()
Expand Down Expand Up @@ -1262,12 +1240,12 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
pack := packet.(*headerPack)
return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
}
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
expire = func() map[string]int { return d.queue.ExpireHeaders(d.peers.rates.TargetTimeout()) }
reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) {
return d.queue.ReserveHeaders(p, count), false, false
}
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.peers.rates.TargetRoundTrip()) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
p.SetHeadersIdle(accepted, deliveryTime)
}
Expand All @@ -1293,9 +1271,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
pack := packet.(*bodyPack)
return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
}
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
expire = func() map[string]int { return d.queue.ExpireBodies(d.peers.rates.TargetTimeout()) }
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
capacity = func(p *peerConnection) int { return p.BlockCapacity(d.peers.rates.TargetRoundTrip()) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
)
err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
Expand All @@ -1317,9 +1295,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
pack := packet.(*receiptPack)
return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
}
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
expire = func() map[string]int { return d.queue.ExpireReceipts(d.peers.rates.TargetTimeout()) }
fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.peers.rates.TargetRoundTrip()) }
setIdle = func(p *peerConnection, accepted int, deliveryTime time.Time) {
p.SetReceiptsIdle(accepted, deliveryTime)
}
Expand Down Expand Up @@ -2031,78 +2009,3 @@ func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dro
return errNoSyncActive
}
}

// qosTuner is the quality of service tuning loop that occasionally gathers the
// peer latency statistics and updates the estimated request round trip time.
func (d *Downloader) qosTuner() {
for {
// Retrieve the current median RTT and integrate into the previoust target RTT
rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
atomic.StoreUint64(&d.rttEstimate, uint64(rtt))

// A new RTT cycle passed, increase our confidence in the estimated RTT
conf := atomic.LoadUint64(&d.rttConfidence)
conf = conf + (1000000-conf)/2
atomic.StoreUint64(&d.rttConfidence, conf)

// Log the new QoS values and sleep until the next RTT
log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
select {
case <-d.quitCh:
return
case <-time.After(rtt):
}
}
}

// qosReduceConfidence is meant to be called when a new peer joins the downloader's
// peer set, needing to reduce the confidence we have in out QoS estimates.
func (d *Downloader) qosReduceConfidence() {
// If we have a single peer, confidence is always 1
peers := uint64(d.peers.Len())
if peers == 0 {
// Ensure peer connectivity races don't catch us off guard
return
}
if peers == 1 {
atomic.StoreUint64(&d.rttConfidence, 1000000)
return
}
// If we have a ton of peers, don't drop confidence)
if peers >= uint64(qosConfidenceCap) {
return
}
// Otherwise drop the confidence factor
conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
if float64(conf)/1000000 < rttMinConfidence {
conf = uint64(rttMinConfidence * 1000000)
}
atomic.StoreUint64(&d.rttConfidence, conf)

rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
}

// requestRTT returns the current target round trip time for a download request
// to complete in.
//
// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
// the downloader tries to adapt queries to the RTT, so multiple RTT values can
// be adapted to, but smaller ones are preferred (stabler download stream).
func (d *Downloader) requestRTT() time.Duration {
return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
}

// requestTTL returns the current timeout allowance for a single download request
// to finish under.
func (d *Downloader) requestTTL() time.Duration {
var (
rtt = time.Duration(atomic.LoadUint64(&d.rttEstimate))
conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
)
ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
if ttl > ttlLimit {
ttl = ttlLimit
}
return ttl
}
Loading