Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { type MockProxy, mock } from 'jest-mock-extended';

import { type PeerScoring } from '../../peer-manager/peer_scoring.js';
import { ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js';
import { RequestResponseRateLimiter } from './rate_limiter.js';
import { RateLimitStatus, RequestResponseRateLimiter } from './rate_limiter.js';

class MockPeerId {
private id: string;
Expand Down Expand Up @@ -57,24 +57,24 @@ describe('rate limiter', () => {
const peerId = makePeer('peer1');
// Expect to allow a burst of 5, then not allow
for (let i = 0; i < 5; i++) {
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.DeniedPeer);

// Smooth requests
for (let i = 0; i < 5; i++) {
jest.advanceTimersByTime(200);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.DeniedPeer);

// Reset after quota has passed
jest.advanceTimersByTime(1000);
// Second burst
for (let i = 0; i < 5; i++) {
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.DeniedPeer);

// Spy on the peer manager and check that penalizePeer is called
expect(peerScoring.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.HighToleranceError);
Expand All @@ -84,34 +84,34 @@ describe('rate limiter', () => {
// Initial burst
const falingPeer = makePeer('nolettoinno');
for (let i = 0; i < 10; i++) {
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer(`peer${i}`))).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer(`peer${i}`))).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, falingPeer)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, falingPeer)).toBe(RateLimitStatus.DeniedGlobal);

// Smooth requests
for (let i = 0; i < 10; i++) {
jest.advanceTimersByTime(100);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer(`peer${i}`))).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer(`peer${i}`))).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, falingPeer)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, falingPeer)).toBe(RateLimitStatus.DeniedGlobal);

// Reset after quota has passed
jest.advanceTimersByTime(1000);
// Second burst
for (let i = 0; i < 10; i++) {
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer(`peer${i}`))).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer(`peer${i}`))).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, falingPeer)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, falingPeer)).toBe(RateLimitStatus.DeniedGlobal);
});

it('Should reset after quota has passed', () => {
const peerId = makePeer('peer1');
for (let i = 0; i < 5; i++) {
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.DeniedPeer);
jest.advanceTimersByTime(1000);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.Allowed);
});

it('Should handle multiple protocols separately', () => {
Expand Down Expand Up @@ -143,22 +143,22 @@ describe('rate limiter', () => {

// Protocol 1
for (let i = 0; i < 5; i++) {
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(true);
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.Allowed);
}
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false);
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.DeniedPeer);

// Protocol 2
for (let i = 0; i < 2; i++) {
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.PING, peerId)).toBe(true);
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.PING, peerId)).toBe(RateLimitStatus.Allowed);
}
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.PING, peerId)).toBe(false);
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.PING, peerId)).toBe(RateLimitStatus.DeniedPeer);

multiProtocolRateLimiter.stop();
});

it('Should allow requests if no rate limiter is configured', () => {
const rateLimiter = new RequestResponseRateLimiter(peerScoring, {} as ReqRespSubProtocolRateLimits);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer('peer1'))).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer('peer1'))).toBe(RateLimitStatus.Allowed);
});

it('Should smooth out spam', () => {
Expand Down
32 changes: 18 additions & 14 deletions yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,21 @@ interface PeerRateLimiter {
lastAccess: number;
}

enum RateLimitStatus {
Allowed,
export enum RateLimitStatus {
DeniedGlobal,
DeniedPeer,
Allowed, // Note: allowed last to prevent enum evaluating to 0 for success
}

export function prettyPrintRateLimitStatus(status: RateLimitStatus) {
switch (status) {
case RateLimitStatus.DeniedGlobal:
return 'DeniedGlobal';
case RateLimitStatus.DeniedPeer:
return 'DeniedPeer';
case RateLimitStatus.Allowed:
return 'Allowed';
}
}

/**
Expand Down Expand Up @@ -191,24 +202,17 @@ export class RequestResponseRateLimiter {
}, CHECK_DISCONNECTED_PEERS_INTERVAL_MS);
}

allow(subProtocol: ReqRespSubProtocol, peerId: PeerId): boolean {
allow(subProtocol: ReqRespSubProtocol, peerId: PeerId): RateLimitStatus {
const limiter = this.subProtocolRateLimiters.get(subProtocol);
if (!limiter) {
return true;
return RateLimitStatus.Allowed;
}
const rateLimitStatus = limiter.allow(peerId);

switch (rateLimitStatus) {
case RateLimitStatus.DeniedPeer:
// Hitting a peer specific limit, we should lightly penalise the peer
this.peerScoring.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
return false;
case RateLimitStatus.DeniedGlobal:
// Hitting a global limit, we should not penalise the peer
return false;
default:
return true;
if (rateLimitStatus === RateLimitStatus.DeniedGlobal) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this conditional check that the RateLimitStatus is DeniedPeer instead of DeniedGlobal before penalizing?

Suggested change
if (rateLimitStatus === RateLimitStatus.DeniedGlobal) {
if (rateLimitStatus === RateLimitStatus.DeniedPeer) {

@Maddiaa0 Maddiaa0 Feb 19, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh god youre right, will add unit test too
edit: good news, existing unit test caught it, this was just a typo

this.peerScoring.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
}
return rateLimitStatus;
}

cleanupInactivePeers() {
Expand Down
4 changes: 3 additions & 1 deletion yarn-project/p2p/src/services/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ describe('ReqResp', () => {
expect(rateLimitResponse).toBeDefined();

// Make sure the error message is logged
const errorMessage = `Rate limit exceeded for ${ReqRespSubProtocol.PING} from ${nodes[0].p2p.peerId.toString()}`;
const errorMessage = `Rate limit exceeded DeniedPeer for ${
ReqRespSubProtocol.PING
} from ${nodes[0].p2p.peerId.toString()}`;
expect(loggerSpy).toHaveBeenCalledWith(expect.stringContaining(errorMessage));
});

Expand Down
15 changes: 12 additions & 3 deletions yarn-project/p2p/src/services/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ import {
subProtocolMap,
} from './interface.js';
import { ReqRespMetrics } from './metrics.js';
import { RequestResponseRateLimiter } from './rate-limiter/rate_limiter.js';
import {
RateLimitStatus,
RequestResponseRateLimiter,
prettyPrintRateLimitStatus,
} from './rate-limiter/rate_limiter.js';
import { ReqRespStatus, ReqRespStatusError, parseStatusChunk, prettyPrintReqRespStatus } from './status.js';

/**
Expand Down Expand Up @@ -589,8 +593,13 @@ export class ReqResp {

try {
// Store a reference to from this for the async generator
if (!this.rateLimiter.allow(protocol, connection.remotePeer)) {
this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`);
const rateLimitStatus = this.rateLimiter.allow(protocol, connection.remotePeer);
if (rateLimitStatus != RateLimitStatus.Allowed) {
this.logger.warn(
`Rate limit exceeded ${prettyPrintRateLimitStatus(rateLimitStatus)} for ${protocol} from ${
connection.remotePeer
}`,
);

throw new ReqRespStatusError(ReqRespStatus.RATE_LIMIT_EXCEEDED);
}
Expand Down