Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -855,9 +855,7 @@ export class AztecNodeService implements AztecNode, Traceable {
new DataTxValidator(),
new MetadataTxValidator(new Fr(this.l1ChainId), new Fr(blockNumber)),
new DoubleSpendTxValidator({
getNullifierIndex(nullifier) {
return db.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]).then(x => x[0]);
},
getNullifierIndices: nullifiers => db.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers),
}),
];

Expand Down
218 changes: 156 additions & 62 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { createPeerScoreParams, createTopicScoreParams } from '@chainsafe/libp2p
import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
import { identify } from '@libp2p/identify';
import type { PeerId } from '@libp2p/interface';
import { type Message, type PeerId, TopicValidatorResult } from '@libp2p/interface';
import '@libp2p/kad-dht';
import { mplex } from '@libp2p/mplex';
import { tcp } from '@libp2p/tcp';
Expand Down Expand Up @@ -62,6 +62,21 @@ import {
import { ReqResp } from '../reqresp/reqresp.js';
import type { P2PService, PeerDiscoveryService } from '../service.js';

interface MessageValidator {
validator: {
validateTx(tx: Tx): Promise<boolean>;
};
severity: PeerErrorSeverity;
}

interface ValidationResult {
name: string;
isValid: boolean;
severity: PeerErrorSeverity;
}

type ValidationOutcome = { allPassed: true } | { allPassed: false; failure: ValidationResult };

/**
* Lib P2P implementation of the P2PService interface.
*/
Expand Down Expand Up @@ -137,12 +152,15 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
this.subscribeToTopic(TopicTypeMap[topic].p2pTopic);
}

// Add p2p topic validators
this.node.services.pubsub.topicValidators.set(Tx.p2pTopic, this.validatePropagatedTxFromMessage.bind(this));

// add GossipSub listener
this.node.services.pubsub.addEventListener('gossipsub:message', async e => {
const { msg, propagationSource: peerId } = e.detail;
const { msg } = e.detail;
this.logger.trace(`Received PUBSUB message.`);

await this.jobQueue.put(() => this.handleNewGossipMessage(msg, peerId));
await this.jobQueue.put(() => this.handleNewGossipMessage(msg));
});

// Start running promise for peer discovery
Expand Down Expand Up @@ -256,6 +274,7 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
dataTransform: new SnappyTransform(),
metricsRegister: otelMetricsAdapter,
metricsTopicStrToLabel: metricsTopicStrToLabels(),
asyncValidation: true,
scoreParams: createPeerScoreParams({
topics: {
[Tx.p2pTopic]: createTopicScoreParams({
Expand Down Expand Up @@ -382,10 +401,10 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
* @param topic - The message's topic.
* @param data - The message data
*/
private async handleNewGossipMessage(message: RawGossipMessage, peerId: PeerId) {
private async handleNewGossipMessage(message: RawGossipMessage) {
if (message.topic === Tx.p2pTopic) {
const tx = Tx.fromBuffer(Buffer.from(message.data));
await this.processTxFromPeer(tx, peerId);
await this.processTxFromPeer(tx);
}
if (message.topic === BlockAttestation.p2pTopic && this.clientType === P2PClientType.Full) {
const attestation = BlockAttestation.fromBuffer(Buffer.from(message.data));
Expand Down Expand Up @@ -474,16 +493,11 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
});
}

private async processTxFromPeer(tx: Tx, peerId: PeerId): Promise<void> {
private async processTxFromPeer(tx: Tx): Promise<void> {
const txHash = tx.getTxHash();
const txHashString = txHash.toString();
this.logger.verbose(`Received tx ${txHashString} from external peer.`);

const isValidTx = await this.validatePropagatedTx(tx, peerId);

if (isValidTx) {
await this.mempools.txPool.addTxs([tx]);
}
await this.mempools.txPool.addTxs([tx]);
}

/**
Expand Down Expand Up @@ -523,70 +537,150 @@ export class LibP2PService<T extends P2PClientType> extends WithTracer implement
return true;
}

private async validatePropagatedTxFromMessage(
propagationSource: PeerId,
msg: Message,
): Promise<TopicValidatorResult> {
const tx = Tx.fromBuffer(Buffer.from(msg.data));
const isValid = await this.validatePropagatedTx(tx, propagationSource);
this.logger.trace(`validatePropagatedTx: ${isValid}`, {
[Attributes.TX_HASH]: tx.getTxHash().toString(),
[Attributes.P2P_ID]: propagationSource.toString(),
});
return isValid ? TopicValidatorResult.Accept : TopicValidatorResult.Reject;
}

/**
* Validate a tx that has been propagated from a peer.
* @param tx - The tx to validate.
* @param peerId - The peer ID of the peer that sent the tx.
* @returns True if the tx is valid, false otherwise.
*/
@trackSpan('Libp2pService.validatePropagatedTx', tx => ({
[Attributes.TX_HASH]: tx.getTxHash().toString(),
}))
private async validatePropagatedTx(tx: Tx, peerId: PeerId): Promise<boolean> {
const blockNumber = (await this.l2BlockSource.getBlockNumber()) + 1;
// basic data validation
const dataValidator = new DataTxValidator();
const validData = await dataValidator.validateTx(tx);
if (!validData) {
// penalize
this.node.services.pubsub.score.markInvalidMessageDelivery(peerId.toString(), Tx.p2pTopic);
return false;
const messageValidators = this.createMessageValidators(blockNumber);
const outcome = await this.runValidations(tx, messageValidators);

if (outcome.allPassed) {
return true;
}

// metadata validation
const metadataValidator = new MetadataTxValidator(new Fr(this.config.l1ChainId), new Fr(blockNumber));
const validMetadata = await metadataValidator.validateTx(tx);
if (!validMetadata) {
// penalize
this.node.services.pubsub.score.markInvalidMessageDelivery(peerId.toString(), Tx.p2pTopic);
return false;
const { name, severity } = outcome.failure;

// Double spend validator has a special case handler
if (name === 'doubleSpendValidator') {
const isValid = await this.handleDoubleSpendFailure(tx, blockNumber, peerId);
if (isValid) {
return true;
}
}

// double spend validation
const doubleSpendValidator = new DoubleSpendTxValidator({
getNullifierIndex: async (nullifier: Fr) => {
const merkleTree = this.worldStateSynchronizer.getCommitted();
const index = (await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]))[0];
return index;
this.peerManager.penalizePeer(peerId, severity);
return false;
}

/**
* Create message validators for the given block number.
*
* Each validator is a pair of a validator and a severity.
* If a validator fails, the peer is penalized with the severity of the validator.
*
* @param blockNumber - The block number to create validators for.
* @returns The message validators.
*/
private createMessageValidators(blockNumber: number): Record<string, MessageValidator> {
return {
dataValidator: {
validator: new DataTxValidator(),
severity: PeerErrorSeverity.HighToleranceError,
},
});
const validDoubleSpend = await doubleSpendValidator.validateTx(tx);
if (!validDoubleSpend) {
// check if nullifier is older than 20 blocks
if (blockNumber - this.config.severePeerPenaltyBlockLength > 0) {
const snapshotValidator = new DoubleSpendTxValidator({
getNullifierIndex: async (nullifier: Fr) => {
const merkleTree = this.worldStateSynchronizer.getSnapshot(
blockNumber - this.config.severePeerPenaltyBlockLength,
);
const index = (await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]))[0];
return index;
metadataValidator: {
validator: new MetadataTxValidator(new Fr(this.config.l1ChainId), new Fr(blockNumber)),
severity: PeerErrorSeverity.HighToleranceError,
},
proofValidator: {
validator: new TxProofValidator(this.proofVerifier),
severity: PeerErrorSeverity.MidToleranceError,
},
doubleSpendValidator: {
validator: new DoubleSpendTxValidator({
getNullifierIndices: (nullifiers: Buffer[]) => {
const merkleTree = this.worldStateSynchronizer.getCommitted();
return merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers);
},
});

const validSnapshot = await snapshotValidator.validateTx(tx);
// High penalty if nullifier is older than 20 blocks
if (!validSnapshot) {
// penalize
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError);
return false;
}
}
// penalize
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
}),
severity: PeerErrorSeverity.HighToleranceError,
},
};
}

/**
* Run validations on a tx.
* @param tx - The tx to validate.
* @param messageValidators - The message validators to run.
* @returns The validation outcome.
*/
private async runValidations(
tx: Tx,
messageValidators: Record<string, MessageValidator>,
): Promise<ValidationOutcome> {
const validationPromises = Object.entries(messageValidators).map(async ([name, { validator, severity }]) => {
const isValid = await validator.validateTx(tx);
return { name, isValid, severity };
});

// A promise that resolves when all validations have been run
const allValidations = Promise.all(validationPromises);

// A promise that resolves when the first validation fails
const firstFailure = Promise.race(
validationPromises.map(async promise => {
const result = await promise;
return result.isValid ? new Promise(() => {}) : result;
}),
);

// Wait for the first validation to fail or all validations to pass
const result = await Promise.race([
allValidations.then(() => ({ allPassed: true as const })),
firstFailure.then(failure => ({ allPassed: false as const, failure: failure as ValidationResult })),
]);

// If all validations pass, allPassed will be true, if failed, then the failure will be the first validation to fail
return result;
}

/**
* Handle a double spend failure.
*
* Double spend failures are managed on their own because they are a special case.
* We must check if the double spend is recent or old, if it is past a threshold, then we heavily penalize the peer.
*
* @param tx - The tx that failed the double spend validator.
* @param blockNumber - The block number of the tx.
* @param peerId - The peer ID of the peer that sent the tx.
* @returns True if the tx is valid, false otherwise.
*/
private async handleDoubleSpendFailure(tx: Tx, blockNumber: number, peerId: PeerId): Promise<boolean> {
if (blockNumber <= this.config.severePeerPenaltyBlockLength) {
return false;
}

// proof validation
const proofValidator = new TxProofValidator(this.proofVerifier);
const validProof = await proofValidator.validateTx(tx);
if (!validProof) {
// penalize
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError);
const snapshotValidator = new DoubleSpendTxValidator({
getNullifierIndices: (nullifiers: Buffer[]) => {
const merkleTree = this.worldStateSynchronizer.getSnapshot(
blockNumber - this.config.severePeerPenaltyBlockLength,
);
return merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers);
},
});

const validSnapshot = await snapshotValidator.validateTx(tx);
if (!validSnapshot) {
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ describe('DoubleSpendTxValidator', () => {

beforeEach(() => {
nullifierSource = mock<NullifierSource>({
getNullifierIndex: mockFn().mockImplementation(() => {
return Promise.resolve(undefined);
getNullifierIndices: mockFn().mockImplementation(() => {
return Promise.resolve([undefined]);
}),
});
txValidator = new DoubleSpendTxValidator(nullifierSource);
Expand Down Expand Up @@ -48,7 +48,7 @@ describe('DoubleSpendTxValidator', () => {

it('rejects duplicates against history', async () => {
const badTx = mockTx();
nullifierSource.getNullifierIndex.mockReturnValueOnce(Promise.resolve(1n));
nullifierSource.getNullifierIndices.mockReturnValueOnce(Promise.resolve([1n]));
await expect(txValidator.validateTxs([badTx])).resolves.toEqual([[], [badTx]]);
});
});
14 changes: 6 additions & 8 deletions yarn-project/p2p/src/tx_validator/double_spend_validator.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { type AnyTx, Tx, type TxValidator } from '@aztec/circuit-types';
import { Fr } from '@aztec/circuits.js';
import { createLogger } from '@aztec/foundation/log';

export interface NullifierSource {
getNullifierIndex: (nullifier: Fr) => Promise<bigint | undefined>;
getNullifierIndices: (nullifiers: Buffer[]) => Promise<(bigint | undefined)[]>;
}

export class DoubleSpendTxValidator<T extends AnyTx> implements TxValidator<T> {
Expand Down Expand Up @@ -36,9 +35,7 @@ export class DoubleSpendTxValidator<T extends AnyTx> implements TxValidator<T> {
}

async #uniqueNullifiers(tx: AnyTx, thisBlockNullifiers: Set<bigint>): Promise<boolean> {
const nullifiers = (tx instanceof Tx ? tx.data.getNonEmptyNullifiers() : tx.txEffect.nullifiers).map(x =>
x.toBigInt(),
);
const nullifiers = tx instanceof Tx ? tx.data.getNonEmptyNullifiers() : tx.txEffect.nullifiers;

// Ditch this tx if it has repeated nullifiers
const uniqueNullifiers = new Set(nullifiers);
Expand All @@ -49,16 +46,17 @@ export class DoubleSpendTxValidator<T extends AnyTx> implements TxValidator<T> {

if (this.isValidatingBlock) {
for (const nullifier of nullifiers) {
if (thisBlockNullifiers.has(nullifier)) {
const nullifierBigInt = nullifier.toBigInt();
if (thisBlockNullifiers.has(nullifierBigInt)) {
this.#log.warn(`Rejecting tx ${Tx.getHash(tx)} for repeating a nullifier in the same block`);
return false;
}

thisBlockNullifiers.add(nullifier);
thisBlockNullifiers.add(nullifierBigInt);
}
}

const nullifierIndexes = await Promise.all(nullifiers.map(n => this.#nullifierSource.getNullifierIndex(new Fr(n))));
const nullifierIndexes = await this.#nullifierSource.getNullifierIndices(nullifiers.map(n => n.toBuffer()));

const hasDuplicates = nullifierIndexes.some(index => index !== undefined);
if (hasDuplicates) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ export class TxValidatorFactory {
private enforceFees: boolean,
) {
this.nullifierSource = {
getNullifierIndex: nullifier =>
this.committedDb.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]).then(x => x[0]),
getNullifierIndices: nullifiers =>
this.committedDb
.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers)
.then(x => x.filter(index => index !== undefined) as bigint[]),
};

this.publicStateSource = {
Expand All @@ -57,8 +59,7 @@ export class TxValidatorFactory {

validatorForProcessedTxs(fork: MerkleTreeReadOperations): TxValidator<ProcessedTx> {
return new DoubleSpendTxValidator({
getNullifierIndex: nullifier =>
fork.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, [nullifier.toBuffer()]).then(x => x[0]),
getNullifierIndices: nullifiers => fork.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers),
});
}
}