Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export type EnvVar =
| 'P2P_BATCH_TX_REQUESTER_TX_BATCH_SIZE'
| 'P2P_BATCH_TX_REQUESTER_BAD_PEER_THRESHOLD'
| 'P2P_BLOCK_CHECK_INTERVAL_MS'
| 'P2P_SLOT_CHECK_INTERVAL_MS'
| 'P2P_BLOCK_REQUEST_BATCH_SIZE'
| 'P2P_BOOTSTRAP_NODE_ENR_VERSION_CHECK'
| 'P2P_BOOTSTRAP_NODES_AS_FULL_PEERS'
Expand Down
61 changes: 53 additions & 8 deletions yarn-project/p2p/src/client/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@ import { DateProvider } from '@aztec/foundation/timer';
import type { AztecAsyncKVStore } from '@aztec/kv-store';
import type { DataStoreConfig } from '@aztec/kv-store/config';
import { AztecLMDBStoreV2, createStore } from '@aztec/kv-store/lmdb-v2';
import type { L2BlockSource } from '@aztec/stdlib/block';
import type { BlockHash, L2BlockSource } from '@aztec/stdlib/block';
import type { ChainConfig } from '@aztec/stdlib/config';
import type { ContractDataSource } from '@aztec/stdlib/contract';
import type { ClientProtocolCircuitVerifier, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server';
import { P2PClientType } from '@aztec/stdlib/p2p';
import { MerkleTreeId } from '@aztec/stdlib/trees';
import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';

import { P2PClient } from '../client/p2p_client.js';
import type { P2PConfig } from '../config.js';
import { AttestationPool, type AttestationPoolApi } from '../mem_pools/attestation_pool/attestation_pool.js';
import type { MemPools } from '../mem_pools/interface.js';
import { AztecKVTxPool, type TxPool } from '../mem_pools/tx_pool/index.js';
import type { TxPoolV2 } from '../mem_pools/tx_pool_v2/interfaces.js';
import type { TxMetaData } from '../mem_pools/tx_pool_v2/tx_metadata.js';
import { AztecKVTxPoolV2 } from '../mem_pools/tx_pool_v2/tx_pool_v2.js';
import { AggregateTxValidator } from '../msg_validators/tx_validator/aggregate_tx_validator.js';
import { BlockHeaderTxValidator } from '../msg_validators/tx_validator/block_header_validator.js';
import { DoubleSpendTxValidator } from '../msg_validators/tx_validator/double_spend_validator.js';
import { DummyP2PService } from '../services/dummy_service.js';
import { LibP2PService } from '../services/index.js';
import { createFileStoreTxSources } from '../services/tx_collection/file_store_tx_source.js';
Expand All @@ -25,7 +31,7 @@ import { TxFileStore } from '../services/tx_file_store/tx_file_store.js';
import { configureP2PClientAddresses, createLibP2PPeerIdFromPrivateKey, getPeerIdPrivateKey } from '../util.js';

export type P2PClientDeps<T extends P2PClientType> = {
txPool?: TxPool;
txPool?: TxPoolV2;
store?: AztecAsyncKVStore;
attestationPool?: AttestationPoolApi;
logger?: Logger;
Expand Down Expand Up @@ -70,13 +76,51 @@ export async function createP2PClient<T extends P2PClientType>(
const attestationStore = await createStore(P2P_ATTESTATION_STORE_NAME, 1, config, bindings);
const l1Constants = await archiver.getL1Constants();

const mempools: MemPools = {
txPool:
deps.txPool ??
new AztecKVTxPool(store, archive, worldStateSynchronizer, telemetry, {
/** Validator factory for pool re-validation (double-spend + block header only). */
const createPoolTxValidator = async () => {
await worldStateSynchronizer.syncImmediate();
return new AggregateTxValidator<TxMetaData>(
new DoubleSpendTxValidator<TxMetaData>(
{
nullifiersExist: async (nullifiers: Buffer[]) => {
const merkleTree = worldStateSynchronizer.getCommitted();
const indices = await merkleTree.findLeafIndices(MerkleTreeId.NULLIFIER_TREE, nullifiers);
return indices.map(index => index !== undefined);
},
},
bindings,
),
new BlockHeaderTxValidator<TxMetaData>(
{
getArchiveIndices: (archives: BlockHash[]) => {
const merkleTree = worldStateSynchronizer.getCommitted();
return merkleTree.findLeafIndices(MerkleTreeId.ARCHIVE, archives);
},
},
bindings,
),
);
};

const txPool =
deps.txPool ??
new AztecKVTxPoolV2(
store,
archive,
{
l2BlockSource: archiver,
worldStateSynchronizer,
createTxValidator: createPoolTxValidator,
},
telemetry,
{
maxPendingTxCount: config.maxPendingTxCount,
archivedTxLimit: config.archivedTxLimit,
}),
},
);

const mempools: MemPools = {
txPool,
attestationPool: deps.attestationPool ?? new AttestationPool(attestationStore, telemetry),
};

Expand Down Expand Up @@ -138,6 +182,7 @@ export async function createP2PClient<T extends P2PClientType>(
p2pService,
txCollection,
txFileStore,
epochCache,
config,
dateProvider,
telemetry,
Expand Down
35 changes: 19 additions & 16 deletions yarn-project/p2p/src/client/interface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { SlotNumber } from '@aztec/foundation/branded-types';
import type { EthAddress, L2BlockId } from '@aztec/stdlib/block';
import type { P2PApiFull } from '@aztec/stdlib/interfaces/server';
import type { BlockProposal, CheckpointAttestation, CheckpointProposal, P2PClientType } from '@aztec/stdlib/p2p';
import type { Tx, TxHash } from '@aztec/stdlib/tx';
import type { BlockHeader, Tx, TxHash } from '@aztec/stdlib/tx';

import type { PeerId } from '@libp2p/interface';
import type { ENR } from '@nethermindeth/enr';
Expand Down Expand Up @@ -100,14 +101,6 @@ export type P2P<T extends P2PClientType = P2PClientType.Full> = P2PApiFull<T> &
*/
registerDuplicateAttestationCallback(callback: (info: DuplicateAttestationInfo) => void): void;

/**
* Request a list of transactions from another peer by their tx hashes.
* @param txHashes - Hashes of the txs to query.
* @param pinnedPeerId - An optional peer id that will be used to request the tx from (in addition to other random peers).
* @returns A list of transactions or undefined if the transactions are not found.
*/
requestTxsByHash(txHashes: TxHash[], pinnedPeerId: PeerId): Promise<Tx[]>;

/**
* Verifies the 'tx' and, if valid, adds it to local tx pool and forwards it to other peers.
* @param tx - The transaction.
Expand All @@ -122,11 +115,10 @@ export type P2P<T extends P2PClientType = P2PClientType.Full> = P2PApiFull<T> &
addTxsToPool(txs: Tx[]): Promise<number>;

/**
* Deletes 'txs' from the pool, given hashes.
* NOT used if we use sendTx as reconcileTxPool will handle this.
* @param txHashes - Hashes to check.
* Handles failed transaction execution by removing txs from the pool.
* @param txHashes - Hashes of the transactions that failed execution.
**/
deleteTxs(txHashes: TxHash[]): Promise<void>;
handleFailedExecution(txHashes: TxHash[]): Promise<void>;

/**
* Returns a transaction in the transaction pool by its hash.
Expand Down Expand Up @@ -178,10 +170,21 @@ export type P2P<T extends P2PClientType = P2PClientType.Full> = P2PApiFull<T> &
getPendingTxCount(): Promise<number>;

/**
* Marks transactions as non-evictable in the pool.
* @param txHashes - Hashes of the transactions to mark as non-evictable.
* Protects existing transactions by hash for a given slot.
* Returns hashes of transactions that weren't found in the pool.
* @param txHashes - Hashes of the transactions to protect.
* @param blockHeader - The block header providing slot context.
* @returns Hashes of transactions not found in the pool.
*/
protectTxs(txHashes: TxHash[], blockHeader: BlockHeader): Promise<TxHash[]>;

/**
* Prepares the pool for a new slot.
* Unprotects transactions from earlier slots and validates them before
* returning to pending state.
* @param slotNumber - The slot number to prepare for
*/
markTxsAsNonEvictable(txHashes: TxHash[]): Promise<void>;
prepareForSlot(slotNumber: SlotNumber): Promise<void>;

/**
* Starts the p2p client.
Expand Down
Loading
Loading