-
Notifications
You must be signed in to change notification settings - Fork 598
feat(p2p): batch request response #11331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
032de72
feat: batch connection sampler
Maddiaa0 91f0f56
feat: batch request response requests across multiple peers
Maddiaa0 4c3e424
fmt
Maddiaa0 a5a2b76
fix: sample without replacement in reqresp
Maddiaa0 e8d51bb
fmt
Maddiaa0 e99d930
feat(p2p): reqresp spans (#11335)
Maddiaa0 e16757b
fix: test case where no peer is sampled
Maddiaa0 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
178 changes: 178 additions & 0 deletions
178
yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.test.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,178 @@ | ||
| import { describe, expect, it, jest } from '@jest/globals'; | ||
| import { createSecp256k1PeerId } from '@libp2p/peer-id-factory'; | ||
| import { type Libp2p } from 'libp2p'; | ||
|
|
||
| import { BatchConnectionSampler } from './batch_connection_sampler.js'; | ||
| import { ConnectionSampler, type RandomSampler } from './connection_sampler.js'; | ||
|
|
||
| describe('BatchConnectionSampler', () => { | ||
| const mockRandomSampler = { | ||
| random: jest.fn(), | ||
| } as jest.Mocked<RandomSampler>; | ||
|
|
||
| let peers: Awaited<ReturnType<typeof createSecp256k1PeerId>>[]; | ||
| let libp2p: jest.Mocked<Libp2p>; | ||
| let connectionSampler: ConnectionSampler; | ||
|
|
||
| beforeEach(async () => { | ||
| jest.clearAllMocks(); | ||
|
|
||
| // Create a set of test peers | ||
| peers = await Promise.all(new Array(5).fill(0).map(() => createSecp256k1PeerId())); | ||
|
|
||
| // Mock libp2p to return our test peers | ||
| libp2p = { | ||
| getPeers: jest.fn().mockReturnValue(peers), | ||
| } as unknown as jest.Mocked<Libp2p>; | ||
|
|
||
| // Create a real connection sampler with mocked random sampling | ||
| connectionSampler = new ConnectionSampler(libp2p, 1000, mockRandomSampler); | ||
| }); | ||
|
|
||
| afterEach(async () => { | ||
| await connectionSampler.stop(); | ||
| }); | ||
|
|
||
| it('initializes with correct number of peers and request distribution', () => { | ||
| // Mock random to return sequential indices | ||
| mockRandomSampler.random.mockImplementation(_ => 0); | ||
|
|
||
| const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 10, /* maxPeers */ 3); | ||
|
|
||
| expect(sampler.activePeerCount).toBe(3); | ||
| expect(sampler.requestsPerBucket).toBe(3); // floor(10/3) = 3 | ||
| }); | ||
|
|
||
| it('assigns requests to peers deterministically with wraparound', () => { | ||
| // Mock to return first two peers | ||
| let callCount = 0; | ||
| mockRandomSampler.random.mockImplementation(() => callCount++ % 2); | ||
|
|
||
| // With 5 requests and 2 peers: | ||
| // floor(5/2) = 2 requests per peer | ||
| // Peer 0: 0,1,4 (gets extra from wraparound) | ||
| // Peer 1: 2,3 | ||
| const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 5, /* maxPeers */ 2); | ||
| const assignments = new Array(5).fill(0).map((_, i) => sampler.getPeerForRequest(i)); | ||
|
|
||
| // First peer gets first bucket and wraparound | ||
| expect(assignments[0]).toBe(peers[0]); // First bucket | ||
| expect(assignments[1]).toBe(peers[0]); // First bucket | ||
| expect(assignments[4]).toBe(peers[0]); // Wraparound | ||
|
|
||
| // Second peer gets middle bucket | ||
| expect(assignments[2]).toBe(peers[1]); | ||
| expect(assignments[3]).toBe(peers[1]); | ||
| }); | ||
|
|
||
| it('handles peer removal and replacement', () => { | ||
| mockRandomSampler.random.mockImplementation(_ => { | ||
| return 2; // Return index 2 for replacement peer | ||
| }); | ||
|
|
||
| // With 4 requests and 2 peers: | ||
| // floor(4/2) = 2 requests per peer | ||
| // Initial distribution: | ||
| // Peer 0: 0,1 | ||
| // Peer 1: 2,3 | ||
| const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 4, /* maxPeers */ 2); | ||
|
|
||
| const initialPeer = sampler.getPeerForRequest(0); | ||
| expect(initialPeer).toBe(peers[0]); | ||
|
|
||
| sampler.removePeerAndReplace(peers[0]); | ||
|
|
||
| // After replacement: | ||
| // Replacement peer should handle the same bucket | ||
| const newPeer = sampler.getPeerForRequest(0); | ||
| expect(newPeer).toBe(peers[2]); | ||
| expect(sampler.getPeerForRequest(1)).toBe(peers[2]); | ||
|
|
||
| // Other peer's bucket remains unchanged | ||
| expect(sampler.getPeerForRequest(2)).toBe(peers[1]); | ||
| expect(sampler.getPeerForRequest(3)).toBe(peers[1]); | ||
| }); | ||
|
|
||
| it('handles peer removal and replacement - no replacement available', () => { | ||
| mockRandomSampler.random.mockImplementation(() => 2); | ||
| const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 4, /* maxPeers */ 2); | ||
|
|
||
| expect(sampler.activePeerCount).toBe(2); | ||
| expect(sampler.getPeerForRequest(0)).toBe(peers[0]); | ||
|
|
||
| // Will sample no peers | ||
| libp2p.getPeers.mockReturnValue([]); | ||
|
|
||
| // Remove peer 0, its requests will be distributed to peer 1 | ||
| sampler.removePeerAndReplace(peers[0]); | ||
| // Decrease the number of active peers | ||
| expect(sampler.activePeerCount).toBe(1); | ||
|
|
||
| expect(sampler.getPeerForRequest(0)).toBe(peers[1]); | ||
| }); | ||
|
|
||
| it('distributes requests according to documentation example', () => { | ||
| let callCount = 0; | ||
| mockRandomSampler.random.mockImplementation(() => { | ||
| if (callCount < 3) { | ||
| return callCount++; | ||
| } | ||
| return 0; | ||
| }); | ||
|
|
||
| // Example from doc comment: | ||
| // Peers: [P1] [P2] [P3] | ||
| // Requests: 0,1,2,9 | 3,4,5 | 6,7,8 | ||
| const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 10, /* maxPeers */ 3); | ||
|
|
||
| expect(sampler.activePeerCount).toBe(3); | ||
| expect(sampler.requestsPerBucket).toBe(3); // floor(10/3) = 3 | ||
|
|
||
| // P1's bucket (0-2) plus wraparound (9) | ||
| expect(sampler.getPeerForRequest(0)).toBe(peers[0]); | ||
| expect(sampler.getPeerForRequest(1)).toBe(peers[0]); | ||
| expect(sampler.getPeerForRequest(2)).toBe(peers[0]); | ||
| expect(sampler.getPeerForRequest(9)).toBe(peers[0]); // Wraparound | ||
|
|
||
| // P2's bucket (3-5) | ||
| expect(sampler.getPeerForRequest(3)).toBe(peers[1]); | ||
| expect(sampler.getPeerForRequest(4)).toBe(peers[1]); | ||
| expect(sampler.getPeerForRequest(5)).toBe(peers[1]); | ||
|
|
||
| // P3's bucket (6-8) | ||
| expect(sampler.getPeerForRequest(6)).toBe(peers[2]); | ||
| expect(sampler.getPeerForRequest(7)).toBe(peers[2]); | ||
| expect(sampler.getPeerForRequest(8)).toBe(peers[2]); | ||
| }); | ||
|
|
||
| it('same number of requests per peers', () => { | ||
| let callCount = 0; | ||
| mockRandomSampler.random.mockImplementation(() => callCount++ % 2); | ||
|
|
||
| const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 2, /* maxPeers */ 2); | ||
| expect(sampler.requestsPerBucket).toBe(1); | ||
| expect(sampler.activePeerCount).toBe(2); | ||
|
|
||
| expect(sampler.getPeerForRequest(0)).toBe(peers[0]); | ||
| expect(sampler.getPeerForRequest(1)).toBe(peers[1]); | ||
| }); | ||
|
|
||
| it('handles edge cases, 0 peers, smaller batch than max peers', () => { | ||
| mockRandomSampler.random.mockImplementation(() => 0); | ||
| libp2p.getPeers.mockReturnValue([]); | ||
|
|
||
| const sampler = new BatchConnectionSampler(connectionSampler, /* batchSize */ 5, /* maxPeers */ 2); | ||
| expect(sampler.activePeerCount).toBe(0); | ||
| expect(sampler.getPeerForRequest(0)).toBeUndefined(); | ||
|
|
||
| let i = 0; | ||
| mockRandomSampler.random.mockImplementation(() => i++ % 3); | ||
|
|
||
| libp2p.getPeers.mockReturnValue(peers); | ||
| const samplerWithMorePeers = new BatchConnectionSampler(connectionSampler, /* batchSize */ 2, /* maxPeers */ 3); | ||
| expect(samplerWithMorePeers.requestsPerBucket).toBe(1); // floor(2/3) = 0 | ||
| // First two requests go to first two peers | ||
| expect(samplerWithMorePeers.getPeerForRequest(0)).toBe(peers[0]); | ||
| expect(samplerWithMorePeers.getPeerForRequest(1)).toBe(peers[1]); | ||
| }); | ||
| }); | ||
94 changes: 94 additions & 0 deletions
94
yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| import { createLogger } from '@aztec/foundation/log'; | ||
|
|
||
| import { type PeerId } from '@libp2p/interface'; | ||
|
|
||
| import { type ConnectionSampler } from './connection_sampler.js'; | ||
|
|
||
| /** | ||
| * Manages batches of peers for parallel request processing. | ||
| * Tracks active peers and provides deterministic peer assignment for requests. | ||
| * | ||
| * Example with 3 peers and 10 requests: | ||
| * | ||
| * Peers: [P1] [P2] [P3] | ||
| * ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ | ||
| * Requests: 0,1,2,9 | 3,4,5 | 6,7,8 | ||
| * | ||
| * Each peer handles a bucket of consecutive requests. | ||
| * If a peer fails, it is replaced while maintaining the same bucket. | ||
| */ | ||
| export class BatchConnectionSampler { | ||
| private readonly logger = createLogger('p2p:reqresp:batch-connection-sampler'); | ||
| private readonly batch: PeerId[] = []; | ||
| private readonly requestsPerPeer: number; | ||
|
|
||
| constructor(private readonly connectionSampler: ConnectionSampler, batchSize: number, maxPeers: number) { | ||
| if (maxPeers <= 0) { | ||
| throw new Error('Max peers cannot be 0'); | ||
| } | ||
| if (batchSize <= 0) { | ||
| throw new Error('Batch size cannot be 0'); | ||
| } | ||
|
|
||
| // Calculate how many requests each peer should handle, cannot be 0 | ||
| this.requestsPerPeer = Math.max(1, Math.floor(batchSize / maxPeers)); | ||
|
|
||
| // Sample initial peers | ||
| this.batch = this.connectionSampler.samplePeersBatch(maxPeers); | ||
| } | ||
|
|
||
| /** | ||
| * Gets the peer responsible for handling a specific request index | ||
| * | ||
| * @param index - The request index | ||
| * @returns The peer assigned to handle this request | ||
| */ | ||
| getPeerForRequest(index: number): PeerId | undefined { | ||
| if (this.batch.length === 0) { | ||
| return undefined; | ||
| } | ||
|
|
||
| // Calculate which peer bucket this index belongs to | ||
| const peerIndex = Math.floor(index / this.requestsPerPeer) % this.batch.length; | ||
| return this.batch[peerIndex]; | ||
| } | ||
|
|
||
| /** | ||
| * Removes a peer and replaces it with a new one, maintaining the same position | ||
| * in the batch array to keep request distribution consistent | ||
| * | ||
| * @param peerId - The peer to remove and replace | ||
| */ | ||
| removePeerAndReplace(peerId: PeerId): void { | ||
| const index = this.batch.findIndex(p => p === peerId); | ||
| if (index === -1) { | ||
| return; | ||
| } | ||
|
|
||
| const excluding = new Map([[peerId, true]]); | ||
| const newPeer = this.connectionSampler.getPeer(excluding); | ||
|
|
||
| if (newPeer) { | ||
| this.batch[index] = newPeer; | ||
| this.logger.trace(`Replaced peer ${peerId} with ${newPeer}`, { peerId, newPeer }); | ||
| } else { | ||
| // If we couldn't get a replacement, remove the peer and compact the array | ||
| this.batch.splice(index, 1); | ||
| this.logger.trace(`Removed peer ${peerId}`, { peerId }); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Gets the number of active peers | ||
| */ | ||
| get activePeerCount(): number { | ||
| return this.batch.length; | ||
| } | ||
|
|
||
| /** | ||
| * Gets the number of requests each peer is assigned to handle | ||
| */ | ||
| get requestsPerBucket(): number { | ||
| return this.requestsPerPeer; | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we test the case where no replacement is available?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added!