diff --git a/yarn-project/p2p/src/services/encoding.test.ts b/yarn-project/p2p/src/services/encoding.test.ts index 628eea021d43..2be4857d29ce 100644 --- a/yarn-project/p2p/src/services/encoding.test.ts +++ b/yarn-project/p2p/src/services/encoding.test.ts @@ -356,6 +356,33 @@ describe('SnappyTransform', () => { 'exceeds maximum allowed size of 200kb', ); }); + + it('should use maxSizeKbOverride when provided, ignoring topic and default limits', () => { + const transform = new SnappyTransform(); + + // Data at 50kb should pass with 100kb override + const data = Buffer.alloc(50 * 1024, 'a'); + const compressed = compressSync(data); + expect(() => transform.inboundTransformData(compressed, undefined, 100)).not.toThrow(); + + // Data at 150kb should fail with 100kb override + const dataLarge = Buffer.alloc(150 * 1024, 'a'); + const compressedLarge = compressSync(dataLarge); + expect(() => transform.inboundTransformData(compressedLarge, undefined, 100)).toThrow( + 'exceeds maximum allowed size of 100kb', + ); + }); + + it('should prefer maxSizeKbOverride over topic-specific limit', () => { + const transform = new SnappyTransform(); + + // TX topic has a 512kb limit, but override is 10kb + const data = Buffer.alloc(50 * 1024, 'a'); // 50kb - within tx limit but over override + const compressed = compressSync(data); + expect(() => transform.inboundTransformData(compressed, TopicType.tx, 10)).toThrow( + 'exceeds maximum allowed size of 10kb', + ); + }); }); describe('exact boundary conditions', () => { diff --git a/yarn-project/p2p/src/services/encoding.ts b/yarn-project/p2p/src/services/encoding.ts index e44998dd9197..c2a5e6dc5a87 100644 --- a/yarn-project/p2p/src/services/encoding.ts +++ b/yarn-project/p2p/src/services/encoding.ts @@ -78,11 +78,11 @@ export class SnappyTransform implements DataTransform { return this.inboundTransformData(Buffer.from(data), topic); } - public inboundTransformData(data: Buffer, topic?: TopicType): Buffer { + public inboundTransformData(data: Buffer, topic?: TopicType, maxSizeKbOverride?: number): Buffer { if (data.length === 0) { return data; } - const maxSizeKb = this.maxSizesKb[topic!] ?? this.defaultMaxSizeKb; + const maxSizeKb = maxSizeKbOverride ?? this.maxSizesKb[topic!] ?? this.defaultMaxSizeKb; const { decompressedSize } = readSnappyPreamble(data); if (decompressedSize > maxSizeKb * 1024) { this.logger.warn(`Decompressed size ${decompressedSize} exceeds maximum allowed size of ${maxSizeKb}kb`); diff --git a/yarn-project/p2p/src/services/reqresp/interface.ts b/yarn-project/p2p/src/services/reqresp/interface.ts index a423ffe3382d..e1eb6c6a6fd3 100644 --- a/yarn-project/p2p/src/services/reqresp/interface.ts +++ b/yarn-project/p2p/src/services/reqresp/interface.ts @@ -1,5 +1,6 @@ import { Fr } from '@aztec/foundation/curves/bn254'; import { L2Block } from '@aztec/stdlib/block'; +import { MAX_L2_BLOCK_SIZE_KB } from '@aztec/stdlib/p2p'; import { TxArray, TxHashArray } from '@aztec/stdlib/tx'; import type { PeerId } from '@libp2p/interface'; @@ -7,8 +8,13 @@ import type { PeerId } from '@libp2p/interface'; import type { P2PReqRespConfig } from './config.js'; import type { ConnectionSampler } from './connection-sampler/connection_sampler.js'; import { AuthRequest, AuthResponse } from './protocols/auth.js'; -import { BlockTxsRequest, BlockTxsResponse } from './protocols/block_txs/block_txs_reqresp.js'; +import { + BlockTxsRequest, + BlockTxsResponse, + calculateBlockTxsResponseSize, +} from './protocols/block_txs/block_txs_reqresp.js'; import { StatusMessage } from './protocols/status.js'; +import { calculateTxResponseSize } from './protocols/tx.js'; import type { ReqRespStatus } from './status.js'; /* @@ -211,6 +217,25 @@ export const subProtocolMap = { }, }; +/** + * Type for a function that calculates the expected response size in KB for a given request. + */ +export type ExpectedResponseSizeCalculator = (requestBuffer: Buffer) => number; + +/** + * Map of sub-protocols to their expected response size calculators. + * These are used to validate that responses don't exceed expected sizes based on request parameters. + */ +export const subProtocolSizeCalculators: Record = { + [ReqRespSubProtocol.TX]: calculateTxResponseSize, + [ReqRespSubProtocol.BLOCK_TXS]: calculateBlockTxsResponseSize, + [ReqRespSubProtocol.BLOCK]: () => MAX_L2_BLOCK_SIZE_KB, + [ReqRespSubProtocol.STATUS]: () => 1, + [ReqRespSubProtocol.PING]: () => 1, + [ReqRespSubProtocol.AUTH]: () => 1, + [ReqRespSubProtocol.GOODBYE]: () => 1, // No response expected, but provide minimal limit +}; + export interface ReqRespInterface { start( subProtocolHandlers: Partial, diff --git a/yarn-project/p2p/src/services/reqresp/protocols/block_txs/block_txs.test.ts b/yarn-project/p2p/src/services/reqresp/protocols/block_txs/block_txs.test.ts index e898b947b2c5..7bab3416ad9e 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/block_txs/block_txs.test.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/block_txs/block_txs.test.ts @@ -1,14 +1,14 @@ import { BlockNumber } from '@aztec/foundation/branded-types'; import { Secp256k1Signer } from '@aztec/foundation/crypto/secp256k1-signer'; import { Fr } from '@aztec/foundation/curves/bn254'; -import { BlockProposal } from '@aztec/stdlib/p2p'; +import { BlockProposal, MAX_TX_SIZE_KB } from '@aztec/stdlib/p2p'; import { makeBlockHeader, makeBlockProposal, mockTx } from '@aztec/stdlib/testing'; import { TxArray, TxHash, TxHashArray } from '@aztec/stdlib/tx'; import { describe, expect, it } from '@jest/globals'; import { BitVector } from './bitvector.js'; -import { BlockTxsRequest, BlockTxsResponse } from './block_txs_reqresp.js'; +import { BlockTxsRequest, BlockTxsResponse, calculateBlockTxsResponseSize } from './block_txs_reqresp.js'; describe('BlockTxRequest', () => { // eslint-disable-next-line require-await @@ -176,3 +176,49 @@ describe('BlockTxResponse', () => { expect(deserialized.txIndices.getTrueIndices()).toEqual([]); }); }); + +describe('calculateBlockTxsResponseSize', () => { + it('should return correct size based on requested tx indices', () => { + const archiveRoot = Fr.random(); + const txIndices = BitVector.init(16, [0, 5, 10, 15]); // 4 txs requested + const request = new BlockTxsRequest(archiveRoot, new TxHashArray(), txIndices); + const buffer = request.toBuffer(); + + expect(calculateBlockTxsResponseSize(buffer)).toBe(4 * MAX_TX_SIZE_KB + 1); + }); + + it('should return correct size for a single requested tx', () => { + const archiveRoot = Fr.random(); + const txIndices = BitVector.init(8, [3]); // 1 tx requested + const request = new BlockTxsRequest(archiveRoot, new TxHashArray(), txIndices); + const buffer = request.toBuffer(); + + expect(calculateBlockTxsResponseSize(buffer)).toBe(MAX_TX_SIZE_KB + 1); + }); + + it('should return overhead-only for request with no indices set', () => { + const archiveRoot = Fr.random(); + const txIndices = BitVector.init(8, []); // 0 txs requested + const request = new BlockTxsRequest(archiveRoot, new TxHashArray(), txIndices); + const buffer = request.toBuffer(); + + expect(calculateBlockTxsResponseSize(buffer)).toBe(1); // just overhead + }); + + it('should return correct size for request with all indices set', () => { + const count = 32; + const allIndices = Array.from({ length: count }, (_, i) => i); + const archiveRoot = Fr.random(); + const txIndices = BitVector.init(count, allIndices); + const request = new BlockTxsRequest(archiveRoot, new TxHashArray(), txIndices); + const buffer = request.toBuffer(); + + expect(calculateBlockTxsResponseSize(buffer)).toBe(count * MAX_TX_SIZE_KB + 1); + }); + + it('should fall back to single tx size for garbage buffer', () => { + const garbage = Buffer.from('not a valid buffer'); + + expect(calculateBlockTxsResponseSize(garbage)).toBe(MAX_TX_SIZE_KB + 1); + }); +}); diff --git a/yarn-project/p2p/src/services/reqresp/protocols/block_txs/block_txs_reqresp.ts b/yarn-project/p2p/src/services/reqresp/protocols/block_txs/block_txs_reqresp.ts index 3bf696eb3001..4df100c08f84 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/block_txs/block_txs_reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/block_txs/block_txs_reqresp.ts @@ -1,5 +1,6 @@ import { Fr } from '@aztec/foundation/curves/bn254'; import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize'; +import { MAX_TX_SIZE_KB } from '@aztec/stdlib/p2p'; import { TxArray, type TxHash, TxHashArray } from '@aztec/stdlib/tx'; import { BitVector } from './bitvector.js'; @@ -125,3 +126,19 @@ export class BlockTxsResponse { return new BlockTxsResponse(Fr.ZERO, new TxArray(), BitVector.init(0, [])); } } + +/** + * Calculate the expected response size for a BLOCK_TXS request. + * @param requestBuffer - The serialized request buffer containing BlockTxsRequest + * @returns Expected response size in KB + */ +export function calculateBlockTxsResponseSize(requestBuffer: Buffer): number { + try { + const request = BlockTxsRequest.fromBuffer(requestBuffer); + const requestedTxCount = request.txIndices.getTrueIndices().length; + return requestedTxCount * MAX_TX_SIZE_KB + 1; // +1 KB overhead for serialization + } catch { + // If we can't parse the request, fall back to allowing a single transaction response + return MAX_TX_SIZE_KB + 1; + } +} diff --git a/yarn-project/p2p/src/services/reqresp/protocols/tx.test.ts b/yarn-project/p2p/src/services/reqresp/protocols/tx.test.ts new file mode 100644 index 000000000000..b7103bf2f395 --- /dev/null +++ b/yarn-project/p2p/src/services/reqresp/protocols/tx.test.ts @@ -0,0 +1,52 @@ +import { MAX_TX_SIZE_KB } from '@aztec/stdlib/p2p'; +import { TxHash, TxHashArray } from '@aztec/stdlib/tx'; + +import { describe, expect, it } from '@jest/globals'; + +import { calculateTxResponseSize } from './tx.js'; + +describe('calculateTxResponseSize', () => { + it('should return correct size for a single tx hash', () => { + const hashes = new TxHashArray(TxHash.random()); + const buffer = hashes.toBuffer(); + + expect(calculateTxResponseSize(buffer)).toBe(MAX_TX_SIZE_KB + 1); + }); + + it('should return correct size for multiple tx hashes', () => { + const hashes = new TxHashArray(TxHash.random(), TxHash.random(), TxHash.random()); + const buffer = hashes.toBuffer(); + + expect(calculateTxResponseSize(buffer)).toBe(3 * MAX_TX_SIZE_KB + 1); + }); + + it('should return correct size for 8 tx hashes (default batch size)', () => { + const hashes = new TxHashArray(...Array.from({ length: 8 }, () => TxHash.random())); + const buffer = hashes.toBuffer(); + + expect(calculateTxResponseSize(buffer)).toBe(8 * MAX_TX_SIZE_KB + 1); + }); + + it('should fall back to single tx size for a raw TxHash buffer (not TxHashArray)', () => { + // A raw TxHash (32 bytes) is not a valid TxHashArray serialization. + // TxHashArray.fromBuffer silently returns empty array on parse failure. + const rawHash = TxHash.random().toBuffer(); + + expect(calculateTxResponseSize(rawHash)).toBe(MAX_TX_SIZE_KB + 1); + }); + + it('should fall back to single tx size for garbage buffer', () => { + const garbage = Buffer.from('not a valid buffer'); + + expect(calculateTxResponseSize(garbage)).toBe(MAX_TX_SIZE_KB + 1); + }); + + it('should return at least single tx size for empty TxHashArray', () => { + const hashes = new TxHashArray(); + const buffer = hashes.toBuffer(); + + // Empty TxHashArray serializes to a valid buffer with length prefix 0 + // We expect at least 1 * MAX_TX_SIZE_KB + 1 + expect(calculateTxResponseSize(buffer)).toBe(MAX_TX_SIZE_KB + 1); + }); +}); diff --git a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts index ab64b2b13979..f121cbf9d3f2 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts @@ -1,4 +1,5 @@ import { chunk } from '@aztec/foundation/collection'; +import { MAX_TX_SIZE_KB } from '@aztec/stdlib/p2p'; import { TxArray, TxHash, TxHashArray } from '@aztec/stdlib/tx'; import type { PeerId } from '@libp2p/interface'; @@ -55,3 +56,24 @@ export function reqRespTxHandler(mempools: MemPools): ReqRespSubProtocolHandler export function chunkTxHashesRequest(hashes: TxHash[], chunkSize = 1): Array { return chunk(hashes, chunkSize).map(chunk => new TxHashArray(...chunk)); } + +/** + * Calculate the expected response size for a TX request. + * @param requestBuffer - The serialized request buffer containing TxHashArray + * @returns Expected response size in KB + */ +export function calculateTxResponseSize(requestBuffer: Buffer): number { + try { + const txHashes = TxHashArray.fromBuffer(requestBuffer); + // TxHashArray.fromBuffer returns empty array on parse failure, so check for that + if (txHashes.length === 0 && requestBuffer.length > 0) { + // If we got an empty array but had a non-empty buffer, parsing likely failed + // Fall back to allowing a single transaction response + return MAX_TX_SIZE_KB + 1; + } + return Math.max(txHashes.length, 1) * MAX_TX_SIZE_KB + 1; // +1 KB overhead, at least 1 tx + } catch { + // If we can't parse the request, fall back to allowing a single transaction response + return MAX_TX_SIZE_KB + 1; + } +} diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index 36a9ced80e65..7d544395230d 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -36,6 +36,7 @@ import { type ReqRespSubProtocolValidators, type SubProtocolMap, responseFromBuffer, + subProtocolSizeCalculators, } from './interface.js'; import { ReqRespMetrics } from './metrics.js'; import { @@ -437,6 +438,9 @@ export class ReqResp implements ReqRespInterface { try { this.metrics.recordRequestSent(subProtocol); + // Calculate expected response size based on the request payload + const expectedSizeKb = subProtocolSizeCalculators[subProtocol](payload); + this.logger.trace(`Sending request to peer ${peerId.toString()} on sub protocol ${subProtocol}`); stream = await this.connectionSampler.dialProtocol(peerId, subProtocol, dialTimeout); this.logger.trace( @@ -444,11 +448,14 @@ export class ReqResp implements ReqRespInterface { ); const timeoutErr = new IndividualReqRespTimeoutError(); + // Create a wrapper to pass the expected size to readMessage + const readMessageWithSizeLimit = (source: AsyncIterable) => + this.readMessage(source, expectedSizeKb); const [_, resp] = await executeTimeout( signal => Promise.all([ pipeline([payload], stream!.sink, { signal }), - pipeline(stream!.source, this.readMessage.bind(this), { signal }), + pipeline(stream!.source, readMessageWithSizeLimit, { signal }), ]), this.individualRequestTimeoutMs, () => timeoutErr, @@ -510,8 +517,11 @@ export class ReqResp implements ReqRespInterface { * The message is split into two components * - The first chunk should contain a control byte, indicating the status of the response see `ReqRespStatus` * - The second chunk should contain the response data + * + * @param source - The async iterable source of data chunks + * @param maxSizeKb - Optional maximum expected size in KB for the decompressed response */ - private async readMessage(source: AsyncIterable): Promise { + private async readMessage(source: AsyncIterable, maxSizeKb?: number): Promise { let status: ReqRespStatus | undefined; const chunks: Uint8Array[] = []; @@ -536,7 +546,7 @@ export class ReqResp implements ReqRespInterface { } const messageData = Buffer.concat(chunks); - const message: Buffer = this.snappyTransform.inboundTransformData(messageData); + const message: Buffer = this.snappyTransform.inboundTransformData(messageData, undefined, maxSizeKb); return { status: status ?? ReqRespStatus.UNKNOWN, diff --git a/yarn-project/stdlib/src/p2p/constants.ts b/yarn-project/stdlib/src/p2p/constants.ts index 82b797a8a87f..f860629a21c9 100644 --- a/yarn-project/stdlib/src/p2p/constants.ts +++ b/yarn-project/stdlib/src/p2p/constants.ts @@ -1,3 +1,6 @@ export const MAX_TX_SIZE_KB: number = 512; export const MAX_MESSAGE_SIZE_KB: number = 10 * 1024; + +/** Maximum size for L2Block response (contains TxEffects, not full Txs with proofs) */ +export const MAX_L2_BLOCK_SIZE_KB: number = 3 * 1024; // 3 MB