Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions yarn-project/p2p/src/services/encoding.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,33 @@ describe('SnappyTransform', () => {
'exceeds maximum allowed size of 200kb',
);
});

it('should use maxSizeKbOverride when provided, ignoring topic and default limits', () => {
const transform = new SnappyTransform();

// Data at 50kb should pass with 100kb override
const data = Buffer.alloc(50 * 1024, 'a');
const compressed = compressSync(data);
expect(() => transform.inboundTransformData(compressed, undefined, 100)).not.toThrow();

// Data at 150kb should fail with 100kb override
const dataLarge = Buffer.alloc(150 * 1024, 'a');
const compressedLarge = compressSync(dataLarge);
expect(() => transform.inboundTransformData(compressedLarge, undefined, 100)).toThrow(
'exceeds maximum allowed size of 100kb',
);
});

it('should prefer maxSizeKbOverride over topic-specific limit', () => {
const transform = new SnappyTransform();

// TX topic has a 512kb limit, but override is 10kb
const data = Buffer.alloc(50 * 1024, 'a'); // 50kb - within tx limit but over override
const compressed = compressSync(data);
expect(() => transform.inboundTransformData(compressed, TopicType.tx, 10)).toThrow(
'exceeds maximum allowed size of 10kb',
);
});
});

describe('exact boundary conditions', () => {
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/p2p/src/services/encoding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ export class SnappyTransform implements DataTransform {
return this.inboundTransformData(Buffer.from(data), topic);
}

public inboundTransformData(data: Buffer, topic?: TopicType): Buffer {
public inboundTransformData(data: Buffer, topic?: TopicType, maxSizeKbOverride?: number): Buffer {
if (data.length === 0) {
return data;
}
const maxSizeKb = this.maxSizesKb[topic!] ?? this.defaultMaxSizeKb;
const maxSizeKb = maxSizeKbOverride ?? this.maxSizesKb[topic!] ?? this.defaultMaxSizeKb;
const { decompressedSize } = readSnappyPreamble(data);
if (decompressedSize > maxSizeKb * 1024) {
this.logger.warn(`Decompressed size ${decompressedSize} exceeds maximum allowed size of ${maxSizeKb}kb`);
Expand Down
27 changes: 26 additions & 1 deletion yarn-project/p2p/src/services/reqresp/interface.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import { Fr } from '@aztec/foundation/curves/bn254';
import { L2Block } from '@aztec/stdlib/block';
import { MAX_L2_BLOCK_SIZE_KB } from '@aztec/stdlib/p2p';
import { TxArray, TxHashArray } from '@aztec/stdlib/tx';

import type { PeerId } from '@libp2p/interface';

import type { P2PReqRespConfig } from './config.js';
import type { ConnectionSampler } from './connection-sampler/connection_sampler.js';
import { AuthRequest, AuthResponse } from './protocols/auth.js';
import { BlockTxsRequest, BlockTxsResponse } from './protocols/block_txs/block_txs_reqresp.js';
import {
BlockTxsRequest,
BlockTxsResponse,
calculateBlockTxsResponseSize,
} from './protocols/block_txs/block_txs_reqresp.js';
import { StatusMessage } from './protocols/status.js';
import { calculateTxResponseSize } from './protocols/tx.js';
import type { ReqRespStatus } from './status.js';

/*
Expand Down Expand Up @@ -211,6 +217,25 @@ export const subProtocolMap = {
},
};

/**
* Type for a function that calculates the expected response size in KB for a given request.
*/
export type ExpectedResponseSizeCalculator = (requestBuffer: Buffer) => number;

/**
* Map of sub-protocols to their expected response size calculators.
* These are used to validate that responses don't exceed expected sizes based on request parameters.
*/
export const subProtocolSizeCalculators: Record<ReqRespSubProtocol, ExpectedResponseSizeCalculator> = {
[ReqRespSubProtocol.TX]: calculateTxResponseSize,
[ReqRespSubProtocol.BLOCK_TXS]: calculateBlockTxsResponseSize,
[ReqRespSubProtocol.BLOCK]: () => MAX_L2_BLOCK_SIZE_KB,
[ReqRespSubProtocol.STATUS]: () => 1,
[ReqRespSubProtocol.PING]: () => 1,
[ReqRespSubProtocol.AUTH]: () => 1,
[ReqRespSubProtocol.GOODBYE]: () => 1, // No response expected, but provide minimal limit
};

export interface ReqRespInterface {
start(
subProtocolHandlers: Partial<ReqRespSubProtocolHandlers>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { BlockNumber } from '@aztec/foundation/branded-types';
import { Secp256k1Signer } from '@aztec/foundation/crypto/secp256k1-signer';
import { Fr } from '@aztec/foundation/curves/bn254';
import { BlockProposal } from '@aztec/stdlib/p2p';
import { BlockProposal, MAX_TX_SIZE_KB } from '@aztec/stdlib/p2p';
import { makeBlockHeader, makeBlockProposal, mockTx } from '@aztec/stdlib/testing';
import { TxArray, TxHash, TxHashArray } from '@aztec/stdlib/tx';

import { describe, expect, it } from '@jest/globals';

import { BitVector } from './bitvector.js';
import { BlockTxsRequest, BlockTxsResponse } from './block_txs_reqresp.js';
import { BlockTxsRequest, BlockTxsResponse, calculateBlockTxsResponseSize } from './block_txs_reqresp.js';

describe('BlockTxRequest', () => {
// eslint-disable-next-line require-await
Expand Down Expand Up @@ -176,3 +176,49 @@ describe('BlockTxResponse', () => {
expect(deserialized.txIndices.getTrueIndices()).toEqual([]);
});
});

describe('calculateBlockTxsResponseSize', () => {
it('should return correct size based on requested tx indices', () => {
const archiveRoot = Fr.random();
const txIndices = BitVector.init(16, [0, 5, 10, 15]); // 4 txs requested
const request = new BlockTxsRequest(archiveRoot, new TxHashArray(), txIndices);
const buffer = request.toBuffer();

expect(calculateBlockTxsResponseSize(buffer)).toBe(4 * MAX_TX_SIZE_KB + 1);
});

it('should return correct size for a single requested tx', () => {
const archiveRoot = Fr.random();
const txIndices = BitVector.init(8, [3]); // 1 tx requested
const request = new BlockTxsRequest(archiveRoot, new TxHashArray(), txIndices);
const buffer = request.toBuffer();

expect(calculateBlockTxsResponseSize(buffer)).toBe(MAX_TX_SIZE_KB + 1);
});

it('should return overhead-only for request with no indices set', () => {
const archiveRoot = Fr.random();
const txIndices = BitVector.init(8, []); // 0 txs requested
const request = new BlockTxsRequest(archiveRoot, new TxHashArray(), txIndices);
const buffer = request.toBuffer();

expect(calculateBlockTxsResponseSize(buffer)).toBe(1); // just overhead
});

it('should return correct size for request with all indices set', () => {
const count = 32;
const allIndices = Array.from({ length: count }, (_, i) => i);
const archiveRoot = Fr.random();
const txIndices = BitVector.init(count, allIndices);
const request = new BlockTxsRequest(archiveRoot, new TxHashArray(), txIndices);
const buffer = request.toBuffer();

expect(calculateBlockTxsResponseSize(buffer)).toBe(count * MAX_TX_SIZE_KB + 1);
});

it('should fall back to single tx size for garbage buffer', () => {
const garbage = Buffer.from('not a valid buffer');

expect(calculateBlockTxsResponseSize(garbage)).toBe(MAX_TX_SIZE_KB + 1);
});
});
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Fr } from '@aztec/foundation/curves/bn254';
import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize';
import { MAX_TX_SIZE_KB } from '@aztec/stdlib/p2p';
import { TxArray, type TxHash, TxHashArray } from '@aztec/stdlib/tx';

import { BitVector } from './bitvector.js';
Expand Down Expand Up @@ -125,3 +126,19 @@ export class BlockTxsResponse {
return new BlockTxsResponse(Fr.ZERO, new TxArray(), BitVector.init(0, []));
}
}

/**
* Calculate the expected response size for a BLOCK_TXS request.
* @param requestBuffer - The serialized request buffer containing BlockTxsRequest
* @returns Expected response size in KB
*/
export function calculateBlockTxsResponseSize(requestBuffer: Buffer): number {
try {
const request = BlockTxsRequest.fromBuffer(requestBuffer);
const requestedTxCount = request.txIndices.getTrueIndices().length;
return requestedTxCount * MAX_TX_SIZE_KB + 1; // +1 KB overhead for serialization
} catch {
// If we can't parse the request, fall back to allowing a single transaction response
return MAX_TX_SIZE_KB + 1;
}
}
52 changes: 52 additions & 0 deletions yarn-project/p2p/src/services/reqresp/protocols/tx.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { MAX_TX_SIZE_KB } from '@aztec/stdlib/p2p';
import { TxHash, TxHashArray } from '@aztec/stdlib/tx';

import { describe, expect, it } from '@jest/globals';

import { calculateTxResponseSize } from './tx.js';

describe('calculateTxResponseSize', () => {
it('should return correct size for a single tx hash', () => {
const hashes = new TxHashArray(TxHash.random());
const buffer = hashes.toBuffer();

expect(calculateTxResponseSize(buffer)).toBe(MAX_TX_SIZE_KB + 1);
});

it('should return correct size for multiple tx hashes', () => {
const hashes = new TxHashArray(TxHash.random(), TxHash.random(), TxHash.random());
const buffer = hashes.toBuffer();

expect(calculateTxResponseSize(buffer)).toBe(3 * MAX_TX_SIZE_KB + 1);
});

it('should return correct size for 8 tx hashes (default batch size)', () => {
const hashes = new TxHashArray(...Array.from({ length: 8 }, () => TxHash.random()));
const buffer = hashes.toBuffer();

expect(calculateTxResponseSize(buffer)).toBe(8 * MAX_TX_SIZE_KB + 1);
});

it('should fall back to single tx size for a raw TxHash buffer (not TxHashArray)', () => {
// A raw TxHash (32 bytes) is not a valid TxHashArray serialization.
// TxHashArray.fromBuffer silently returns empty array on parse failure.
const rawHash = TxHash.random().toBuffer();

expect(calculateTxResponseSize(rawHash)).toBe(MAX_TX_SIZE_KB + 1);
});

it('should fall back to single tx size for garbage buffer', () => {
const garbage = Buffer.from('not a valid buffer');

expect(calculateTxResponseSize(garbage)).toBe(MAX_TX_SIZE_KB + 1);
});

it('should return at least single tx size for empty TxHashArray', () => {
const hashes = new TxHashArray();
const buffer = hashes.toBuffer();

// Empty TxHashArray serializes to a valid buffer with length prefix 0
// We expect at least 1 * MAX_TX_SIZE_KB + 1
expect(calculateTxResponseSize(buffer)).toBe(MAX_TX_SIZE_KB + 1);
});
});
22 changes: 22 additions & 0 deletions yarn-project/p2p/src/services/reqresp/protocols/tx.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { chunk } from '@aztec/foundation/collection';
import { MAX_TX_SIZE_KB } from '@aztec/stdlib/p2p';
import { TxArray, TxHash, TxHashArray } from '@aztec/stdlib/tx';

import type { PeerId } from '@libp2p/interface';
Expand Down Expand Up @@ -55,3 +56,24 @@ export function reqRespTxHandler(mempools: MemPools): ReqRespSubProtocolHandler
export function chunkTxHashesRequest(hashes: TxHash[], chunkSize = 1): Array<TxHashArray> {
return chunk(hashes, chunkSize).map(chunk => new TxHashArray(...chunk));
}

/**
* Calculate the expected response size for a TX request.
* @param requestBuffer - The serialized request buffer containing TxHashArray
* @returns Expected response size in KB
*/
export function calculateTxResponseSize(requestBuffer: Buffer): number {
try {
const txHashes = TxHashArray.fromBuffer(requestBuffer);
// TxHashArray.fromBuffer returns empty array on parse failure, so check for that
if (txHashes.length === 0 && requestBuffer.length > 0) {
// If we got an empty array but had a non-empty buffer, parsing likely failed
// Fall back to allowing a single transaction response
return MAX_TX_SIZE_KB + 1;
}
return Math.max(txHashes.length, 1) * MAX_TX_SIZE_KB + 1; // +1 KB overhead, at least 1 tx
} catch {
// If we can't parse the request, fall back to allowing a single transaction response
return MAX_TX_SIZE_KB + 1;
}
}
16 changes: 13 additions & 3 deletions yarn-project/p2p/src/services/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
type ReqRespSubProtocolValidators,
type SubProtocolMap,
responseFromBuffer,
subProtocolSizeCalculators,
} from './interface.js';
import { ReqRespMetrics } from './metrics.js';
import {
Expand Down Expand Up @@ -437,18 +438,24 @@ export class ReqResp implements ReqRespInterface {
try {
this.metrics.recordRequestSent(subProtocol);

// Calculate expected response size based on the request payload
const expectedSizeKb = subProtocolSizeCalculators[subProtocol](payload);

this.logger.trace(`Sending request to peer ${peerId.toString()} on sub protocol ${subProtocol}`);
stream = await this.connectionSampler.dialProtocol(peerId, subProtocol, dialTimeout);
this.logger.trace(
`Opened stream ${stream.id} for sending request to peer ${peerId.toString()} on sub protocol ${subProtocol}`,
);

const timeoutErr = new IndividualReqRespTimeoutError();
// Create a wrapper to pass the expected size to readMessage
const readMessageWithSizeLimit = (source: AsyncIterable<Uint8ArrayList>) =>
this.readMessage(source, expectedSizeKb);
const [_, resp] = await executeTimeout(
signal =>
Promise.all([
pipeline([payload], stream!.sink, { signal }),
pipeline(stream!.source, this.readMessage.bind(this), { signal }),
pipeline(stream!.source, readMessageWithSizeLimit, { signal }),
]),
this.individualRequestTimeoutMs,
() => timeoutErr,
Expand Down Expand Up @@ -510,8 +517,11 @@ export class ReqResp implements ReqRespInterface {
* The message is split into two components
* - The first chunk should contain a control byte, indicating the status of the response see `ReqRespStatus`
* - The second chunk should contain the response data
*
* @param source - The async iterable source of data chunks
* @param maxSizeKb - Optional maximum expected size in KB for the decompressed response
*/
private async readMessage(source: AsyncIterable<Uint8ArrayList>): Promise<ReqRespResponse> {
private async readMessage(source: AsyncIterable<Uint8ArrayList>, maxSizeKb?: number): Promise<ReqRespResponse> {
let status: ReqRespStatus | undefined;
const chunks: Uint8Array[] = [];

Expand All @@ -536,7 +546,7 @@ export class ReqResp implements ReqRespInterface {
}

const messageData = Buffer.concat(chunks);
const message: Buffer = this.snappyTransform.inboundTransformData(messageData);
const message: Buffer = this.snappyTransform.inboundTransformData(messageData, undefined, maxSizeKb);

return {
status: status ?? ReqRespStatus.UNKNOWN,
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/stdlib/src/p2p/constants.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
export const MAX_TX_SIZE_KB: number = 512;

export const MAX_MESSAGE_SIZE_KB: number = 10 * 1024;

/** Maximum size for L2Block response (contains TxEffects, not full Txs with proofs) */
export const MAX_L2_BLOCK_SIZE_KB: number = 3 * 1024; // 3 MB
Loading