Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 36 additions & 20 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type Downloader struct {
blockCh chan blockPack // [eth/61] Channel receiving inbound blocks
headerCh chan headerPack // [eth/62] Channel receiving inbound block headers
bodyCh chan bodyPack // [eth/62] Channel receiving inbound block bodies
processCh chan bool // Channel to signal the block fetcher of new or finished work
wakeCh chan bool // Channel to signal the block/body fetcher of new tasks

cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers
Expand Down Expand Up @@ -188,7 +188,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, he
blockCh: make(chan blockPack, 1),
headerCh: make(chan headerPack, 1),
bodyCh: make(chan bodyPack, 1),
processCh: make(chan bool, 1),
wakeCh: make(chan bool, 1),
}
}

Expand Down Expand Up @@ -282,6 +282,10 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error
d.queue.Reset()
d.peers.Reset()

select {
case <-d.wakeCh:
default:
}
// Create cancel channel for aborting mid-flight
d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
Expand Down Expand Up @@ -633,7 +637,7 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: no available hashes", p)

select {
case d.processCh <- false:
case d.wakeCh <- false:
case <-d.cancelCh:
}
// If no hashes were retrieved at all, the peer violated it's TD promise that it had a
Expand Down Expand Up @@ -664,12 +668,18 @@ func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
return errBadPeer
}
// Notify the block fetcher of new hashes, but stop if queue is full
cont := d.queue.Pending() < maxQueuedHashes
select {
case d.processCh <- cont:
default:
}
if !cont {
if d.queue.Pending() < maxQueuedHashes {
// We still have hashes to fetch, send continuation wake signal (potential)
select {
case d.wakeCh <- true:
default:
}
} else {
// Hash limit reached, send a termination wake signal (enforced)
select {
case d.wakeCh <- false:
case <-d.cancelCh:
}
return nil
}
// Queue not yet full, fetch the next batch
Expand Down Expand Up @@ -766,7 +776,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error {
default:
}

case cont := <-d.processCh:
case cont := <-d.wakeCh:
// The hash fetcher sent a continuation flag, check if it's done
if !cont {
finished = true
Expand Down Expand Up @@ -1053,7 +1063,7 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: no available headers", p)

select {
case d.processCh <- false:
case d.wakeCh <- false:
case <-d.cancelCh:
}
// If no headers were retrieved at all, the peer violated it's TD promise that it had a
Expand Down Expand Up @@ -1084,12 +1094,18 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
return errBadPeer
}
// Notify the block fetcher of new headers, but stop if queue is full
cont := d.queue.Pending() < maxQueuedHeaders
select {
case d.processCh <- cont:
default:
}
if !cont {
if d.queue.Pending() < maxQueuedHeaders {
// We still have headers to fetch, send continuation wake signal (potential)
select {
case d.wakeCh <- true:
default:
}
} else {
// Header limit reached, send a termination wake signal (enforced)
select {
case d.wakeCh <- false:
case <-d.cancelCh:
}
return nil
}
// Queue not yet full, fetch the next batch
Expand All @@ -1104,8 +1120,8 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {

// Finish the sync gracefully instead of dumping the gathered data though
select {
case d.processCh <- false:
default:
case d.wakeCh <- false:
case <-d.cancelCh:
}
return nil
}
Expand Down Expand Up @@ -1199,7 +1215,7 @@ func (d *Downloader) fetchBodies(from uint64) error {
default:
}

case cont := <-d.processCh:
case cont := <-d.wakeCh:
// The header fetcher sent a continuation flag, check if it's done
if !cont {
finished = true
Expand Down