Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1790,6 +1790,291 @@ describe('BatchTxRequester', () => {
expect(resultTxHashes.has(missing[6].toString())).toBe(false); // Invalid from regular
});
});

describe('Smart peer demotion', () => {
it('should demote a smart peer back to dumb on NOT_FOUND without penalizing', async () => {
// peer0 claims to have ALL txs but we only request a batch at a time, so after the first
// dumb response there are still missing txs → peer0 gets promoted to smart.
// On the next (smart) request peer0 returns NOT_FOUND (pruned proposal) → demoted without penalty.
// Two batches worth of txs guarantees at least one follow-up request after the first dumb round.
const txCount = 2 * TX_BATCH_SIZE;
const deadline = 5_000;
const missing = Array.from({ length: txCount }, () => TxHash.random());

blockProposal = await makeBlockProposal({
signer: Secp256k1Signer.random(),
blockHeader: makeBlockHeader(1, { blockNumber: BlockNumber(1) }),
archiveRoot: Fr.random(),
txHashes: missing,
});

// Two peers: peer0 is the one under test; peer1 is a slow-but-reliable fallback.
const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]);
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers);

// Wrap the real PeerCollection so promotions/demotions/penalties are recorded for assertions.
const peerCollection = new TestPeerCollection(
new PeerCollection(connectionSampler, undefined, new DateProvider()),
);

const allIndices = Array.from({ length: txCount }, (_, i) => i);

let peer0RequestCount = 0;
reqResp.sendRequestToPeer.mockImplementation(async (peerId: any, _sub: any, data: any) => {
const peerStr = peerId.toString();

if (peerStr === peers[0].toString()) {
peer0RequestCount++;
if (peer0RequestCount === 1) {
// First dumb request succeeds: return requested txs, claim to have ALL txs → promoted
const request = BlockTxsRequest.fromBuffer(data);
const requestedIndices = request.txIndices.getTrueIndices();
const availableTxs = requestedIndices.map(idx => makeTx(blockProposal.txHashes[idx]));

return {
status: ReqRespStatus.SUCCESS,
data: new BlockTxsResponse(
blockProposal.archive,
new TxArray(...availableTxs),
BitVector.init(txCount, allIndices),
).toBuffer(),
};
}
// Subsequent smart requests return NOT_FOUND (pruned proposal, no full hashes in request)
return { status: ReqRespStatus.NOT_FOUND, data: Buffer.alloc(0) };
}

// peer1 always succeeds with a delay so peer0 is queried first
await sleep(50);
const request = BlockTxsRequest.fromBuffer(data);
const requestedIndices = request.txIndices.getTrueIndices();
const availableTxs = requestedIndices.map(idx => makeTx(blockProposal.txHashes[idx]));

return {
status: ReqRespStatus.SUCCESS,
data: new BlockTxsResponse(
blockProposal.archive,
new TxArray(...availableTxs),
BitVector.init(txCount, allIndices),
).toBuffer(),
};
});

// NOTE(review): single worker per mode — presumably to keep request ordering deterministic; confirm.
const requester = new BatchTxRequester(
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
mockP2PService,
logger,
new DateProvider(),
{
smartParallelWorkerCount: 1,
dumbParallelWorkerCount: 1,
peerCollection,
txValidator,
},
);

// All txs are still recovered: peer1 backfills what peer0 stopped serving.
const results = await BatchTxRequester.collectAllTxs(requester.run());
expect(results).toHaveLength(txCount);

// Verify peer0 was first promoted to smart, then demoted on NOT_FOUND
expect(peerCollection.smartPeersMarked).toContain(peers[0].toString());
expect(peerCollection.peersMarkedDumb).toContain(peers[0].toString());

// NOT_FOUND is a legitimate state (pruned proposal), so peer should NOT be penalized
const peer0Penalties = peerCollection.peersPenalised.filter(e => e.peerId === peers[0].toString());
expect(peer0Penalties).toHaveLength(0);
});

it('should demote a smart peer when it responds with invalid block data', async () => {
// peer0 answers the first dumb request correctly (→ promoted to smart), then serves an
// empty response with a ZERO archive root — treated as a pruned proposal, so the peer is
// demoted back to dumb but NOT penalised.
const txCount = 2 * TX_BATCH_SIZE;
const deadline = 5_000;
const missing = Array.from({ length: txCount }, () => TxHash.random());

blockProposal = await makeBlockProposal({
signer: Secp256k1Signer.random(),
blockHeader: makeBlockHeader(1, { blockNumber: BlockNumber(1) }),
archiveRoot: Fr.random(),
txHashes: missing,
});

// Two peers: peer0 is the one under test; peer1 is a slow-but-reliable fallback.
const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]);
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers);

// Wrap the real PeerCollection so promotions/demotions/penalties are recorded for assertions.
const peerCollection = new TestPeerCollection(
new PeerCollection(connectionSampler, undefined, new DateProvider()),
);

const allIndices = Array.from({ length: txCount }, (_, i) => i);

let peer0RequestCount = 0;
reqResp.sendRequestToPeer.mockImplementation(async (peerId: any, _sub: any, data: any) => {
const peerStr = peerId.toString();

if (peerStr === peers[0].toString()) {
peer0RequestCount++;

if (peer0RequestCount === 1) {
// First dumb request: valid response claiming all txs → promoted to smart
const request = BlockTxsRequest.fromBuffer(data);
const requestedIndices = request.txIndices.getTrueIndices();
const availableTxs = requestedIndices.map(idx => makeTx(blockProposal.txHashes[idx]));

return {
status: ReqRespStatus.SUCCESS,
data: new BlockTxsResponse(
blockProposal.archive,
new TxArray(...availableTxs),
BitVector.init(txCount, allIndices),
).toBuffer(),
};
}

// Subsequent smart requests: invalid block response (pruned proposal fallback)
return {
status: ReqRespStatus.SUCCESS,
data: new BlockTxsResponse(Fr.zero(), new TxArray(), BitVector.init(txCount, [])).toBuffer(),
};
}

// peer1 always succeeds with a delay
await sleep(50);
const request = BlockTxsRequest.fromBuffer(data);
const requestedIndices = request.txIndices.getTrueIndices();
const availableTxs = requestedIndices.map(idx => makeTx(blockProposal.txHashes[idx]));

return {
status: ReqRespStatus.SUCCESS,
data: new BlockTxsResponse(
blockProposal.archive,
new TxArray(...availableTxs),
BitVector.init(txCount, allIndices),
).toBuffer(),
};
});

// NOTE(review): single worker per mode — presumably to keep request ordering deterministic; confirm.
const requester = new BatchTxRequester(
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
mockP2PService,
logger,
new DateProvider(),
{
smartParallelWorkerCount: 1,
dumbParallelWorkerCount: 1,
peerCollection,
txValidator,
},
);

// All txs are still recovered: peer1 backfills what peer0 stopped serving.
const results = await BatchTxRequester.collectAllTxs(requester.run());
expect(results).toHaveLength(txCount);

// Verify peer0 was first promoted to smart, then demoted on invalid block response (Fr.zero)
expect(peerCollection.smartPeersMarked).toContain(peers[0].toString());
expect(peerCollection.peersMarkedDumb).toContain(peers[0].toString());

// Fr.zero is a legitimate pruned-proposal response — peer should NOT be penalised
const peer0Penalties = peerCollection.peersPenalised.filter(e => e.peerId === peers[0].toString());
expect(peer0Penalties).toHaveLength(0);
});

it('should penalise a smart peer that responds with a non-zero archive root mismatch', async () => {
// peer0 is promoted after one good dumb response, then returns an empty response whose
// archive root is NON-ZERO and different from the proposal's — a malicious mismatch, so the
// peer is both demoted and penalised.
const txCount = 2 * TX_BATCH_SIZE;
const deadline = 5_000;
const missing = Array.from({ length: txCount }, () => TxHash.random());

blockProposal = await makeBlockProposal({
signer: Secp256k1Signer.random(),
blockHeader: makeBlockHeader(1, { blockNumber: BlockNumber(1) }),
archiveRoot: Fr.random(),
txHashes: missing,
});

// Two peers: peer0 is the one under test; peer1 is a slow-but-reliable fallback.
const peers = await Promise.all([createSecp256k1PeerId(), createSecp256k1PeerId()]);
connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue(peers);

// Wrap the real PeerCollection so promotions/demotions/penalties are recorded for assertions.
const peerCollection = new TestPeerCollection(
new PeerCollection(connectionSampler, undefined, new DateProvider()),
);

const allIndices = Array.from({ length: txCount }, (_, i) => i);

let peer0RequestCount = 0;
reqResp.sendRequestToPeer.mockImplementation(async (peerId: any, _sub: any, data: any) => {
const peerStr = peerId.toString();

if (peerStr === peers[0].toString()) {
peer0RequestCount++;

if (peer0RequestCount === 1) {
// First dumb request: valid response claiming all txs → promoted to smart
const request = BlockTxsRequest.fromBuffer(data);
const requestedIndices = request.txIndices.getTrueIndices();
const availableTxs = requestedIndices.map(idx => makeTx(blockProposal.txHashes[idx]));

return {
status: ReqRespStatus.SUCCESS,
data: new BlockTxsResponse(
blockProposal.archive,
new TxArray(...availableTxs),
BitVector.init(txCount, allIndices),
).toBuffer(),
};
}

// Subsequent smart requests: non-zero archive root mismatch (malicious response)
return {
status: ReqRespStatus.SUCCESS,
data: new BlockTxsResponse(Fr.random(), new TxArray(), BitVector.init(txCount, [])).toBuffer(),
};
}

// peer1 always succeeds with a delay
await sleep(50);
const request = BlockTxsRequest.fromBuffer(data);
const requestedIndices = request.txIndices.getTrueIndices();
const availableTxs = requestedIndices.map(idx => makeTx(blockProposal.txHashes[idx]));

return {
status: ReqRespStatus.SUCCESS,
data: new BlockTxsResponse(
blockProposal.archive,
new TxArray(...availableTxs),
BitVector.init(txCount, allIndices),
).toBuffer(),
};
});

// NOTE(review): single worker per mode — presumably to keep request ordering deterministic; confirm.
const requester = new BatchTxRequester(
MissingTxsTracker.fromArray(missing),
blockProposal,
undefined,
deadline,
mockP2PService,
logger,
new DateProvider(),
{
smartParallelWorkerCount: 1,
dumbParallelWorkerCount: 1,
peerCollection,
txValidator,
},
);

// All txs are still recovered: peer1 backfills what peer0 stopped serving.
const results = await BatchTxRequester.collectAllTxs(requester.run());
expect(results).toHaveLength(txCount);

// Verify peer0 was promoted then demoted
expect(peerCollection.smartPeersMarked).toContain(peers[0].toString());
expect(peerCollection.peersMarkedDumb).toContain(peers[0].toString());

// Non-zero archive root mismatch is malicious — peer must be penalised
const peer0Penalties = peerCollection.peersPenalised.filter(e => e.peerId === peers[0].toString());
expect(peer0Penalties.length).toBeGreaterThan(0);
});
});
});

describe('PeerCollection - Dynamic peer list', () => {
Expand Down Expand Up @@ -2129,6 +2414,7 @@ export class TestSemaphore implements ISemaphore {

export class TestPeerCollection implements IPeerCollection {
public smartPeersMarked: string[] = [];
public peersMarkedDumb: string[] = [];
public peersPenalised: Array<{ peerId: string; severity: PeerErrorSeverity }> = [];
public peersMarkedInFlight: string[] = [];
public peersUnmarkedBad: string[] = [];
Expand All @@ -2142,6 +2428,11 @@ export class TestPeerCollection implements IPeerCollection {
return this.inner.markPeerSmart(peerId);
}

/**
 * Records the demotion for test assertions, then delegates to the wrapped collection.
 * Parameter typed as `PeerId` (was `any`) to match the `IPeerCollection` contract,
 * consistent with the `PeerId` usage in `nextSmartPeerToQuery`.
 */
markPeerDumb(peerId: PeerId): void {
  this.peersMarkedDumb.push(peerId.toString());
  return this.inner.markPeerDumb(peerId);
}

// Pure pass-through: smart-peer selection is exercised via the wrapped real PeerCollection.
nextSmartPeerToQuery(): PeerId | undefined {
  return this.inner.nextSmartPeerToQuery();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,18 @@ export class BatchTxRequester {
* this implies we will query these peers couple of more times and give them a chance to "redeem" themselves before completely ignoring them
*/
private handleFailResponseFromPeer(peerId: PeerId, responseStatus: ReqRespStatus) {
//TODO: Should we ban these peers?
if (responseStatus === ReqRespStatus.FAILURE || responseStatus === ReqRespStatus.UNKNOWN) {
this.peers.penalisePeer(peerId, PeerErrorSeverity.HighToleranceError);
this.peers.markPeerDumb(peerId);
this.txsMetadata.clearPeerData(peerId);
return;
}

// NOT_FOUND means the peer pruned its block proposal — it can no longer serve
// index-based requests, but this is a legitimate state so we don't penalize.
if (responseStatus === ReqRespStatus.NOT_FOUND) {
this.peers.markPeerDumb(peerId);
this.txsMetadata.clearPeerData(peerId);
return;
}

Expand Down Expand Up @@ -555,10 +564,9 @@ export class BatchTxRequester {
return;
}

// If block response is invalid we still want to query this peer in the future
// Because they sent successful response, so they might become smart peer in the future
// Or send us needed txs
if (!this.isBlockResponseValid(response)) {
const hasArchiveRootMismatch = this.blockTxsSource.archive.toString() !== response.archiveRoot.toString();
if (hasArchiveRootMismatch) {
this.handleArchiveRootMismatch(peerId, response);
return;
}

Expand All @@ -576,13 +584,25 @@ export class BatchTxRequester {
this.smartRequesterSemaphore.release();
}

private isBlockResponseValid(response: BlockTxsResponse): boolean {
const archiveRootsMatch = this.blockTxsSource.archive.toString() === response.archiveRoot.toString();
const peerHasSomeTxsFromProposal = !response.txIndices.isEmpty();
return archiveRootsMatch && peerHasSomeTxsFromProposal;
/**
* Handles an archive root mismatch between local state and peer response.
*
* - Response archive is Fr.ZERO (peer pruned proposal, legitimate): marks peer dumb.
* - Non-zero archive mismatch (malicious response): penalises + marks dumb.
*/
private handleArchiveRootMismatch(peerId: PeerId, response: BlockTxsResponse): void {
if (!response.archiveRoot.isZero()) {
this.peers.penalisePeer(peerId, PeerErrorSeverity.LowToleranceError);
}

this.peers.markPeerDumb(peerId);
this.txsMetadata.clearPeerData(peerId);
}

/**
 * Checks whether the peer's response advertises at least one tx that we are
 * still missing for this proposal.
 */
private peerHasSomeTxsWeAreMissing(_peerId: PeerId, response: BlockTxsResponse): boolean {
  // An empty index vector means the peer has nothing from this proposal at all.
  if (response.txIndices.isEmpty()) {
    return false;
  }
  const claimedHashes = this.extractHashesPeerHasFromResponse(response);
  const claimedSet = new Set(claimedHashes.map(hash => hash.toString()));
  const overlap = this.txsMetadata.getMissingTxHashes().intersection(claimedSet);
  return overlap.size > 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ export interface ITxMetadataCollection {
alreadyFetched(txHash: TxHash): boolean;
// Returns true if tx was marked as fetched, false if it was already marked as fetched
markPeerHas(peerId: PeerId, txHashes: TxHash[]): void;
/** Remove all tx metadata associations for a peer (e.g. on demotion from smart to dumb). */
clearPeerData(peerId: PeerId): void;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,11 @@ export class MissingTxMetadataCollection implements ITxMetadataCollection {
}
});
}

public clearPeerData(peerId: PeerId) {
const peerIdStr = peerId.toString();
for (const txMeta of this.txMetadata.values()) {
txMeta.peers.delete(peerIdStr);
}
}
}
Loading
Loading