Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchai
dl := &Downloader{
stateDB: stateDb,
mux: mux,
queue: newQueue(blockCacheItems),
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
Expand Down Expand Up @@ -359,7 +359,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
log.Info("Block synchronisation started")
}
// Reset the queue, peer set and wake channels to clean any internal leftover state
d.queue.Reset(blockCacheItems)
d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems)
d.peers.Reset()

for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
Expand Down
24 changes: 12 additions & 12 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
// Reduce some of the parameters to make the tester faster.
func init() {
MaxForkAncestry = uint64(10000)
blockCacheItems = 1024
blockCacheMaxItems = 1024
fsHeaderContCheck = 500 * time.Millisecond
}

Expand Down Expand Up @@ -469,7 +469,7 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()

// Create a small enough block chain to download
chain := testChainBase.shorten(blockCacheItems - 15)
chain := testChainBase.shorten(blockCacheMaxItems - 15)
tester.newPeer("peer", protocol, chain)

// Synchronise with the peer and make sure all relevant data was retrieved
Expand Down Expand Up @@ -531,8 +531,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
}
tester.lock.Unlock()

if cached == blockCacheItems ||
cached == blockCacheItems-reorgProtHeaderDelay ||
if cached == blockCacheMaxItems ||
cached == blockCacheMaxItems-reorgProtHeaderDelay ||
retrieved+cached+frozen == targetBlocks+1 ||
retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay {
break
Expand All @@ -543,8 +543,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
tester.lock.RLock()
retrieved = len(tester.ownBlocks)
tester.lock.RUnlock()
if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1)
if cached != blockCacheMaxItems && cached != blockCacheMaxItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay {
t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheMaxItems, retrieved, frozen, targetBlocks+1)
}

// Permit the blocked blocks to import
Expand Down Expand Up @@ -807,7 +807,7 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
defer tester.terminate()

// Create a small enough block chain to download
chain := testChainBase.shorten(blockCacheItems - 15)
chain := testChainBase.shorten(blockCacheMaxItems - 15)

// Create peers of every type
tester.newPeer("peer 62", 62, chain)
Expand Down Expand Up @@ -897,7 +897,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
tester := newTester()
defer tester.terminate()

chain := testChainBase.shorten(blockCacheItems - 15)
chain := testChainBase.shorten(blockCacheMaxItems - 15)
brokenChain := chain.shorten(chain.len())
delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2])
tester.newPeer("attack", protocol, brokenChain)
Expand Down Expand Up @@ -928,7 +928,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) {
tester := newTester()
defer tester.terminate()

chain := testChainBase.shorten(blockCacheItems - 15)
chain := testChainBase.shorten(blockCacheMaxItems - 15)

// Attempt a full sync with an attacker feeding shifted headers
brokenChain := chain.shorten(chain.len())
Expand Down Expand Up @@ -1129,7 +1129,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {

tester := newTester()
defer tester.terminate()
chain := testChainBase.shorten(blockCacheItems - 15)
chain := testChainBase.shorten(blockCacheMaxItems - 15)

// Set a sync init hook to catch progress changes
starting := make(chan struct{})
Expand Down Expand Up @@ -1290,7 +1290,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {

tester := newTester()
defer tester.terminate()
chain := testChainBase.shorten(blockCacheItems - 15)
chain := testChainBase.shorten(blockCacheMaxItems - 15)

// Set a sync init hook to catch progress changes
starting := make(chan struct{})
Expand Down Expand Up @@ -1362,7 +1362,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) {

tester := newTester()
defer tester.terminate()
chain := testChainBase.shorten(blockCacheItems - 15)
chain := testChainBase.shorten(blockCacheMaxItems - 15)

// Set a sync init hook to catch progress changes
starting := make(chan struct{})
Expand Down
14 changes: 8 additions & 6 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ const (
)

var (
blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download
blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones
blockCacheMaxItems = 8192 // Maximum number of blocks to cache before throttling the download
blockCacheInitialItems = 2048 // Initial number of blocks to start fetching, before we know the sizes of the blocks
blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching
blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones
)

var (
Expand Down Expand Up @@ -142,7 +143,7 @@ type queue struct {
}

// newQueue creates a new download queue for scheduling block retrieval.
func newQueue(blockCacheLimit int) *queue {
func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
lock := new(sync.RWMutex)
q := &queue{
headerContCh: make(chan bool),
Expand All @@ -151,12 +152,12 @@ func newQueue(blockCacheLimit int) *queue {
active: sync.NewCond(lock),
lock: lock,
}
q.Reset(blockCacheLimit)
q.Reset(blockCacheLimit, thresholdInitialSize)
return q
}

// Reset clears out the queue contents.
func (q *queue) Reset(blockCacheLimit int) {
func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) {
q.lock.Lock()
defer q.lock.Unlock()

Expand All @@ -175,6 +176,7 @@ func (q *queue) Reset(blockCacheLimit int) {
q.receiptPendPool = make(map[string]*fetchRequest)

q.resultCache = newResultStore(blockCacheLimit)
q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize))
}

// Close marks the end of the sync, unblocking Results.
Expand Down
6 changes: 3 additions & 3 deletions eth/downloader/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func dummyPeer(id string) *peerConnection {
}

func TestBasics(t *testing.T) {
q := newQueue(10)
q := newQueue(10, 10)
if !q.Idle() {
t.Errorf("new queue should be idle")
}
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestBasics(t *testing.T) {
}

func TestEmptyBlocks(t *testing.T) {
q := newQueue(10)
q := newQueue(10, 10)

q.Prepare(1, FastSync)
// Schedule a batch of headers
Expand Down Expand Up @@ -245,7 +245,7 @@ func XTestDelivery(t *testing.T) {
if false {
log.SetDefault(log.NewLogger(slog.NewTextHandler(os.Stdout, nil)))
}
q := newQueue(10)
q := newQueue(10, 10)
var wg sync.WaitGroup
q.Prepare(1, FastSync)
wg.Add(1)
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/testchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
)

// The common prefix of all test chains:
var testChainBase = newTestChain(blockCacheItems+200, testGenesis)
var testChainBase = newTestChain(blockCacheMaxItems+200, testGenesis)

// Different forks on top of the base chain:
var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain
Expand Down