diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts index 03f37cad2482..9cd2997d3a2a 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts @@ -8,12 +8,12 @@ import { type ISemaphore, Semaphore } from '@aztec/foundation/queue'; import { retryUntil } from '@aztec/foundation/retry'; import { sleep } from '@aztec/foundation/sleep'; import { DateProvider } from '@aztec/foundation/timer'; -import type { BlockProposal } from '@aztec/stdlib/p2p'; -import { PeerErrorSeverity } from '@aztec/stdlib/p2p'; +import { type BlockProposal, PeerErrorSeverity } from '@aztec/stdlib/p2p'; import { makeBlockHeader, makeBlockProposal } from '@aztec/stdlib/testing'; import { Tx, TxArray, TxHash, type TxValidationResult } from '@aztec/stdlib/tx'; import { describe, expect, it, jest } from '@jest/globals'; +import type { PeerId } from '@libp2p/interface'; import { type MockProxy, mock } from 'jest-mock-extended'; import { createSecp256k1PeerId } from '../../../index.js'; @@ -73,6 +73,21 @@ describe('BatchTxRequester', () => { }); }); + /** Drains `sampler` until an id repeats or it returns undefined; yields undefined when it offers no peer at all. */ + function sampleAllPeers(sampler: () => PeerId | undefined): string[] | undefined { + const seen = new Set<string>(); + const ordered: string[] = []; + // Stop at the first repeated id (round complete) or as soon as the sampler runs dry mid-round, + // rather than dereferencing an undefined peer through a non-null assertion. + let currentPeer = sampler()?.toString(); + while (currentPeer !== undefined && !seen.has(currentPeer)) { + seen.add(currentPeer); + ordered.push(currentPeer); + currentPeer = sampler()?.toString(); + } + return ordered.length > 0 ? ordered : undefined; + } + describe('Dumb peers', () => { it('should create correct TX_BATCH_SIZE chunks with single dumb worker', async () => { const txCount = 16; @@ -228,7 +243,9 @@ describe('BatchTxRequester', () => { const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId(), createSecp256k1PeerId()]);
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); - const peerCollection = new TestPeerCollection(new PeerCollection(peers, undefined, new DateProvider())); + const peerCollection = new TestPeerCollection( + new PeerCollection(connectionSampler, undefined, new DateProvider()), + ); // Define which transactions each peer has (same as happy path) const peerTransactions = new Map([ @@ -373,7 +390,7 @@ describe('BatchTxRequester', () => { const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]); connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); - const peerCollection = new PeerCollection(peers, pinnedPeer, dateProvider); + const peerCollection = new PeerCollection(connectionSampler, pinnedPeer, dateProvider); const peerTransactions = new Map([ [peers[0].toString(), Array.from({ length: 16 }, (_, i) => i)], // peer1 has all transactions, peer2 none ]); @@ -404,9 +421,11 @@ describe('BatchTxRequester', () => { expect(result!.length).toBe(txCount); // Verify peer promotion behavior - expect(peerCollection.getSmartPeers().size).toBe(1); - expect(peerCollection.getSmartPeers()).toContain(peers[0].toString()); - expect(peerCollection.getSmartPeers()).not.toContain(peers[1].toString()); + const smartPeers = sampleAllPeers(peerCollection.nextSmartPeerToQuery.bind(peerCollection)); + expect(smartPeers).toBeDefined(); + expect(smartPeers!.length).toBe(1); + expect(smartPeers).toContain(peers[0].toString()); + expect(smartPeers).not.toContain(peers[1].toString()); // The exact release count depends on timing of concurrent async operations. // We verify a minimum of 7 releases which accounts for: @@ -420,7 +439,10 @@ describe('BatchTxRequester', () => { }); it('Should track smart peer collection behavior with multiple promotions', async () => { - const txCount = 20; + // With batch_size=8, 3 workers handle batches 0-7, 8-15, 16-23 in the first round. 
+ // Using 30 txs ensures txs 24-29 remain unfetched after the dumb round, + // so every peer still has unique missing txs when decideIfPeerIsSmart runs. + const txCount = 30; const deadline = 3_000; const dateProvider = new DateProvider(); const pinnedPeer = undefined; @@ -436,13 +458,14 @@ describe('BatchTxRequester', () => { const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId(), createSecp256k1PeerId()]); connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); - const peerCollection = new PeerCollection(peers, pinnedPeer, dateProvider); + const peerCollection = new PeerCollection(connectionSampler, pinnedPeer, dateProvider); - // Define which transactions each peer has + // Each peer has txs spanning beyond its assigned batch, so after its batch is + // processed it still has txs that are missing → gets promoted to smart. const peerTransactions = new Map([ - [peers[0].toString(), Array.from({ length: 10 }, (_, i) => i)], // peer1: txs 0-9 - [peers[1].toString(), Array.from({ length: 10 }, (_, i) => i + 5)], // peer2: txs 5-14 - [peers[2].toString(), Array.from({ length: 10 }, (_, i) => i + 10)], // peer3: txs 10-19 + [peers[0].toString(), Array.from({ length: 15 }, (_, i) => i)], // peer0: txs 0-14 + [peers[1].toString(), Array.from({ length: 15 }, (_, i) => i + 10)], // peer1: txs 10-24 + [peers[2].toString(), Array.from({ length: 10 }, (_, i) => i + 20)], // peer2: txs 20-29 ]); const { mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions); @@ -471,7 +494,7 @@ describe('BatchTxRequester', () => { expect(result!.length).toBe(txCount); // Verify all peers were promoted to smart - expect(peerCollection.getSmartPeers().size).toBe(peers.length); + expect(sampleAllPeers(peerCollection.nextSmartPeerToQuery.bind(peerCollection))!.length).toBe(peers.length); expect(semaphore.acquiredCount).toBe(3); }); @@ -492,7 +515,7 @@ describe('BatchTxRequester', () => { const peers = await 
Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId(), createSecp256k1PeerId()]); connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); - const peerCollection = new PeerCollection(peers, pinnedPeer, dateProvider); + const peerCollection = new PeerCollection(connectionSampler, pinnedPeer, dateProvider); // Define which transactions each peer has const peerTransactions = new Map([ @@ -552,7 +575,7 @@ describe('BatchTxRequester', () => { connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); - const peerCollection = new PeerCollection(peers, pinnedPeer, dateProvider); + const peerCollection = new PeerCollection(connectionSampler, pinnedPeer, dateProvider); // Mock implementation that makes peer0 fail consistently, peer1 succeed const { mockImplementation } = createRequestLogger( @@ -585,7 +608,7 @@ describe('BatchTxRequester', () => { expect(peerCollection.getBadPeers()).not.toContain(peers[1].toString()); // Verify bad peer is excluded from queries - peer0 should be in bad peers - expect(peerCollection.getDumbPeersToQuery()).not.toContain(peers[0].toString()); + expect(peerCollection.nextDumbPeerToQuery()).toBeUndefined(); }); it('should recover bad peer after successful response', async () => { @@ -605,7 +628,7 @@ describe('BatchTxRequester', () => { const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]); connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); - const peerCollection = new PeerCollection(peers, pinnedPeer, dateProvider); + const peerCollection = new PeerCollection(connectionSampler, pinnedPeer, dateProvider); let requestCount = 0; // Mock implementation: first 4 requests fail (exceed threshold), then succeed @@ -656,7 +679,7 @@ describe('BatchTxRequester', () => { // Verify peer was initially marked bad but then recovered // Since peer succeeded in the end, it should not be in bad peers list 
expect(peerCollection.getBadPeers()).not.toContain(peers[0].toString()); - expect(peerCollection.getDumbPeersToQuery()).toContain(peers[0].toString()); + expect(sampleAllPeers(peerCollection.nextDumbPeerToQuery.bind(peerCollection))).toContain(peers[0].toString()); }); it('should handle multiple peers with different bad peer states', async () => { @@ -687,7 +710,7 @@ describe('BatchTxRequester', () => { ]); const semaphore = new TestSemaphore(new Semaphore(0)); - const peerCollection = new PeerCollection(peers, pinnedPeer, dateProvider); + const peerCollection = new PeerCollection(connectionSampler, pinnedPeer, dateProvider); const peerRequestCounts = new Map(); // eslint-disable-next-line require-await @@ -776,7 +799,7 @@ describe('BatchTxRequester', () => { expect(peerCollection.getBadPeers()).not.toContain(peers[2].toString()); // peer2: recovered // Verify query availability - const dumbPeersToQuery = peerCollection.getDumbPeersToQuery(); + const dumbPeersToQuery = sampleAllPeers(peerCollection.nextDumbPeerToQuery.bind(peerCollection)); expect(dumbPeersToQuery).not.toContain(peers[0].toString()); // bad peer excluded expect(dumbPeersToQuery).toContain(peers[1].toString()); // good peer included expect(dumbPeersToQuery).toContain(peers[2].toString()); // recovered peer included @@ -796,31 +819,35 @@ describe('BatchTxRequester', () => { const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]); connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); - const peerCollection = new TestPeerCollection(new PeerCollection(peers, pinnedPeer, clock)); + const peerCollection = new TestPeerCollection(new PeerCollection(connectionSampler, pinnedPeer, clock)); // Manually mark peer as rate limited peerCollection.markPeerRateLimitExceeded(peers[0]); // Verify peer is initially rate limited and excluded expect(peerCollection.getRateLimitExceededPeers()).toContain(peers[0].toString()); - 
expect(peerCollection.getDumbPeersToQuery()).not.toContain(peers[0].toString()); + expect(sampleAllPeers(peerCollection.nextDumbPeerToQuery.bind(peerCollection))).not.toContain( + peers[0].toString(), + ); // Test TTL behavior at different time points // Just before TTL expiration: still rate limited clock.advanceTo(RATE_LIMIT_EXCEEDED_PEER_CACHE_TTL - 1); expect(peerCollection.getRateLimitExceededPeers()).toContain(peers[0].toString()); - expect(peerCollection.getDumbPeersToQuery()).not.toContain(peers[0].toString()); + expect(sampleAllPeers(peerCollection.nextDumbPeerToQuery.bind(peerCollection))).not.toContain( + peers[0].toString(), + ); // Right at TTL expiration: not rate limited anymore clock.advanceTo(RATE_LIMIT_EXCEEDED_PEER_CACHE_TTL); expect(peerCollection.getRateLimitExceededPeers()).not.toContain(peers[0].toString()); - expect(peerCollection.getDumbPeersToQuery()).toContain(peers[0].toString()); + expect(sampleAllPeers(peerCollection.nextDumbPeerToQuery.bind(peerCollection))).toContain(peers[0].toString()); // After TTL expiration: not rate limited anymore clock.advanceTo(RATE_LIMIT_EXCEEDED_PEER_CACHE_TTL + 1); expect(peerCollection.getRateLimitExceededPeers()).not.toContain(peers[0].toString()); - expect(peerCollection.getDumbPeersToQuery()).toContain(peers[0].toString()); + expect(sampleAllPeers(peerCollection.nextDumbPeerToQuery.bind(peerCollection))).toContain(peers[0].toString()); // Test multiple rate limit cycles peerCollection.markPeerRateLimitExceeded(peers[0]); // Rate limit again @@ -828,7 +855,7 @@ describe('BatchTxRequester', () => { clock.advanceTo(clock.now() + RATE_LIMIT_EXCEEDED_PEER_CACHE_TTL + 1); expect(peerCollection.getRateLimitExceededPeers()).not.toContain(peers[0].toString()); - expect(peerCollection.getDumbPeersToQuery()).toContain(peers[0].toString()); + expect(sampleAllPeers(peerCollection.nextDumbPeerToQuery.bind(peerCollection))).toContain(peers[0].toString()); }); it('should exclude rate limited peer from queries and 
recover after TTL expiration', async () => { @@ -848,7 +875,7 @@ describe('BatchTxRequester', () => { const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]); connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); - const innerPeerCollection = new PeerCollection(peers, pinnedPeer, clock); + const innerPeerCollection = new PeerCollection(connectionSampler, pinnedPeer, clock); const peerCollection = new TestPeerCollection(innerPeerCollection); const peerTransactions = new Map([ @@ -919,8 +946,8 @@ describe('BatchTxRequester', () => { // Verify peer0 is no longer rate limited after TTL expiration expect(peerCollection.getRateLimitExceededPeers().size).toBe(0); - expect(peerCollection.getDumbPeersToQuery()).toContain(peers[0].toString()); - expect(peerCollection.getDumbPeersToQuery()).toContain(peers[1].toString()); + expect(sampleAllPeers(peerCollection.nextDumbPeerToQuery.bind(peerCollection))).toContain(peers[0].toString()); + expect(sampleAllPeers(peerCollection.nextDumbPeerToQuery.bind(peerCollection))).toContain(peers[1].toString()); }); }); @@ -1133,7 +1160,7 @@ describe('BatchTxRequester', () => { const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]); connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); - const peerCollection = new TestPeerCollection(new PeerCollection(peers, undefined, clock)); + const peerCollection = new TestPeerCollection(new PeerCollection(connectionSampler, undefined, clock)); const peerTransactions = new Map([[peers[0].toString(), Array.from({ length: txCount / 2 }, (_, i) => i)]]); const { mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions, 100); @@ -1195,7 +1222,9 @@ describe('BatchTxRequester', () => { const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]); connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); - const peerCollection = new 
TestPeerCollection(new PeerCollection(peers, undefined, new DateProvider())); + const peerCollection = new TestPeerCollection( + new PeerCollection(connectionSampler, undefined, new DateProvider()), + ); // Define which transactions each peer has const peerTransactions = new Map([ @@ -1525,7 +1554,9 @@ describe('BatchTxRequester', () => { connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); const [pinnedPeer, regularPeer] = peers; - const peerCollection = new TestPeerCollection(new PeerCollection(peers, pinnedPeer, new DateProvider())); + const peerCollection = new TestPeerCollection( + new PeerCollection(connectionSampler, pinnedPeer, new DateProvider()), + ); // Both peers have all transactions const peerTransactions = new Map([ @@ -1555,8 +1586,9 @@ describe('BatchTxRequester', () => { await BatchTxRequester.collectAllTxs(requester.run()); // Verify pinned peer was never marked as smart - expect(peerCollection.getSmartPeers()).not.toContain(pinnedPeer.toString()); - expect(peerCollection.getSmartPeersToQuery()).not.toContain(pinnedPeer.toString()); + expect(sampleAllPeers(peerCollection.nextSmartPeerToQuery.bind(peerCollection))).not.toContain( + pinnedPeer.toString(), + ); }); it('should handle pinned peer being rate limited and recover', async () => { @@ -1574,7 +1606,7 @@ describe('BatchTxRequester', () => { const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]); const [pinnedPeer, regularPeer] = peers; - const peerCollection = new TestPeerCollection(new PeerCollection(peers, pinnedPeer, clock)); + const peerCollection = new TestPeerCollection(new PeerCollection(connectionSampler, pinnedPeer, clock)); connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); @@ -1660,7 +1692,9 @@ describe('BatchTxRequester', () => { const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]); const [pinnedPeer, regularPeer] = peers; - const peerCollection = new 
TestPeerCollection(new PeerCollection(peers, pinnedPeer, new DateProvider())); + const peerCollection = new TestPeerCollection( + new PeerCollection(connectionSampler, pinnedPeer, new DateProvider()), + ); connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); @@ -1755,6 +1789,261 @@ describe('BatchTxRequester', () => { }); }); +describe('PeerCollection - Dynamic peer list', () => { + it('should reflect new peers joining', async () => { + const connectionSampler = mock(); + const dateProvider = new DateProvider(); + + const [peer1, peer2, peer3] = await Promise.all([ + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + ]); + + // Start with 2 peers + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer2]); + const pc = new PeerCollection(connectionSampler, undefined, dateProvider); + + assertPeerSequence(pc.nextDumbPeerToQuery.bind(pc), [peer1, peer2]); + + // A third peer joins + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer2, peer3]); + + // peer3 is the first unqueried peer (peer1 and peer2 were already sampled), + // then the round resets and continues from peer1. 
+ assertPeerSequence(pc.nextDumbPeerToQuery.bind(pc), [peer3, peer1, peer2]); + }); + + it('should reflect peers leaving', async () => { + const connectionSampler = mock(); + const dateProvider = new DateProvider(); + + const [peer1, peer2, peer3] = await Promise.all([ + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + ]); + + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer2, peer3]); + const pc = new PeerCollection(connectionSampler, undefined, dateProvider); + + // peer2 disconnects + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer3]); + + assertPeerSequence(pc.nextDumbPeerToQuery.bind(pc), [peer1, peer3]); + }); + + it('should reflect new smart peers joining', async () => { + const connectionSampler = mock(); + const dateProvider = new DateProvider(); + + const [peer1, peer2, peer3] = await Promise.all([ + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + ]); + + // Start with 2 peers, both marked smart + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer2]); + const pc = new PeerCollection(connectionSampler, undefined, dateProvider); + pc.markPeerSmart(peer1); + pc.markPeerSmart(peer2); + + assertPeerSequence(pc.nextSmartPeerToQuery.bind(pc), [peer1, peer2]); + + // A third smart peer joins + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer2, peer3]); + pc.markPeerSmart(peer3); + + // peer3 is the first unqueried smart peer (peer1 and peer2 were already sampled), + // then the round resets and continues from peer1. 
+ assertPeerSequence(pc.nextSmartPeerToQuery.bind(pc), [peer3, peer1, peer2]); + }); + + it('should reflect smart peers leaving', async () => { + const connectionSampler = mock(); + const dateProvider = new DateProvider(); + + const [peer1, peer2, peer3] = await Promise.all([ + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + ]); + + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer2, peer3]); + const pc = new PeerCollection(connectionSampler, undefined, dateProvider); + pc.markPeerSmart(peer1); + pc.markPeerSmart(peer2); + pc.markPeerSmart(peer3); + + assertPeerSequence(pc.nextSmartPeerToQuery.bind(pc), [peer1, peer2, peer3]); + + // peer2 disconnects + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer3]); + + assertPeerSequence(pc.nextSmartPeerToQuery.bind(pc), [peer1, peer3]); + }); + + it('should retain smart status after disconnect and reconnect', async () => { + const connectionSampler = mock(); + const dateProvider = new DateProvider(); + + const [peer1, peer2] = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]); + + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer2]); + const pc = new PeerCollection(connectionSampler, undefined, dateProvider); + pc.markPeerSmart(peer1); + + // peer1 disconnects — no longer available as smart + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer2]); + + expect(pc.nextSmartPeerToQuery()).toBeUndefined(); + + // peer1 reconnects — should still be smart + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer2]); + + expect(pc.nextSmartPeerToQuery()?.toString()).toBe(peer1.toString()); + // peer2 should still be dumb + expect(pc.nextDumbPeerToQuery()?.toString()).toBe(peer2.toString()); + }); + + it('should return undefined when all peers leave', async () => { + const connectionSampler = mock(); + const dateProvider = 
new DateProvider(); + + const [peer1] = await Promise.all([createSecp256k1PeerId()]); + + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1]); + const pc = new PeerCollection(connectionSampler, undefined, dateProvider); + + expect(pc.nextDumbPeerToQuery()).not.toBeUndefined(); + + // All peers disconnect + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([]); + + expect(pc.nextDumbPeerToQuery()).toBeUndefined(); + expect(pc.nextSmartPeerToQuery()).toBeUndefined(); + }); + + it('should recover when all peers disconnect and more peers reconnect', async () => { + const connectionSampler = mock(); + const dateProvider = new DateProvider(); + + const peers = await Promise.all(Array.from({ length: 6 }, () => createSecp256k1PeerId())); + const [peer1, peer2, peer3, peer4, peer5, peer6] = peers; + + // Start with 3 peers + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer2, peer3]); + const pc = new PeerCollection(connectionSampler, undefined, dateProvider); + + // Sample one peer before disconnection + const firstSampled = pc.nextDumbPeerToQuery(); + expect(firstSampled).not.toBeUndefined(); + + // All peers disconnect + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([]); + + expect(pc.nextDumbPeerToQuery()).toBeUndefined(); + expect(pc.nextDumbPeerToQuery()).toBeUndefined(); + + // Original 3 plus 3 new peers connect + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer2, peer3, peer4, peer5, peer6]); + + // peer1 was already sampled before disconnect, so it appears last in the first cycle + assertPeerSequence(pc.nextDumbPeerToQuery.bind(pc), [peer2, peer3, peer4, peer5, peer6, peer1]); + }); + + it('should exclude pinned peer dumb sampling, and smart sampling', async () => { + const connectionSampler = mock(); + const dateProvider = new DateProvider(); + + const [peer1, peer2, pinnedPeer] = await Promise.all([ + 
createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + ]); + + // Connection sampler returns all 3 peers including the pinned one + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([pinnedPeer, peer1, peer2]); + const pc = new PeerCollection(connectionSampler, pinnedPeer, dateProvider); + + // Pinned peer is excluded from dumb sampling + assertPeerSequence(pc.nextDumbPeerToQuery.bind(pc), [peer1, peer2]); + + // Mark all peers as smart (including pinned) + pc.markPeerSmart(peer1); + pc.markPeerSmart(peer2); + pc.markPeerSmart(pinnedPeer); + + // Pinned peer is excluded from smart sampling + assertPeerSequence(pc.nextSmartPeerToQuery.bind(pc), [peer1, peer2]); + }); + + it('should exclude bad, in-flight, and rate-limited peers from available counts', async () => { + const connectionSampler = mock(); + const clock = new TestClock(); + + const [peer1, peer2, peer3, peer4] = await Promise.all([ + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + ]); + + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer1, peer2, peer3, peer4]); + const pc = new PeerCollection(connectionSampler, undefined, clock, /* badPeerThreshold */ 0); + + // All 4 are dumb initially + assertPeerSequence(pc.nextDumbPeerToQuery.bind(pc), [peer1, peer2, peer3, peer4]); + + // Mark peer1 as bad (threshold=0 means first penalty marks as bad) + pc.penalisePeer(peer1, PeerErrorSeverity.HighToleranceError); + assertPeerSequence(pc.nextDumbPeerToQuery.bind(pc), [peer2, peer3, peer4]); + + // Mark peer2 as in-flight + pc.markPeerInFlight(peer2); + assertPeerSequence(pc.nextDumbPeerToQuery.bind(pc), [peer3, peer4]); + + // Mark peer3 as rate-limited + pc.markPeerRateLimitExceeded(peer3); + assertPeerSequence(pc.nextDumbPeerToQuery.bind(pc), [peer4]); + + // Now test smart counts: promote peer1-peer4 to smart + pc.markPeerSmart(peer1); + pc.markPeerSmart(peer2); + 
pc.markPeerSmart(peer3); + pc.markPeerSmart(peer4); + + // peer1 is bad, peer2 is in-flight, peer3 is rate-limited → only peer4 available + assertPeerSequence(pc.nextSmartPeerToQuery.bind(pc), [peer4]); + + // Undo exclusions and verify counts recover + pc.unMarkPeerAsBad(peer1); + pc.unMarkPeerInFlight(peer2); + clock.advanceTo(RATE_LIMIT_EXCEEDED_PEER_CACHE_TTL); + + expect(pc.nextDumbPeerToQuery()).toBeUndefined(); + expect(pc.nextSmartPeerToQuery()?.toString()).toBe(peer1.toString()); + expect(pc.nextSmartPeerToQuery()?.toString()).toBe(peer2.toString()); + expect(pc.nextSmartPeerToQuery()?.toString()).toBe(peer3.toString()); + + assertPeerSequence(pc.nextSmartPeerToQuery.bind(pc), [peer1, peer2, peer3, peer4]); // all are smart now + }); + + function assertPeerSequence(sampler: () => PeerId | undefined, expectedPeers: PeerId[] | string[]) { + for (let i: number = 0; i < expectedPeers.length; i++) { + const currentPeer = sampler()?.toString(); + expect(currentPeer).toBe(expectedPeers[i].toString()); + } + + // We need to loop twice to be sure that we don't have any extra peers. 
+ for (let i: number = 0; i < expectedPeers.length; i++) { + const currentPeer = sampler()?.toString(); + expect(currentPeer).toBe(expectedPeers[i].toString()); + } + } +}); + const makeTx = (txHash?: string | TxHash) => Tx.random({ txHash }) as Tx; const createRequestLogger = ( @@ -1845,25 +2134,17 @@ export class TestPeerCollection implements IPeerCollection { constructor(private readonly inner: PeerCollection) {} - getAllPeers(): Set { - return this.inner.getAllPeers(); - } - - getSmartPeers(): Set { - return this.inner.getSmartPeers(); - } - markPeerSmart(peerId: any): void { this.smartPeersMarked.push(peerId.toString()); return this.inner.markPeerSmart(peerId); } - getSmartPeersToQuery(): Array { - return this.inner.getSmartPeersToQuery(); + nextSmartPeerToQuery(): PeerId | undefined { + return this.inner.nextSmartPeerToQuery(); } - getDumbPeersToQuery(): Array { - return this.inner.getDumbPeersToQuery(); + nextDumbPeerToQuery(): PeerId | undefined { + return this.inner.nextDumbPeerToQuery(); } thereAreSomeDumbRatelimitExceededPeers(): boolean { diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts index 309b41c3f4bd..d468b06ef7c3 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts @@ -8,7 +8,6 @@ import { PeerErrorSeverity } from '@aztec/stdlib/p2p'; import { Tx, TxArray, TxHash } from '@aztec/stdlib/tx'; import type { PeerId } from '@libp2p/interface'; -import { peerIdFromString } from '@libp2p/peer-id'; import type { IMissingTxsTracker } from '../../tx_collection/missing_txs_tracker.js'; import { ReqRespSubProtocol } from '.././interface.js'; @@ -90,10 +89,9 @@ export class BatchTxRequester { if (this.opts.peerCollection) { this.peers = this.opts.peerCollection; } else { - const initialPeers = 
this.p2pService.connectionSampler.getPeerListSortedByConnectionCountAsc(); const badPeerThreshold = this.opts.badPeerThreshold ?? DEFAULT_BATCH_TX_REQUESTER_BAD_PEER_THRESHOLD; this.peers = new PeerCollection( - initialPeers, + this.p2pService.connectionSampler, this.pinnedPeer, this.dateProvider, badPeerThreshold, @@ -227,7 +225,6 @@ export class BatchTxRequester { * Starts dumb worker loops * */ private async dumbRequester() { - const nextPeerIndex = this.makeRoundRobinIndexer(); const nextBatchIndex = this.makeRoundRobinIndexer(); // Chunk missing tx hashes into batches of txBatchSize, wrapping around to ensure no peer gets less than txBatchSize @@ -263,15 +260,9 @@ export class BatchTxRequester { return { blockRequest, txs }; }; - const nextPeer = () => { - const peers = this.peers.getDumbPeersToQuery(); - const idx = nextPeerIndex(() => peers.length); - return idx === undefined ? undefined : peerIdFromString(peers[idx]); - }; - - const workerCount = Math.min(this.dumbParallelWorkerCount, this.peers.getAllPeers().size); + const workerCount = this.dumbParallelWorkerCount; const workers = Array.from({ length: workerCount }, (_, index) => - this.dumbWorkerLoop(nextPeer, makeRequest, index + 1), + this.dumbWorkerLoop(this.peers.nextDumbPeerToQuery.bind(this.peers), makeRequest, index + 1), ); await Promise.allSettled(workers); @@ -332,14 +323,6 @@ export class BatchTxRequester { * Starts smart worker loops * */ private async smartRequester() { - const nextPeerIndex = this.makeRoundRobinIndexer(); - - const nextPeer = () => { - const peers = this.peers.getSmartPeersToQuery(); - const idx = nextPeerIndex(() => peers.length); - return idx === undefined ? 
undefined : peerIdFromString(peers[idx]); - }; - const makeRequest = (pid: PeerId) => { const txs = this.txsMetadata.getTxsToRequestFromThePeer(pid); const blockRequest = BlockTxsRequest.fromTxsSourceAndMissingTxs(this.blockTxsSource, txs); @@ -350,9 +333,8 @@ export class BatchTxRequester { return { blockRequest, txs }; }; - const workers = Array.from( - { length: Math.min(this.smartParallelWorkerCount, this.peers.getAllPeers().size) }, - (_, index) => this.smartWorkerLoop(nextPeer, makeRequest, index + 1), + const workers = Array.from({ length: this.smartParallelWorkerCount }, (_, index) => + this.smartWorkerLoop(this.peers.nextSmartPeerToQuery.bind(this.peers), makeRequest, index + 1), ); await Promise.allSettled(workers); @@ -387,26 +369,18 @@ export class BatchTxRequester { if (weRanOutOfPeersToQuery) { this.logger.debug(`Worker loop smart: No more peers to query`); - // If there are no more dumb peers to query then none of our peers can become smart, - // thus we can simply exit this worker - const noMoreDumbPeersToQuery = this.peers.getDumbPeersToQuery().length === 0; - if (noMoreDumbPeersToQuery) { - // These might be either smart peers that will get unblocked after _some time_ - const nextSmartPeerDelay = this.peers.getNextSmartPeerAvailabilityDelayMs(); - const thereAreSomeRateLimitedSmartPeers = nextSmartPeerDelay !== undefined; - if (thereAreSomeRateLimitedSmartPeers) { - await this.sleepClampedToDeadline(nextSmartPeerDelay); - continue; - } - - this.logger.debug(`Worker loop smart: No more smart peers to query killing ${workerIndex}`); - break; + // If we have rate limited peers wait for them. + const nextSmartPeerDelay = this.peers.getNextSmartPeerAvailabilityDelayMs(); + const thereAreSomeRateLimitedSmartPeers = nextSmartPeerDelay !== undefined; + if (thereAreSomeRateLimitedSmartPeers) { + await this.sleepClampedToDeadline(nextSmartPeerDelay); + continue; } - // Otherwise there are still some dumb peers that could become smart. 
// We end up here when all known smart peers became temporarily unavailable via combination of // (bad, in-flight, or rate-limited) or in some weird scenario all current smart peers turn bad which is permanent - // but dumb peers still exist that could become smart. + // but there are dumb peers that could be promoted + // or new peer can join as dumb and be promoted later // // When a dumb peer responds with valid txIndices, it gets // promoted to smart and releases the semaphore, waking this worker. @@ -599,9 +573,7 @@ export class BatchTxRequester { this.markTxsPeerHas(peerId, response); // Unblock smart workers - if (this.peers.getSmartPeersToQuery().length <= this.smartParallelWorkerCount) { - this.smartRequesterSemaphore.release(); - } + this.smartRequesterSemaphore.release(); } private isBlockResponseValid(response: BlockTxsResponse): boolean { diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/peer_collection.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/peer_collection.ts index ca8e1c4b61c0..4c7baec5316d 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/peer_collection.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/peer_collection.ts @@ -2,18 +2,22 @@ import type { DateProvider } from '@aztec/foundation/timer'; import type { PeerErrorSeverity } from '@aztec/stdlib/p2p'; import type { PeerId } from '@libp2p/interface'; +import { peerIdFromString } from '@libp2p/peer-id'; +import type { ConnectionSampler } from '../connection-sampler/connection_sampler.js'; import { DEFAULT_BATCH_TX_REQUESTER_BAD_PEER_THRESHOLD } from './config.js'; import type { IPeerPenalizer } from './interface.js'; export const RATE_LIMIT_EXCEEDED_PEER_CACHE_TTL = 1000; // 1s export interface IPeerCollection { - getAllPeers(): Set; - getSmartPeers(): Set; markPeerSmart(peerId: PeerId): void; - getSmartPeersToQuery(): Array; - getDumbPeersToQuery(): Array; + + /** Sample next peer in round-robin fashion. 
No smart peers if returns undefined */ + nextSmartPeerToQuery(): PeerId | undefined; + /** Sample next peer in round-robin fashion. No dumb peers if returns undefined */ + nextDumbPeerToQuery(): PeerId | undefined; + thereAreSomeDumbRatelimitExceededPeers(): boolean; penalisePeer(peerId: PeerId, severity: PeerErrorSeverity): void; unMarkPeerAsBad(peerId: PeerId): void; @@ -28,8 +32,6 @@ export interface IPeerCollection { } export class PeerCollection implements IPeerCollection { - private readonly peers; - private readonly smartPeers = new Set(); private readonly inFlightPeers = new Set(); private readonly rateLimitExceededPeers = new Map(); @@ -37,46 +39,60 @@ export class PeerCollection implements IPeerCollection { private readonly badPeers = new Set(); constructor( - initialPeers: PeerId[], + private readonly connectionSampler: Pick, private readonly pinnedPeerId: PeerId | undefined, private readonly dateProvider: DateProvider, private readonly badPeerThreshold: number = DEFAULT_BATCH_TX_REQUESTER_BAD_PEER_THRESHOLD, private readonly peerPenalizer?: IPeerPenalizer, ) { - this.peers = new Set(initialPeers.map(peer => peer.toString())); - - // Pinned peer is treaded specially, always mark it as in-flight + // Pinned peer is treated specially, always mark it as in-flight // and never return it as part of smart/dumb peers if (this.pinnedPeerId) { const peerIdStr = this.pinnedPeerId.toString(); this.inFlightPeers.add(peerIdStr); - this.peers.delete(peerIdStr); } } - public getAllPeers(): Set { - return this.peers; + public markPeerSmart(peerId: PeerId): void { + this.smartPeers.add(peerId.toString()); } - public getSmartPeers(): Set { - return this.smartPeers; + // We keep track of all peers that are queried for peer sampling algorithm + private queriedSmartPeers: Set = new Set(); + private queriedDumbPeers: Set = new Set(); + + private static nextPeer(allPeers: Set, queried: Set): PeerId | undefined { + if (allPeers.size === 0) { + return undefined; + } + const 
availablePeers = allPeers.difference(queried); + let [first] = availablePeers; + if (first === undefined) { + // We queried all peers. Start over + [first] = allPeers; + queried.clear(); + } + queried.add(first); + return peerIdFromString(first); } - public markPeerSmart(peerId: PeerId): void { - this.smartPeers.add(peerId.toString()); + public nextSmartPeerToQuery(): PeerId | undefined { + return PeerCollection.nextPeer(this.availableSmartPeers, this.queriedSmartPeers); + } + + public nextDumbPeerToQuery(): PeerId | undefined { + return PeerCollection.nextPeer(this.availableDumbPeers, this.queriedDumbPeers); } - public getSmartPeersToQuery(): Array { - return Array.from( + private get availableSmartPeers(): Set { + return this.peers.intersection( this.smartPeers.difference(this.getBadPeers().union(this.inFlightPeers).union(this.getRateLimitExceededPeers())), ); } - public getDumbPeersToQuery(): Array { - return Array.from( - this.peers.difference( - this.smartPeers.union(this.getBadPeers()).union(this.inFlightPeers).union(this.getRateLimitExceededPeers()), - ), + private get availableDumbPeers(): Set { + return this.peers.difference( + this.smartPeers.union(this.getBadPeers()).union(this.inFlightPeers).union(this.getRateLimitExceededPeers()), ); } @@ -202,4 +218,27 @@ export class PeerCollection implements IPeerCollection { return minExpiry! - now; } + + private orderedPeers: Set = new Set(); + + private get peers(): Set { + const pinnedStr = this.pinnedPeerId?.toString(); + const currentlyConnected = new Set( + this.connectionSampler + .getPeerListSortedByConnectionCountAsc() + .map(p => p.toString()) + .filter(p => p !== pinnedStr), + ); + + // Remove disconnected peers, preserving order of the rest. + this.orderedPeers = this.orderedPeers.intersection(currentlyConnected); + + // Append newly connected peers at the end (lowest priority). 
+ for (const peer of currentlyConnected) { + if (!this.orderedPeers.has(peer)) { + this.orderedPeers.add(peer); + } + } + return this.orderedPeers; + } }