From dcde849736b06fbf11aadc07ccbf48565591582b Mon Sep 17 00:00:00 2001 From: Michal Rzeszutko Date: Tue, 10 Mar 2026 15:24:28 +0000 Subject: [PATCH 1/3] fix: marking peer as dumb on failed responses --- .../batch_tx_requester.test.ts | 192 ++++++++++++++++++ .../batch-tx-requester/batch_tx_requester.ts | 18 +- .../reqresp/batch-tx-requester/interface.ts | 2 + .../reqresp/batch-tx-requester/missing_txs.ts | 7 + .../batch-tx-requester/peer_collection.ts | 5 + 5 files changed, 220 insertions(+), 4 deletions(-) diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts index bc7234438f72..047c211f55ce 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts @@ -1790,6 +1790,192 @@ describe('BatchTxRequester', () => { expect(resultTxHashes.has(missing[6].toString())).toBe(false); // Invalid from regular }); }); + + describe('Smart peer demotion', () => { + it('should demote a smart peer back to dumb on NOT_FOUND without penalizing', async () => { + // peer0 claims to have ALL txs but we only request a batch at a time, so after the first + // dumb response there are still missing txs → peer0 gets promoted to smart. + // On the next (smart) request peer0 returns NOT_FOUND (pruned proposal) → demoted without penalty. + const txCount = 2 * TX_BATCH_SIZE; + const deadline = 5_000; + const missing = Array.from({ length: txCount }, () => TxHash.random()); + + blockProposal = await makeBlockProposal({ + signer: Secp256k1Signer.random(), + blockHeader: makeBlockHeader(1, { blockNumber: BlockNumber(1) }), + archiveRoot: Fr.random(), + txHashes: missing, + }); + + const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]); + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); + + const peerCollection = new TestPeerCollection( + new PeerCollection(connectionSampler, undefined, new DateProvider()), + ); + + const allIndices = Array.from({ length: txCount }, (_, i) => i); + + let peer0RequestCount = 0; + reqResp.sendRequestToPeer.mockImplementation(async (peerId: any, _sub: any, data: any) => { + const peerStr = peerId.toString(); + + if (peerStr === peers[0].toString()) { + peer0RequestCount++; + if (peer0RequestCount === 1) { + // First dumb request succeeds: return requested txs, claim to have ALL txs → promoted + const request = BlockTxsRequest.fromBuffer(data); + const requestedIndices = request.txIndices.getTrueIndices(); + const availableTxs = requestedIndices.map(idx => makeTx(blockProposal.txHashes[idx])); + + return { + status: ReqRespStatus.SUCCESS, + data: new BlockTxsResponse( + blockProposal.archive, + new TxArray(...availableTxs), + BitVector.init(txCount, allIndices), + ).toBuffer(), + }; + } + // Subsequent smart requests return NOT_FOUND (pruned proposal, no full hashes in request) + return { status: ReqRespStatus.NOT_FOUND, data: Buffer.alloc(0) }; + } + + // peer1 always succeeds with a delay so peer0 is queried first + await sleep(50); + const request = BlockTxsRequest.fromBuffer(data); + const requestedIndices = request.txIndices.getTrueIndices(); + const availableTxs = requestedIndices.map(idx => makeTx(blockProposal.txHashes[idx])); + + return { + status: ReqRespStatus.SUCCESS, + data: new BlockTxsResponse( + blockProposal.archive, + new TxArray(...availableTxs), + BitVector.init(txCount, allIndices), + ).toBuffer(), + }; + }); + + const requester = new BatchTxRequester( + MissingTxsTracker.fromArray(missing), + blockProposal, + undefined, + deadline, + mockP2PService, + logger, + new DateProvider(), + { + smartParallelWorkerCount: 1, + dumbParallelWorkerCount: 1, + peerCollection, + txValidator, + }, + ); + + const results = await BatchTxRequester.collectAllTxs(requester.run()); + expect(results).toHaveLength(txCount); + + // Verify peer0 was first promoted to smart, then demoted on NOT_FOUND + expect(peerCollection.smartPeersMarked).toContain(peers[0].toString()); + expect(peerCollection.peersMarkedDumb).toContain(peers[0].toString()); + + // NOT_FOUND is a legitimate state (pruned proposal), so peer should NOT be penalized + const peer0Penalties = peerCollection.peersPenalised.filter(e => e.peerId === peers[0].toString()); + expect(peer0Penalties).toHaveLength(0); + }); + + it('should demote a smart peer when it responds with invalid block data', async () => { + const txCount = 2 * TX_BATCH_SIZE; + const deadline = 5_000; + const missing = Array.from({ length: txCount }, () => TxHash.random()); + + blockProposal = await makeBlockProposal({ + signer: Secp256k1Signer.random(), + blockHeader: makeBlockHeader(1, { blockNumber: BlockNumber(1) }), + archiveRoot: Fr.random(), + txHashes: missing, + }); + + const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]); + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); + + const peerCollection = new TestPeerCollection( + new PeerCollection(connectionSampler, undefined, new DateProvider()), + ); + + const allIndices = Array.from({ length: txCount }, (_, i) => i); + + let peer0RequestCount = 0; + reqResp.sendRequestToPeer.mockImplementation(async (peerId: any, _sub: any, data: any) => { + const peerStr = peerId.toString(); + + if (peerStr === peers[0].toString()) { + peer0RequestCount++; + + if (peer0RequestCount === 1) { + // First dumb request: valid response claiming all txs → promoted to smart + const request = BlockTxsRequest.fromBuffer(data); + const requestedIndices = request.txIndices.getTrueIndices(); + const availableTxs = requestedIndices.map(idx => makeTx(blockProposal.txHashes[idx])); + + return { + status: ReqRespStatus.SUCCESS, + data: new BlockTxsResponse( + blockProposal.archive, + new TxArray(...availableTxs), + BitVector.init(txCount, allIndices), + ).toBuffer(), + }; + } + + // Subsequent smart requests: invalid block response (pruned proposal fallback) + return { + status: ReqRespStatus.SUCCESS, + data: new BlockTxsResponse(Fr.zero(), new TxArray(), BitVector.init(txCount, [])).toBuffer(), + }; + } + + // peer1 always succeeds with a delay + await sleep(50); + const request = BlockTxsRequest.fromBuffer(data); + const requestedIndices = request.txIndices.getTrueIndices(); + const availableTxs = requestedIndices.map(idx => makeTx(blockProposal.txHashes[idx])); + + return { + status: ReqRespStatus.SUCCESS, + data: new BlockTxsResponse( + blockProposal.archive, + new TxArray(...availableTxs), + BitVector.init(txCount, allIndices), + ).toBuffer(), + }; + }); + + const requester = new BatchTxRequester( + MissingTxsTracker.fromArray(missing), + blockProposal, + undefined, + deadline, + mockP2PService, + logger, + new DateProvider(), + { + smartParallelWorkerCount: 1, + dumbParallelWorkerCount: 1, + peerCollection, + txValidator, + }, + ); + + const results = await BatchTxRequester.collectAllTxs(requester.run()); + expect(results).toHaveLength(txCount); + + // Verify peer0 was first promoted to smart, then demoted on invalid block response + expect(peerCollection.smartPeersMarked).toContain(peers[0].toString()); + expect(peerCollection.peersMarkedDumb).toContain(peers[0].toString()); + }); + }); }); describe('PeerCollection - Dynamic peer list', () => { @@ -2129,6 +2315,7 @@ export class TestSemaphore implements ISemaphore { export class TestPeerCollection implements IPeerCollection { public smartPeersMarked: string[] = []; + public peersMarkedDumb: string[] = []; public peersPenalised: Array<{ peerId: string; severity: PeerErrorSeverity }> = []; public peersMarkedInFlight: string[] = []; public peersUnmarkedBad: string[] = []; @@ -2142,6 +2329,11 @@ export class TestPeerCollection implements IPeerCollection { return this.inner.markPeerSmart(peerId); } + markPeerDumb(peerId: any): void { + this.peersMarkedDumb.push(peerId.toString()); + return this.inner.markPeerDumb(peerId); + } + nextSmartPeerToQuery(): PeerId | undefined { return this.inner.nextSmartPeerToQuery(); } diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts index d468b06ef7c3..3ee0f16544b6 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts @@ -463,9 +463,18 @@ export class BatchTxRequester { * this implies we will query these peers couple of more times and give them a chance to "redeem" themselves before completely ignoring them */ private handleFailResponseFromPeer(peerId: PeerId, responseStatus: ReqRespStatus) { - //TODO: Should we ban these peers? if (responseStatus === ReqRespStatus.FAILURE || responseStatus === ReqRespStatus.UNKNOWN) { this.peers.penalisePeer(peerId, PeerErrorSeverity.HighToleranceError); + this.peers.markPeerDumb(peerId); + this.txsMetadata.clearPeerData(peerId); + return; + } + + // NOT_FOUND means the peer pruned its block proposal — it can no longer serve + // index-based requests, but this is a legitimate state so we don't penalize. + if (responseStatus === ReqRespStatus.NOT_FOUND) { + this.peers.markPeerDumb(peerId); + this.txsMetadata.clearPeerData(peerId); return; } @@ -555,10 +564,11 @@ export class BatchTxRequester { return; } - // If block response is invalid we still want to query this peer in the future - // Because they sent successful response, so they might become smart peer in the future - // Or send us needed txs + // If block response is invalid, demote the peer back to dumb if it was smart. + // This happens when the peer pruned its proposal and can no longer serve index-based requests. if (!this.isBlockResponseValid(response)) { + this.peers.markPeerDumb(peerId); + this.txsMetadata.clearPeerData(peerId); return; } diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/interface.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/interface.ts index f3e895b2d7a9..fad59924637a 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/interface.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/interface.ts @@ -23,6 +23,8 @@ export interface ITxMetadataCollection { alreadyFetched(txHash: TxHash): boolean; // Returns true if tx was marked as fetched, false if it was already marked as fetched markPeerHas(peerId: PeerId, txHashes: TxHash[]): void; + /** Remove all tx metadata associations for a peer (e.g. on demotion from smart to dumb). */ + clearPeerData(peerId: PeerId): void; } /** diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/missing_txs.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/missing_txs.ts index 56705595faf9..06a09effe8be 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/missing_txs.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/missing_txs.ts @@ -158,4 +158,11 @@ export class MissingTxMetadataCollection implements ITxMetadataCollection { } }); } + + public clearPeerData(peerId: PeerId) { + const peerIdStr = peerId.toString(); + for (const txMeta of this.txMetadata.values()) { + txMeta.peers.delete(peerIdStr); + } + } } diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/peer_collection.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/peer_collection.ts index 4c7baec5316d..e04da79fe619 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/peer_collection.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/peer_collection.ts @@ -12,6 +12,7 @@ export const RATE_LIMIT_EXCEEDED_PEER_CACHE_TTL = 1000; // 1s export interface IPeerCollection { markPeerSmart(peerId: PeerId): void; + markPeerDumb(peerId: PeerId): void; /** Sample next peer in round-robin fashion. No smart peers if returns undefined */ nextSmartPeerToQuery(): PeerId | undefined; @@ -57,6 +58,10 @@ export class PeerCollection implements IPeerCollection { this.smartPeers.add(peerId.toString()); } + public markPeerDumb(peerId: PeerId): void { + this.smartPeers.delete(peerId.toString()); + } + // We keep track of all peers that are queried for peer sampling algorithm private queriedSmartPeers: Set = new Set(); private queriedDumbPeers: Set = new Set(); From ba4b9daf265694f59840e6980ccc5c1e31513ea3 Mon Sep 17 00:00:00 2001 From: Michal Rzeszutko Date: Thu, 12 Mar 2026 10:21:31 +0000 Subject: [PATCH 2/3] penalizing the peer on archive root mismatch --- .../batch_tx_requester.test.ts | 101 +++++++++++++++++- .../batch-tx-requester/batch_tx_requester.ts | 32 ++++-- 2 files changed, 124 insertions(+), 9 deletions(-) diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts index 047c211f55ce..f68b6dcddc40 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts @@ -1971,9 +1971,108 @@ describe('BatchTxRequester', () => { const results = await BatchTxRequester.collectAllTxs(requester.run()); expect(results).toHaveLength(txCount); - // Verify peer0 was first promoted to smart, then demoted on invalid block response + // Verify peer0 was first promoted to smart, then demoted on invalid block response (Fr.zero) expect(peerCollection.smartPeersMarked).toContain(peers[0].toString()); expect(peerCollection.peersMarkedDumb).toContain(peers[0].toString()); + + // Fr.zero is a legitimate pruned-proposal response — peer should NOT be penalised + const peer0Penalties = peerCollection.peersPenalised.filter(e => e.peerId === peers[0].toString()); + expect(peer0Penalties).toHaveLength(0); + }); + + it('should penalise a smart peer that responds with a non-zero archive root mismatch', async () => { + const txCount = 2 * TX_BATCH_SIZE; + const deadline = 5_000; + const missing = Array.from({ length: txCount }, () => TxHash.random()); + + blockProposal = await makeBlockProposal({ + signer: Secp256k1Signer.random(), + blockHeader: makeBlockHeader(1, { blockNumber: BlockNumber(1) }), + archiveRoot: Fr.random(), + txHashes: missing, + }); + + const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]); + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers); + + const peerCollection = new TestPeerCollection( + new PeerCollection(connectionSampler, undefined, new DateProvider()), + ); + + const allIndices = Array.from({ length: txCount }, (_, i) => i); + + let peer0RequestCount = 0; + reqResp.sendRequestToPeer.mockImplementation(async (peerId: any, _sub: any, data: any) => { + const peerStr = peerId.toString(); + + if (peerStr === peers[0].toString()) { + peer0RequestCount++; + + if (peer0RequestCount === 1) { + // First dumb request: valid response claiming all txs → promoted to smart + const request = BlockTxsRequest.fromBuffer(data); + const requestedIndices = request.txIndices.getTrueIndices(); + const availableTxs = requestedIndices.map(idx => makeTx(blockProposal.txHashes[idx])); + + return { + status: ReqRespStatus.SUCCESS, + data: new BlockTxsResponse( + blockProposal.archive, + new TxArray(...availableTxs), + BitVector.init(txCount, allIndices), + ).toBuffer(), + }; + } + + // Subsequent smart requests: non-zero archive root mismatch (malicious response) + return { + status: ReqRespStatus.SUCCESS, + data: new BlockTxsResponse(Fr.random(), new TxArray(), BitVector.init(txCount, [])).toBuffer(), + }; + } + + // peer1 always succeeds with a delay + await sleep(50); + const request = BlockTxsRequest.fromBuffer(data); + const requestedIndices = request.txIndices.getTrueIndices(); + const availableTxs = requestedIndices.map(idx => makeTx(blockProposal.txHashes[idx])); + + return { + status: ReqRespStatus.SUCCESS, + data: new BlockTxsResponse( + blockProposal.archive, + new TxArray(...availableTxs), + BitVector.init(txCount, allIndices), + ).toBuffer(), + }; + }); + + const requester = new BatchTxRequester( + MissingTxsTracker.fromArray(missing), + blockProposal, + undefined, + deadline, + mockP2PService, + logger, + new DateProvider(), + { + smartParallelWorkerCount: 1, + dumbParallelWorkerCount: 1, + peerCollection, + txValidator, + }, + ); + + const results = await BatchTxRequester.collectAllTxs(requester.run()); + expect(results).toHaveLength(txCount); + + // Verify peer0 was promoted then demoted + expect(peerCollection.smartPeersMarked).toContain(peers[0].toString()); + expect(peerCollection.peersMarkedDumb).toContain(peers[0].toString()); + + // Non-zero archive root mismatch is malicious — peer must be penalised + const peer0Penalties = peerCollection.peersPenalised.filter(e => e.peerId === peers[0].toString()); + expect(peer0Penalties.length).toBeGreaterThan(0); }); }); }); diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts index 3ee0f16544b6..67700a865d9b 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts @@ -564,11 +564,7 @@ export class BatchTxRequester { return; } - // If block response is invalid, demote the peer back to dumb if it was smart. - // This happens when the peer pruned its proposal and can no longer serve index-based requests. - if (!this.isBlockResponseValid(response)) { - this.peers.markPeerDumb(peerId); - this.txsMetadata.clearPeerData(peerId); + if (this.handleArchiveRootFromResponse(peerId, response)) { return; } @@ -586,13 +582,33 @@ export class BatchTxRequester { this.smartRequesterSemaphore.release(); } - private isBlockResponseValid(response: BlockTxsResponse): boolean { + /** + * Validates the archive root in the response and handles any mismatch. + * Returns true if the caller should stop processing (i.e., there was a mismatch). + * + * - Archives match: returns false (continue processing). + * - Response archive is Fr.ZERO (peer pruned proposal, legitimate): marks peer dumb, returns true. + * - Non-zero archive mismatch (malicious response): penalises + marks dumb, returns true. + */ + private handleArchiveRootFromResponse(peerId: PeerId, response: BlockTxsResponse): boolean { const archiveRootsMatch = this.blockTxsSource.archive.toString() === response.archiveRoot.toString(); - const peerHasSomeTxsFromProposal = !response.txIndices.isEmpty(); - return archiveRootsMatch && peerHasSomeTxsFromProposal; + if (archiveRootsMatch) { + return false; + } + + if (!response.archiveRoot.isZero()) { + this.peers.penalisePeer(peerId, PeerErrorSeverity.LowToleranceError); + } + + this.peers.markPeerDumb(peerId); + this.txsMetadata.clearPeerData(peerId); + return true; } private peerHasSomeTxsWeAreMissing(_peerId: PeerId, response: BlockTxsResponse): boolean { + if (response.txIndices.isEmpty()) { + return false; + } const txsPeerHas = new Set(this.extractHashesPeerHasFromResponse(response).map(h => h.toString())); return this.txsMetadata.getMissingTxHashes().intersection(txsPeerHas).size > 0; } From 3f3c218f62275bdf0855248acb0224f7a31bfcdf Mon Sep 17 00:00:00 2001 From: Michal Rzeszutko Date: Fri, 13 Mar 2026 11:03:00 +0000 Subject: [PATCH 3/3] refactor after code review --- .../batch-tx-requester/batch_tx_requester.ts | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts index 67700a865d9b..1f3fd2fb0886 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts @@ -564,7 +564,9 @@ export class BatchTxRequester { return; } - if (this.handleArchiveRootFromResponse(peerId, response)) { + const hasArchiveRootMismatch = this.blockTxsSource.archive.toString() !== response.archiveRoot.toString(); + if (hasArchiveRootMismatch) { + this.handleArchiveRootMismatch(peerId, response); return; } @@ -583,26 +585,18 @@ export class BatchTxRequester { } /** - * Validates the archive root in the response and handles any mismatch. - * Returns true if the caller should stop processing (i.e., there was a mismatch). + * Handles an archive root mismatch between local state and peer response. * - * - Archives match: returns false (continue processing). - * - Response archive is Fr.ZERO (peer pruned proposal, legitimate): marks peer dumb, returns true. - * - Non-zero archive mismatch (malicious response): penalises + marks dumb, returns true. + * - Response archive is Fr.ZERO (peer pruned proposal, legitimate): marks peer dumb. + * - Non-zero archive mismatch (malicious response): penalises + marks dumb. */ - private handleArchiveRootFromResponse(peerId: PeerId, response: BlockTxsResponse): boolean { - const archiveRootsMatch = this.blockTxsSource.archive.toString() === response.archiveRoot.toString(); - if (archiveRootsMatch) { - return false; - } - + private handleArchiveRootMismatch(peerId: PeerId, response: BlockTxsResponse): void { if (!response.archiveRoot.isZero()) { this.peers.penalisePeer(peerId, PeerErrorSeverity.LowToleranceError); } this.peers.markPeerDumb(peerId); this.txsMetadata.clearPeerData(peerId); - return true; } private peerHasSomeTxsWeAreMissing(_peerId: PeerId, response: BlockTxsResponse): boolean {