Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions yarn-project/p2p/src/client/interface.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import type { L2BlockId } from '@aztec/stdlib/block';
import type { P2PApi } from '@aztec/stdlib/interfaces/server';
import { BlockAttestation, type BlockProposal, type P2PClientType } from '@aztec/stdlib/p2p';
import type { BlockProposal, P2PClientType } from '@aztec/stdlib/p2p';
import type { Tx, TxHash } from '@aztec/stdlib/tx';

import type { ENR } from '@chainsafe/enr';
import type { PeerId } from '@libp2p/interface';

import type { P2PConfig } from '../config.js';
import type { P2PBlockReceivedCallback } from '../services/service.js';

/**
* Enum defining the possible states of the p2p client.
Expand Down Expand Up @@ -50,14 +52,15 @@ export type P2P<T extends P2PClientType = P2PClientType.Full> = P2PApi<T> & {
*/
// REVIEW: https://github.com/AztecProtocol/aztec-packages/issues/7963
// ^ This pattern is not my favorite (md)
registerBlockProposalHandler(handler: (block: BlockProposal) => Promise<BlockAttestation | undefined>): void;
registerBlockProposalHandler(callback: P2PBlockReceivedCallback): void;

/**
* Request a list of transactions from another peer by their tx hashes.
* @param txHashes - Hashes of the txs to query.
* @param pinnedPeerId - An optional peer id that will be used to request the tx from (in addition to other random peers).
* @returns A list of transactions or undefined if the transactions are not found.
*/
requestTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]>;
requestTxsByHash(txHashes: TxHash[], pinnedPeerId: PeerId): Promise<(Tx | undefined)[]>;

/**
* Request a transaction from another peer by its tx hash.
Expand Down Expand Up @@ -115,9 +118,10 @@ export type P2P<T extends P2PClientType = P2PClientType.Full> = P2PApi<T> & {
/**
* Returns transactions in the transaction pool by hash, requesting from the network if not found.
* @param txHashes - Hashes of tx to return.
* @param pinnedPeerId - An optional peer id that will be used to request the tx from (in addition to other random peers).
* @returns An array of tx or undefined.
*/
getTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]>;
getTxsByHash(txHashes: TxHash[], pinnedPeerId: PeerId | undefined): Promise<(Tx | undefined)[]>;

/**
* Returns an archived transaction from the transaction pool by its hash.
Expand Down
40 changes: 27 additions & 13 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { MockL2BlockSource } from '@aztec/archiver/test';
import { times } from '@aztec/foundation/collection';
import { Fr } from '@aztec/foundation/fields';
import { retryUntil } from '@aztec/foundation/retry';
import type { AztecAsyncKVStore } from '@aztec/kv-store';
Expand All @@ -15,9 +14,10 @@ import { InMemoryAttestationPool, type P2PService } from '../index.js';
import type { AttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js';
import type { MemPools } from '../mem_pools/interface.js';
import type { TxPool } from '../mem_pools/tx_pool/index.js';
import { ReqRespSubProtocol } from '../services/reqresp/interface.js';
import { P2PClient } from './p2p_client.js';

describe('In-Memory P2P Client', () => {
describe('P2P Client', () => {
let txPool: MockProxy<TxPool>;
let attestationPool: AttestationPool;
let mempools: MemPools;
Expand Down Expand Up @@ -129,25 +129,36 @@ describe('In-Memory P2P Client', () => {
});

it('request transactions handles missing items', async () => {
// Mock a batch response that returns undefined items
const mockTx1 = await mockTx();
const mockTx2 = await mockTx();
txPool.hasTxs.mockImplementation(txHashes => Promise.resolve(times(txHashes.length, () => true)));
p2pService.sendBatchRequest.mockResolvedValue([mockTx1, undefined, mockTx2]);
const mockTx3 = await mockTx();

// P2P service will not return tx2
p2pService.sendBatchRequest.mockResolvedValue([mockTx1, undefined, mockTx3]);

// Spy on the tx pool addTxs method, it should not be called for the missing tx
const addTxsSpy = jest.spyOn(txPool, 'addTxs');

await client.start();
// We query for all 3 txs
const query = await Promise.all([mockTx1.getTxHash(), mockTx2.getTxHash(), mockTx3.getTxHash()]);
const results = await client.requestTxsByHash(query, undefined);

const missingTxHash = (await mockTx()).getTxHash();
const query = await Promise.all([mockTx1.getTxHash(), missingTxHash, mockTx2.getTxHash()]);
const results = await client.requestTxsByHash(query);
// We should receive the found transactions
expect(results).toEqual([mockTx1, undefined, mockTx3]);

expect(results).toEqual([mockTx1, undefined, mockTx2]);
// P2P should have been called with the 3 tx hashes
expect(p2pService.sendBatchRequest).toHaveBeenCalledWith(
ReqRespSubProtocol.TX,
query,
undefined,
expect.anything(),
expect.anything(),
expect.anything(),
);

// Retrieved txs should have been added to the pool
expect(addTxsSpy).toHaveBeenCalledTimes(1);
expect(addTxsSpy).toHaveBeenCalledWith([mockTx1, mockTx2]);
expect(addTxsSpy).toHaveBeenCalledWith([mockTx1, mockTx3]);
});

it('getTxsByHash handles missing items', async () => {
Expand All @@ -173,14 +184,17 @@ describe('In-Memory P2P Client', () => {
await client.start();

const query = await Promise.all([txInMempool.getTxHash(), txToBeRequested.getTxHash(), txToNotBeFound.getTxHash()]);
const results = await client.getTxsByHash(query);
const results = await client.getTxsByHash(query, undefined);

// We should return the resolved transactions
expect(results).toEqual([txInMempool, txToBeRequested]);
// We should add the found requested transactions to the pool
expect(addTxsSpy).toHaveBeenCalledWith([txToBeRequested]);
// We should request the missing transactions from the network, but only find one of them
expect(requestTxsSpy).toHaveBeenCalledWith([await txToBeRequested.getTxHash(), await txToNotBeFound.getTxHash()]);
expect(requestTxsSpy).toHaveBeenCalledWith(
[await txToBeRequested.getTxHash(), await txToNotBeFound.getTxHash()],
undefined,
);
});

describe('Chain prunes', () => {
Expand Down
24 changes: 19 additions & 5 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import {
} from '@aztec/telemetry-client';

import type { ENR } from '@chainsafe/enr';
import type { PeerId } from '@libp2p/interface';

import { type P2PConfig, getP2PDefaultConfig } from '../config.js';
import type { AttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js';
import type { MemPools } from '../mem_pools/interface.js';
import type { TxPool } from '../mem_pools/tx_pool/index.js';
import { ReqRespSubProtocol } from '../services/reqresp/interface.js';
import type { P2PService } from '../services/service.js';
import { TxCollector } from '../services/tx_collector.js';
import { type P2P, P2PClientState, type P2PSyncState } from './interface.js';

/**
Expand Down Expand Up @@ -87,6 +89,15 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
this.txPool = mempools.txPool;
this.attestationPool = mempools.attestationPool!;

// Default to collecting all txs when we see a valid proposal
// This can be overridden by the validator client to attest, and it will call collectForBlockProposal on its own
const txCollector = new TxCollector(this, this.log);
this.registerBlockProposalHandler(async (block, sender) => {
this.log.debug(`Received block proposal from ${sender.toString()}`);
await txCollector.collectForBlockProposal(block, sender);
return undefined;
});

// REFACTOR: Try replacing these with an L2TipsStore
this.synchedBlockHashes = store.openMap('p2p_pool_block_hashes');
this.synchedLatestBlockNumber = store.openSingleton('p2p_pool_last_l2_block');
Expand Down Expand Up @@ -328,7 +339,9 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>

// REVIEW: https://github.com/AztecProtocol/aztec-packages/issues/7963
// ^ This pattern is not my favorite (md)
public registerBlockProposalHandler(handler: (block: BlockProposal) => Promise<BlockAttestation | undefined>): void {
public registerBlockProposalHandler(
handler: (block: BlockProposal, sender: any) => Promise<BlockAttestation | undefined>,
): void {
this.p2pService.registerBlockReceivedCallback(handler);
}

Expand Down Expand Up @@ -357,14 +370,15 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
/**
* Uses the batched Request Response protocol to request a set of transactions from the network.
*/
public async requestTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> {
public async requestTxsByHash(txHashes: TxHash[], pinnedPeerId: PeerId | undefined): Promise<(Tx | undefined)[]> {
const timeoutMs = 8000; // Longer timeout for now
const maxPeers = Math.min(Math.ceil(txHashes.length / 3), 10);
const maxRetryAttempts = 10; // Keep retrying within the timeout

const txs = await this.p2pService.sendBatchRequest(
ReqRespSubProtocol.TX,
txHashes,
pinnedPeerId,
timeoutMs,
maxPeers,
maxRetryAttempts,
Expand Down Expand Up @@ -462,7 +476,7 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
* @param txHashes - Hashes of the transactions to look for.
* @returns The txs found, or undefined if not found in the order requested.
*/
async getTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> {
async getTxsByHash(txHashes: TxHash[], pinnedPeerId: PeerId | undefined): Promise<(Tx | undefined)[]> {
const txs = await Promise.all(txHashes.map(txHash => this.txPool.getTxByHash(txHash)));
const missingTxHashes = txs
.map((tx, index) => [tx, index] as const)
Expand All @@ -473,7 +487,7 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
return txs as Tx[];
}

const missingTxs = await this.requestTxsByHash(missingTxHashes);
const missingTxs = await this.requestTxsByHash(missingTxHashes, pinnedPeerId);
const fetchedMissingTxs = missingTxs.filter((tx): tx is Tx => !!tx);

// TODO: optimize
Expand Down Expand Up @@ -678,7 +692,7 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
`Requesting ${missingTxHashes.length} missing txs from peers for ${unprovenBlocks.length} unproven mined blocks`,
{ missingTxHashes, unprovenBlockNumbers: unprovenBlocks.map(block => block.number) },
);
await this.requestTxsByHash(missingTxHashes);
await this.requestTxsByHash(missingTxHashes, undefined);
}
} catch (err) {
this.log.error(`Error requesting missing txs from unproven blocks`, err, {
Expand Down
2 changes: 2 additions & 0 deletions yarn-project/p2p/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export type { PeerId } from '@libp2p/interface';

export * from './bootstrap/bootstrap.js';
export * from './client/index.js';
export * from './enr/index.js';
Expand Down
4 changes: 3 additions & 1 deletion yarn-project/p2p/src/services/dummy_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ export class DummyP2PService implements P2PService {
/**
* Register a callback into the validator client for when a block proposal is received
*/
public registerBlockReceivedCallback(_: (block: BlockProposal) => Promise<BlockAttestation>) {}
public registerBlockReceivedCallback(
_callback: (block: BlockProposal, sender: PeerId) => Promise<BlockAttestation>,
) {}

/**
* Sends a request to a peer.
Expand Down
1 change: 1 addition & 0 deletions yarn-project/p2p/src/services/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './service.js';
export * from './libp2p/libp2p_service.js';
export * from './tx_collector.js';
16 changes: 8 additions & 8 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ import { DEFAULT_SUB_PROTOCOL_VALIDATORS, ReqRespSubProtocol, type SubProtocolMa
import { reqGoodbyeHandler } from '../reqresp/protocols/goodbye.js';
import { pingHandler, reqRespBlockHandler, reqRespTxHandler, statusHandler } from '../reqresp/protocols/index.js';
import { ReqResp } from '../reqresp/reqresp.js';
import type { P2PService, PeerDiscoveryService } from '../service.js';
import type { P2PBlockReceivedCallback, P2PService, PeerDiscoveryService } from '../service.js';

interface ValidationResult {
name: string;
Expand Down Expand Up @@ -101,7 +101,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
* @param block - The block received from the peer.
* @returns The attestation for the block, if any.
*/
private blockReceivedCallback: (block: BlockProposal) => Promise<BlockAttestation | undefined>;
private blockReceivedCallback: P2PBlockReceivedCallback;

private gossipSubEventHandler: (e: CustomEvent<GossipsubMessage>) => void;

Expand Down Expand Up @@ -446,8 +446,9 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
sendBatchRequest<SubProtocol extends ReqRespSubProtocol>(
protocol: SubProtocol,
requests: InstanceType<SubProtocolMap[SubProtocol]['request']>[],
pinnedPeerId: PeerId | undefined,
): Promise<(InstanceType<SubProtocolMap[SubProtocol]['response']> | undefined)[]> {
return this.reqresp.sendBatchRequest(protocol, requests);
return this.reqresp.sendBatchRequest(protocol, requests, pinnedPeerId);
}

/**
Expand All @@ -458,9 +459,8 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
return this.peerDiscoveryService.getEnr();
}

public registerBlockReceivedCallback(callback: (block: BlockProposal) => Promise<BlockAttestation | undefined>) {
public registerBlockReceivedCallback(callback: P2PBlockReceivedCallback) {
this.blockReceivedCallback = callback;
this.logger.verbose('Block received callback registered');
}

/**
Expand Down Expand Up @@ -610,7 +610,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
if (!result || !block) {
return;
}
await this.processValidBlockProposal(block);
await this.processValidBlockProposal(block, source);
}

// REVIEW: callback pattern https://github.com/AztecProtocol/aztec-packages/issues/7963
Expand All @@ -620,7 +620,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
[Attributes.BLOCK_ARCHIVE]: block.archive.toString(),
[Attributes.P2P_ID]: await block.p2pMessageIdentifier().then(i => i.toString()),
}))
private async processValidBlockProposal(block: BlockProposal) {
private async processValidBlockProposal(block: BlockProposal, sender: PeerId) {
this.logger.verbose(
`Received block ${block.blockNumber.toNumber()} for slot ${block.slotNumber.toNumber()} from external peer.`,
{
Expand All @@ -632,7 +632,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
);
// Mark the txs in this proposal as non-evictable
await this.mempools.txPool.markTxsAsNonEvictable(block.payload.txHashes);
const attestation = await this.blockReceivedCallback(block);
const attestation = await this.blockReceivedCallback(block, sender);

// TODO: fix up this pattern - the abstraction is not nice
// The attestation can be undefined if no handler is registered / the validator deems the block invalid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export class BatchConnectionSampler {
private readonly connectionSampler: ConnectionSampler,
batchSize: number,
maxPeers: number,
exclude?: PeerId[],
) {
if (maxPeers <= 0) {
throw new Error('Max peers cannot be 0');
Expand All @@ -38,7 +39,8 @@ export class BatchConnectionSampler {
this.requestsPerPeer = Math.max(1, Math.floor(batchSize / maxPeers));

// Sample initial peers
this.batch = this.connectionSampler.samplePeersBatch(maxPeers);
const excluding = exclude && new Map(exclude.map(peerId => [peerId.toString(), true] as const));
this.batch = this.connectionSampler.samplePeersBatch(maxPeers, excluding);
}

/**
Expand Down Expand Up @@ -70,7 +72,7 @@ export class BatchConnectionSampler {
}

const excluding = new Map([[peerId.toString(), true]]);
const newPeer = this.connectionSampler.getPeer(excluding);
const newPeer = this.connectionSampler.getPeer(excluding); // Q: Shouldn't we accumulate all excluded peers? Otherwise the sampler could return us a previously excluded peer?

if (newPeer) {
this.batch[index] = newPeer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ export class ConnectionSampler {
* Samples a batch of unique peers from the libp2p node, prioritizing peers without active connections
*
* @param numberToSample - The number of peers to sample
* @param excluding - The peers to exclude from the sampling
* @returns Array of unique sampled peers, prioritizing those without active connections
*/
samplePeersBatch(numberToSample: number): PeerId[] {
samplePeersBatch(numberToSample: number, excluding?: Map<string, boolean>): PeerId[] {
const peers = this.libp2p.getPeers();
this.logger.debug('Sampling peers batch', { numberToSample, peers });

Expand All @@ -149,7 +150,7 @@ export class ConnectionSampler {
const batch: PeerId[] = [];
const withActiveConnections: Set<PeerId> = new Set();
for (let i = 0; i < numberToSample; i++) {
const { peer, sampledPeers } = this.getPeerFromList(peers, undefined);
const { peer, sampledPeers } = this.getPeerFromList(peers, excluding);
if (peer) {
batch.push(peer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { PeerErrorSeverity } from '@aztec/stdlib/p2p';
import type { PeerId } from '@libp2p/interface';

import type { PeerScoring } from '../../peer-manager/peer_scoring.js';
import type { ReqRespSubProtocol, ReqRespSubProtocolRateLimits } from '../interface.js';
import type { ProtocolRateLimitQuota, ReqRespSubProtocol, ReqRespSubProtocolRateLimits } from '../interface.js';
import { DEFAULT_RATE_LIMITS } from './rate_limits.js';

// Check for disconnected peers every 10 minutes
Expand Down Expand Up @@ -177,16 +177,18 @@ export class SubProtocolRateLimiter {
*/
export class RequestResponseRateLimiter {
private subProtocolRateLimiters: Map<ReqRespSubProtocol, SubProtocolRateLimiter>;
private rateLimits: ReqRespSubProtocolRateLimits;

private cleanupInterval: NodeJS.Timeout | undefined = undefined;

constructor(
private peerScoring: PeerScoring,
rateLimits: ReqRespSubProtocolRateLimits = DEFAULT_RATE_LIMITS,
rateLimits: Partial<ReqRespSubProtocolRateLimits> = {},
) {
this.subProtocolRateLimiters = new Map();

for (const [subProtocol, protocolLimits] of Object.entries(rateLimits)) {
this.rateLimits = { ...DEFAULT_RATE_LIMITS, ...rateLimits };
for (const [subProtocol, protocolLimits] of Object.entries(this.rateLimits)) {
this.subProtocolRateLimiters.set(
subProtocol as ReqRespSubProtocol,
new SubProtocolRateLimiter(
Expand Down Expand Up @@ -228,4 +230,8 @@ export class RequestResponseRateLimiter {
stop() {
clearInterval(this.cleanupInterval);
}

getRateLimits(protocol: ReqRespSubProtocol): ProtocolRateLimitQuota {
return this.rateLimits[protocol];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const DEFAULT_RATE_LIMITS: ReqRespSubProtocolRateLimits = {
},
globalLimit: {
quotaTimeMs: 1000,
quotaCount: 20,
quotaCount: 200,
},
},
[ReqRespSubProtocol.BLOCK]: {
Expand Down
Loading