diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 95db0cced448..df033f157882 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -213,7 +213,7 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchai dl := &Downloader{ stateDB: stateDb, mux: mux, - queue: newQueue(blockCacheItems), + queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), peers: newPeerSet(), rttEstimate: uint64(rttMaxEstimate), rttConfidence: uint64(1000000), @@ -359,7 +359,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode log.Info("Block synchronisation started") } // Reset the queue, peer set and wake channels to clean any internal leftover state - d.queue.Reset(blockCacheItems) + d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems) d.peers.Reset() for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 5e325fc59cac..50f28bc22f6b 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -39,7 +39,7 @@ import ( // Reduce some of the parameters to make the tester faster. func init() { MaxForkAncestry = uint64(10000) - blockCacheItems = 1024 + blockCacheMaxItems = 1024 fsHeaderContCheck = 500 * time.Millisecond } @@ -469,7 +469,7 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) tester.newPeer("peer", protocol, chain) // Synchronise with the peer and make sure all relevant data was retrieved @@ -531,8 +531,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { } tester.lock.Unlock() - if cached == blockCacheItems || - cached == blockCacheItems-reorgProtHeaderDelay || + if cached == blockCacheMaxItems || + cached == blockCacheMaxItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay { break @@ -543,8 +543,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { tester.lock.RLock() retrieved = len(tester.ownBlocks) tester.lock.RUnlock() - if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay { - t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1) + if cached != blockCacheMaxItems && cached != blockCacheMaxItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay { + t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheMaxItems, retrieved, frozen, targetBlocks+1) } // Permit the blocked blocks to import @@ -807,7 +807,7 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) // Create peers of every type tester.newPeer("peer 62", 62, chain) @@ -897,7 +897,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) brokenChain := chain.shorten(chain.len()) delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2]) tester.newPeer("attack", protocol, brokenChain) @@ -928,7 +928,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) // Attempt a full sync with an attacker feeding shifted headers brokenChain := chain.shorten(chain.len()) @@ -1129,7 +1129,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) // Set a sync init hook to catch progress changes starting := make(chan struct{}) @@ -1290,7 +1290,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) // Set a sync init hook to catch progress changes starting := make(chan struct{}) @@ -1362,7 +1362,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) // Set a sync init hook to catch progress changes starting := make(chan struct{}) diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 56c49ef7c205..6952499df551 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -40,9 +40,10 @@ const ( ) var ( - blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download - blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching - blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones + blockCacheMaxItems = 8192 // Maximum number of blocks to cache before throttling the download + blockCacheInitialItems = 2048 // Initial number of blocks to start fetching, before we know the sizes of the blocks + blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching + blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones ) var ( @@ -142,7 +143,7 @@ type queue struct { } // newQueue creates a new download queue for scheduling block retrieval. -func newQueue(blockCacheLimit int) *queue { +func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue { lock := new(sync.RWMutex) q := &queue{ headerContCh: make(chan bool), @@ -151,12 +152,12 @@ func newQueue(blockCacheLimit int) *queue { active: sync.NewCond(lock), lock: lock, } - q.Reset(blockCacheLimit) + q.Reset(blockCacheLimit, thresholdInitialSize) return q } // Reset clears out the queue contents. -func (q *queue) Reset(blockCacheLimit int) { +func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) { q.lock.Lock() defer q.lock.Unlock() @@ -175,6 +176,7 @@ func (q *queue) Reset(blockCacheLimit int) { q.receiptPendPool = make(map[string]*fetchRequest) q.resultCache = newResultStore(blockCacheLimit) + q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize)) } // Close marks the end of the sync, unblocking Results. diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go index 4b30cc9c3683..132d254e1b89 100644 --- a/eth/downloader/queue_test.go +++ b/eth/downloader/queue_test.go @@ -99,7 +99,7 @@ func dummyPeer(id string) *peerConnection { } func TestBasics(t *testing.T) { - q := newQueue(10) + q := newQueue(10, 10) if !q.Idle() { t.Errorf("new queue should be idle") } @@ -176,7 +176,7 @@ func TestBasics(t *testing.T) { } func TestEmptyBlocks(t *testing.T) { - q := newQueue(10) + q := newQueue(10, 10) q.Prepare(1, FastSync) // Schedule a batch of headers @@ -245,7 +245,7 @@ func XTestDelivery(t *testing.T) { if false { log.SetDefault(log.NewLogger(slog.NewTextHandler(os.Stdout, nil))) } - q := newQueue(10) + q := newQueue(10, 10) var wg sync.WaitGroup q.Prepare(1, FastSync) wg.Add(1) diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go index 9f864042ba5d..a117c09e6649 100644 --- a/eth/downloader/testchain_test.go +++ b/eth/downloader/testchain_test.go @@ -39,7 +39,7 @@ var ( ) // The common prefix of all test chains: -var testChainBase = newTestChain(blockCacheItems+200, testGenesis) +var testChainBase = newTestChain(blockCacheMaxItems+200, testGenesis) // Different forks on top of the base chain: var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain