Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 75 additions & 38 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ type ValidationOutcome = { allPassed: true } | { allPassed: false; failure: Vali
// REFACTOR: Unify with the type above
type ReceivedMessageValidationResult<T, M = undefined> =
| { obj: T; result: Exclude<TopicValidatorResult, TopicValidatorResult.Reject>; metadata?: M }
| { obj?: T; result: TopicValidatorResult.Reject; metadata?: M };
| { obj?: T; result: TopicValidatorResult.Reject; metadata?: M; severity: PeerErrorSeverity };

/**
* Lib P2P implementation of the P2PService interface.
Expand Down Expand Up @@ -882,30 +882,56 @@ export class LibP2PService extends WithTracer implements P2PService {
source: PeerId,
topicType: TopicType,
): Promise<ReceivedMessageValidationResult<T, M>> {
let resultAndObj: ReceivedMessageValidationResult<T, M> = { result: TopicValidatorResult.Reject };
// Default to reject result with a penalty if validation function throws an error
let resultAndObj: ReceivedMessageValidationResult<T, M> = {
result: TopicValidatorResult.Reject,
severity: PeerErrorSeverity.MidToleranceError,
};
const timer = new Timer();
try {
resultAndObj = await validationFunc();
} catch (err) {
this.peerManager.penalizePeer(source, PeerErrorSeverity.LowToleranceError);
this.logger.error(`Error deserializing and validating gossipsub message`, err, {
msgId,
source: source.toString(),
topicType,
});
this.logger.error(`Error validating gossipsub message`, err, { msgId, source: source.toString(), topicType });
}

if (resultAndObj.result === TopicValidatorResult.Accept) {
this.logger.debug(`Message ${topicType} accepted by validator`, { msgId, source: source.toString(), topicType });
this.instrumentation.recordMessageValidation(topicType, timer);
} else if (resultAndObj.result === TopicValidatorResult.Reject) {
this.logger.warn(`Message ${topicType} rejected by validator with severity ${resultAndObj.severity}`, {
msgId,
source: source.toString(),
topicType,
severity: resultAndObj.severity,
});
this.peerManager.penalizePeer(source, resultAndObj.severity);
} else {
this.logger.trace(`Message ${topicType} ignored by validator`, { msgId, source: source.toString(), topicType });
}

this.node.services.pubsub.reportMessageValidationResult(msgId, source.toString(), resultAndObj.result);
return resultAndObj;
}

private tryDeserialize<T>(deserializeFunc: () => T, msgId: string, source: PeerId): T | undefined {
try {
return deserializeFunc();
} catch (err) {
this.logger.warn(`Failed to deserialize gossipsub message from buffer`, {
err,
msgId,
source: source.toString(),
});
return undefined;
}
}

protected async handleGossipedTx(payloadData: Buffer, msgId: string, source: PeerId) {
const validationFunc: () => Promise<ReceivedMessageValidationResult<Tx>> = async () => {
const tx = Tx.fromBuffer(payloadData);
const tx = this.tryDeserialize(() => Tx.fromBuffer(payloadData), msgId, source);
if (!tx) {
return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.LowToleranceError };
}

const currentBlockNumber = await this.archiver.getBlockNumber();
const { ts: nextSlotTimestamp } = this.epochCache.getEpochAndSlotInNextL1Slot();
Expand All @@ -930,8 +956,7 @@ export class LibP2PService extends WithTracer implements P2PService {
severity,
source: source.toString(),
});
this.peerManager.penalizePeer(source, severity);
return { result: TopicValidatorResult.Reject };
return { result: TopicValidatorResult.Reject, severity };
}

// Pool pre-check: see if the pool would accept this tx before doing expensive proof verification
Expand All @@ -953,8 +978,7 @@ export class LibP2PService extends WithTracer implements P2PService {
severity,
source: source.toString(),
});
this.peerManager.penalizePeer(source, severity);
return { result: TopicValidatorResult.Reject };
return { result: TopicValidatorResult.Reject, severity };
}

// Pool add: persist the tx
Expand All @@ -979,8 +1003,7 @@ export class LibP2PService extends WithTracer implements P2PService {
source: source.toString(),
txHash: txHash.toString(),
});
this.peerManager.penalizePeer(source, PeerErrorSeverity.HighToleranceError);
return { result: TopicValidatorResult.Reject };
return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.HighToleranceError };
}
};

Expand Down Expand Up @@ -1010,7 +1033,16 @@ export class LibP2PService extends WithTracer implements P2PService {
source: PeerId,
): Promise<void> {
const { result, obj: attestation } = await this.validateReceivedMessage<CheckpointAttestation>(
() => this.validateAndStoreCheckpointAttestation(source, CheckpointAttestation.fromBuffer(payloadData)),
() => {
const attestation = this.tryDeserialize(() => CheckpointAttestation.fromBuffer(payloadData), msgId, source);
if (!attestation) {
return Promise.resolve({
result: TopicValidatorResult.Reject,
severity: PeerErrorSeverity.LowToleranceError,
});
}
return this.validateAndStoreCheckpointAttestation(source, attestation);
},
msgId,
source,
TopicType.checkpoint_attestation,
Expand Down Expand Up @@ -1043,8 +1075,7 @@ export class LibP2PService extends WithTracer implements P2PService {

if (validationResult.result === 'reject') {
this.logger.warn(`Penalizing peer ${peerId} for checkpoint attestation validation failure`);
this.peerManager.penalizePeer(peerId, validationResult.severity);
return { result: TopicValidatorResult.Reject };
return { result: TopicValidatorResult.Reject, severity: validationResult.severity };
}

if (validationResult.result === 'ignore') {
Expand All @@ -1070,16 +1101,16 @@ export class LibP2PService extends WithTracer implements P2PService {
return { result: TopicValidatorResult.Ignore, obj: attestation };
}

// Could not add (cap reached for signer), no need to re-broadcast
// Could not add (cap reached for signer), penalize and do not re-broadcast
if (!added) {
this.logger.warn(`Dropping checkpoint attestation due to cap`, {
this.logger.warn(`Rejecting checkpoint attestation due to cap`, {
slot: slot.toString(),
archive: attestation.archive.toString(),
source: peerId.toString(),
attester: attestation.getSender()?.toString(),
count,
});
return { result: TopicValidatorResult.Ignore, obj: attestation };
return { result: TopicValidatorResult.Reject, severity: PeerErrorSeverity.HighToleranceError };
}

// Check if this is a duplicate attestation (signer attested to a different proposal at the same slot)
Expand Down Expand Up @@ -1134,8 +1165,7 @@ export class LibP2PService extends WithTracer implements P2PService {

if (validationResult.result === 'reject') {
this.logger.warn(`Penalizing peer ${peerId} for block proposal validation failure`);
this.peerManager.penalizePeer(peerId, validationResult.severity);
return { result: TopicValidatorResult.Reject };
return { result: TopicValidatorResult.Reject, severity: validationResult.severity };
}

if (validationResult.result === 'ignore') {
Expand All @@ -1159,15 +1189,18 @@ export class LibP2PService extends WithTracer implements P2PService {

// Too many blocks received for this slot and index, penalize peer and do not re-broadcast
if (!added) {
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
this.logger.warn(`Penalizing peer for block proposal exceeding per-position cap`, {
...block.toBlockInfo(),
indexWithinCheckpoint: block.indexWithinCheckpoint,
count,
proposer: block.getSender()?.toString(),
source: peerId.toString(),
});
return { result: TopicValidatorResult.Reject, metadata: { isEquivocated } };
return {
result: TopicValidatorResult.Reject,
metadata: { isEquivocated },
severity: PeerErrorSeverity.HighToleranceError,
};
}

// If this was a duplicate proposal, do not process it, but do invoke the duplicate callback,
Expand Down Expand Up @@ -1260,8 +1293,7 @@ export class LibP2PService extends WithTracer implements P2PService {

if (validationResult.result === 'reject') {
this.logger.warn(`Penalizing peer ${peerId} for checkpoint proposal validation failure`);
this.peerManager.penalizePeer(peerId, validationResult.severity);
return { result: TopicValidatorResult.Reject };
return { result: TopicValidatorResult.Reject, severity: validationResult.severity };
}

if (validationResult.result === 'ignore') {
Expand All @@ -1276,20 +1308,21 @@ export class LibP2PService extends WithTracer implements P2PService {
[Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(),
[Attributes.P2P_ID]: peerId.toString(),
});
const {
result,
obj,
metadata: { isEquivocated } = {},
} = await this.validateAndStoreBlockProposal(peerId, blockProposal);
if (result === TopicValidatorResult.Reject || !obj || isEquivocated) {
const blockProposalResult = await this.validateAndStoreBlockProposal(peerId, blockProposal);
const { obj, metadata: { isEquivocated } = {} } = blockProposalResult;
if (blockProposalResult.result === TopicValidatorResult.Reject || !obj || isEquivocated) {
this.logger.debug(`Rejecting checkpoint due to invalid last block proposal`, {
[Attributes.SLOT_NUMBER]: checkpoint.slotNumber.toString(),
[Attributes.P2P_ID]: peerId.toString(),
isEquivocated,
result,
result: blockProposalResult.result,
});
return { result: TopicValidatorResult.Reject };
} else if (result === TopicValidatorResult.Accept && obj && !isEquivocated) {
return {
result: TopicValidatorResult.Reject,
severity:
'severity' in blockProposalResult ? blockProposalResult.severity : PeerErrorSeverity.MidToleranceError,
};
} else if (blockProposalResult.result === TopicValidatorResult.Accept && obj && !isEquivocated) {
processBlock = true;
}
}
Expand All @@ -1316,13 +1349,17 @@ export class LibP2PService extends WithTracer implements P2PService {
// Too many checkpoint proposals received for this slot, penalize peer and do not re-broadcast
// Note: We still return the checkpoint obj so the lastBlock can be processed if valid
if (!added) {
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
this.logger.warn(`Penalizing peer for checkpoint proposal exceeding per-slot cap`, {
...checkpoint.toCheckpointInfo(),
count,
source: peerId.toString(),
});
return { result: TopicValidatorResult.Reject, obj: checkpoint, metadata: { isEquivocated, processBlock } };
return {
result: TopicValidatorResult.Reject,
obj: checkpoint,
metadata: { isEquivocated, processBlock },
severity: PeerErrorSeverity.HighToleranceError,
};
}

// If this was a duplicate proposal, do not process it, but do invoke the duplicate callback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ export class BatchTxRequester {
});

if (hasInvalidTx) {
this.logger.warn(`Penalizing peer ${peerId.toString()} for sending invalid transactions in batch response`, {
peerId,
});
this.peers.penalisePeer(peerId, PeerErrorSeverity.LowToleranceError);
} else {
// If we have received successful response from the peer, they have "redeemed" themselves and not considered bad anymore
Expand Down
Loading