Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci3/bootstrap_ec2
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ if [ "$REF_NAME" == "master" ]; then
# Allow parallelism on master by having the instance name be the commit.
instance_name="$current_commit"_$arch
else
instance_name=$(echo -n "$REF_NAME" | tr -c 'a-zA-Z0-9-' '_')_$arch
instance_name=$(echo -n "$REF_NAME" | head -c 50 | tr -c 'a-zA-Z0-9-' '_')_$arch
fi

[ -n "${INSTANCE_POSTFIX:-}" ] && instance_name+="_$INSTANCE_POSTFIX"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { type MockProxy, mock } from 'jest-mock-extended';

import { type PeerScoring } from '../../peer-manager/peer_scoring.js';
import { ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js';
import { RequestResponseRateLimiter } from './rate_limiter.js';
import { RateLimitStatus, RequestResponseRateLimiter } from './rate_limiter.js';

class MockPeerId {
private id: string;
Expand Down Expand Up @@ -57,24 +57,24 @@ describe('rate limiter', () => {
const peerId = makePeer('peer1');
// Expect to allow a burst of 5, then not allow
for (let i = 0; i < 5; i++) {
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.DeniedPeer);

// Smooth requests
for (let i = 0; i < 5; i++) {
jest.advanceTimersByTime(200);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.DeniedPeer);

// Reset after quota has passed
jest.advanceTimersByTime(1000);
// Second burst
for (let i = 0; i < 5; i++) {
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.DeniedPeer);

// Spy on the peer manager and check that penalizePeer is called
expect(peerScoring.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.HighToleranceError);
Expand All @@ -84,34 +84,34 @@ describe('rate limiter', () => {
// Initial burst
const falingPeer = makePeer('nolettoinno');
for (let i = 0; i < 10; i++) {
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer(`peer${i}`))).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer(`peer${i}`))).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, falingPeer)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, falingPeer)).toBe(RateLimitStatus.DeniedGlobal);

// Smooth requests
for (let i = 0; i < 10; i++) {
jest.advanceTimersByTime(100);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer(`peer${i}`))).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer(`peer${i}`))).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, falingPeer)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, falingPeer)).toBe(RateLimitStatus.DeniedGlobal);

// Reset after quota has passed
jest.advanceTimersByTime(1000);
// Second burst
for (let i = 0; i < 10; i++) {
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer(`peer${i}`))).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer(`peer${i}`))).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, falingPeer)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, falingPeer)).toBe(RateLimitStatus.DeniedGlobal);
});

it('Should reset after quota has passed', () => {
const peerId = makePeer('peer1');
for (let i = 0; i < 5; i++) {
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.Allowed);
}
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.DeniedPeer);
jest.advanceTimersByTime(1000);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.Allowed);
});

it('Should handle multiple protocols separately', () => {
Expand Down Expand Up @@ -143,22 +143,22 @@ describe('rate limiter', () => {

// Protocol 1
for (let i = 0; i < 5; i++) {
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(true);
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.Allowed);
}
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false);
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(RateLimitStatus.DeniedPeer);

// Protocol 2
for (let i = 0; i < 2; i++) {
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.PING, peerId)).toBe(true);
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.PING, peerId)).toBe(RateLimitStatus.Allowed);
}
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.PING, peerId)).toBe(false);
expect(multiProtocolRateLimiter.allow(ReqRespSubProtocol.PING, peerId)).toBe(RateLimitStatus.DeniedPeer);

multiProtocolRateLimiter.stop();
});

it('Should allow requests if no rate limiter is configured', () => {
const rateLimiter = new RequestResponseRateLimiter(peerScoring, {} as ReqRespSubProtocolRateLimits);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer('peer1'))).toBe(true);
expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer('peer1'))).toBe(RateLimitStatus.Allowed);
});

it('Should smooth out spam', () => {
Expand Down
32 changes: 18 additions & 14 deletions yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,21 @@ interface PeerRateLimiter {
lastAccess: number;
}

enum RateLimitStatus {
Allowed,
export enum RateLimitStatus {
DeniedGlobal,
DeniedPeer,
Allowed, // Note: allowed last to prevent enum evaluating to 0 for success
}

export function prettyPrintRateLimitStatus(status: RateLimitStatus) {
switch (status) {
case RateLimitStatus.DeniedGlobal:
return 'DeniedGlobal';
case RateLimitStatus.DeniedPeer:
return 'DeniedPeer';
case RateLimitStatus.Allowed:
return 'Allowed';
}
}

/**
Expand Down Expand Up @@ -191,24 +202,17 @@ export class RequestResponseRateLimiter {
}, CHECK_DISCONNECTED_PEERS_INTERVAL_MS);
}

allow(subProtocol: ReqRespSubProtocol, peerId: PeerId): boolean {
allow(subProtocol: ReqRespSubProtocol, peerId: PeerId): RateLimitStatus {
const limiter = this.subProtocolRateLimiters.get(subProtocol);
if (!limiter) {
return true;
return RateLimitStatus.Allowed;
}
const rateLimitStatus = limiter.allow(peerId);

switch (rateLimitStatus) {
case RateLimitStatus.DeniedPeer:
// Hitting a peer specific limit, we should lightly penalise the peer
this.peerScoring.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
return false;
case RateLimitStatus.DeniedGlobal:
// Hitting a global limit, we should not penalise the peer
return false;
default:
return true;
if (rateLimitStatus === RateLimitStatus.DeniedPeer) {
this.peerScoring.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
}
return rateLimitStatus;
}

cleanupInactivePeers() {
Expand Down
4 changes: 3 additions & 1 deletion yarn-project/p2p/src/services/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ describe('ReqResp', () => {
expect(rateLimitResponse).toBeDefined();

// Make sure the error message is logged
const errorMessage = `Rate limit exceeded for ${ReqRespSubProtocol.PING} from ${nodes[0].p2p.peerId.toString()}`;
const errorMessage = `Rate limit exceeded DeniedPeer for ${
ReqRespSubProtocol.PING
} from ${nodes[0].p2p.peerId.toString()}`;
expect(loggerSpy).toHaveBeenCalledWith(expect.stringContaining(errorMessage));
});

Expand Down
15 changes: 12 additions & 3 deletions yarn-project/p2p/src/services/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ import {
subProtocolMap,
} from './interface.js';
import { ReqRespMetrics } from './metrics.js';
import { RequestResponseRateLimiter } from './rate-limiter/rate_limiter.js';
import {
RateLimitStatus,
RequestResponseRateLimiter,
prettyPrintRateLimitStatus,
} from './rate-limiter/rate_limiter.js';
import { ReqRespStatus, ReqRespStatusError, parseStatusChunk, prettyPrintReqRespStatus } from './status.js';

/**
Expand Down Expand Up @@ -589,8 +593,13 @@ export class ReqResp {

try {
// Store a reference to from this for the async generator
if (!this.rateLimiter.allow(protocol, connection.remotePeer)) {
this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`);
const rateLimitStatus = this.rateLimiter.allow(protocol, connection.remotePeer);
if (rateLimitStatus != RateLimitStatus.Allowed) {
this.logger.warn(
`Rate limit exceeded ${prettyPrintRateLimitStatus(rateLimitStatus)} for ${protocol} from ${
connection.remotePeer
}`,
);

throw new ReqRespStatusError(ReqRespStatus.RATE_LIMIT_EXCEEDED);
}
Expand Down