Skip to content
8 changes: 5 additions & 3 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ export class AztecNodeService implements AztecNode {
* @param config - The configuration to be used by the aztec node.
* @returns - A fully synced Aztec Node for use in development/testing.
*/
public static async createAndSync(config: AztecNodeConfig) {
public static async createAndSync(
config: AztecNodeConfig,
log = createDebugLogger('aztec:node'),
storeLog = createDebugLogger('aztec:node:lmdb'),
) {
const ethereumChain = createEthereumChain(config.rpcUrl, config.apiKey);
//validate that the actual chain id matches that specified in configuration
if (config.chainId !== ethereumChain.chainInfo.id) {
Expand All @@ -131,8 +135,6 @@ export class AztecNodeService implements AztecNode {
);
}

const log = createDebugLogger('aztec:node');
const storeLog = createDebugLogger('aztec:node:lmdb');
const store = await initStoreForRollup(
AztecLmdbStore.open(config.dataDirectory, false, storeLog),
config.l1Contracts.rollupAddress,
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/end-to-end/Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ E2E_TEST:
# Run our docker compose, ending whenever sandbox ends, filtering out noisy eth_getLogs
RUN docker run -e HARDWARE_CONCURRENCY=$hardware_concurrency --rm aztecprotocol/end-to-end:$AZTEC_DOCKER_TAG $test || $allow_fail

e2e-p2p:
DO +E2E_TEST --test=./src/e2e_p2p_network.test.ts

e2e-2-pxes:
DO +E2E_TEST --test=./src/e2e_2_pxes.test.ts

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import {
GrumpkinScalar,
type SentTx,
TxStatus,
createDebugLogger,
sleep,
} from '@aztec/aztec.js';
import { type BootNodeConfig, BootstrapNode, createLibP2PPeerId } from '@aztec/p2p';
import { type PXEService, createPXEService, getPXEServiceConfig as getRpcConfig } from '@aztec/pxe';

import fs from 'fs';
import { mnemonicToAccount } from 'viem/accounts';

import { MNEMONIC } from './fixtures/fixtures.js';
Expand All @@ -30,21 +33,36 @@ interface NodeContext {
account: AztecAddress;
}

const PEER_ID_PRIVATE_KEYS = [
'0802122002f651fd8653925529e3baccb8489b3af4d7d9db440cbf5df4a63ff04ea69683',
'08021220c3bd886df5fe5b33376096ad0dab3d2dc86ed2a361d5fde70f24d979dc73da41',
'080212206b6567ac759db5434e79495ec7458e5e93fe479a5b80713446e0bce5439a5655',
'08021220366453668099bdacdf08fab476ee1fced6bf00ddc1223d6c2ee626e7236fb526',
];

describe('e2e_p2p_network', () => {
let config: AztecNodeConfig;
let logger: DebugLogger;
let teardown: () => Promise<void>;
let bootstrapNode: BootstrapNode;
let bootstrapNodeEnr: string;

beforeEach(async () => {
({ teardown, config, logger } = await setup(1));
({ teardown, config, logger } = await setup(0));
bootstrapNode = await createBootstrapNode();
bootstrapNodeEnr = bootstrapNode.getENR().encodeTxt();
});

afterEach(() => teardown());

afterAll(() => {
for (let i = 0; i < NUM_NODES; i++) {
fs.rmSync(`./data-${i}`, { recursive: true, force: true });
}
});

it('should rollup txs from all peers', async () => {
// create the bootstrap node for the network
const bootstrapNode = await createBootstrapNode();
const bootstrapNodeEnr = bootstrapNode.getENR();
if (!bootstrapNodeEnr) {
throw new Error('Bootstrap node ENR is not available');
}
Expand All @@ -53,14 +71,29 @@ describe('e2e_p2p_network', () => {
// should be set so that the only way for rollups to be built
// is if the txs are successfully gossiped around the nodes.
const contexts: NodeContext[] = [];
const nodes: AztecNodeService[] = [];
for (let i = 0; i < NUM_NODES; i++) {
const node = await createNode(i + 1 + BOOT_NODE_UDP_PORT, bootstrapNodeEnr?.encodeTxt(), i);
const node = await createNode(i + 1 + BOOT_NODE_UDP_PORT, bootstrapNodeEnr, i);
nodes.push(node);
}

// wait a bit for peers to discover each other
await sleep(2000);

for (const node of nodes) {
const context = await createPXEServiceAndSubmitTransactions(node, NUM_TXS_PER_NODE);
contexts.push(context);
}

// now ensure that all txs were successfully mined
await Promise.all(contexts.flatMap(context => context.txs.map(tx => tx.wait())));
await Promise.all(
contexts.flatMap((context, i) =>
context.txs.map(async (tx, j) => {
logger.info(`Waiting for tx ${i}-${j}: ${await tx.getTxHash()} to be mined`);
return tx.wait();
}),
),
);

// shutdown all nodes.
for (const context of contexts) {
Expand All @@ -70,6 +103,61 @@ describe('e2e_p2p_network', () => {
await bootstrapNode.stop();
});

it('should re-discover stored peers without bootstrap node', async () => {
const contexts: NodeContext[] = [];
const nodes: AztecNodeService[] = [];
for (let i = 0; i < NUM_NODES; i++) {
const node = await createNode(i + 1 + BOOT_NODE_UDP_PORT, bootstrapNodeEnr, i, `./data-${i}`);
nodes.push(node);
}
// wait a bit for peers to discover each other
await sleep(3000);

// stop bootstrap node
await bootstrapNode.stop();

// create new nodes from datadir
const newNodes: AztecNodeService[] = [];

// stop all nodes
for (let i = 0; i < NUM_NODES; i++) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better if we stop all nodes, and then restart them all?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I felt this was a more "realistic" scenario of single nodes going down and restarted.
It can work with stopping all & restarting but seems to take quite a bit longer for them to re-discover, had to set the timeout to 30+ seconds

const node = nodes[i];
await node.stop();
logger.info(`Node ${i} stopped`);
await sleep(1200);
const newNode = await createNode(i + 1 + BOOT_NODE_UDP_PORT, undefined, i, `./data-${i}`);
logger.info(`Node ${i} restarted`);
newNodes.push(newNode);
// const context = await createPXEServiceAndSubmitTransactions(node, NUM_TXS_PER_NODE);
// contexts.push(context);
}

// wait a bit for peers to discover each other
await sleep(2000);

for (const node of newNodes) {
const context = await createPXEServiceAndSubmitTransactions(node, NUM_TXS_PER_NODE);
contexts.push(context);
}

// now ensure that all txs were successfully mined
await Promise.all(
contexts.flatMap((context, i) =>
context.txs.map(async (tx, j) => {
logger.info(`Waiting for tx ${i}-${j}: ${await tx.getTxHash()} to be mined`);
return tx.wait();
}),
),
);

// shutdown all nodes.
// for (const context of contexts) {
for (const context of contexts) {
await context.node.stop();
await context.pxeService.stop();
}
});

const createBootstrapNode = async () => {
const peerId = await createLibP2PPeerId();
const bootstrapNode = new BootstrapNode();
Expand All @@ -87,7 +175,12 @@ describe('e2e_p2p_network', () => {
};

// creates a P2P enabled instance of Aztec Node Service
const createNode = async (tcpListenPort: number, bootstrapNode: string, publisherAddressIndex: number) => {
const createNode = async (
tcpListenPort: number,
bootstrapNode: string | undefined,
publisherAddressIndex: number,
dataDirectory?: string,
) => {
// We use different L1 publisher accounts in order to avoid duplicate tx nonces. We start from
// publisherAddressIndex + 1 because index 0 was already used during test environment setup.
const hdAccount = mnemonicToAccount(MNEMONIC, { addressIndex: publisherAddressIndex + 1 });
Expand All @@ -96,38 +189,21 @@ describe('e2e_p2p_network', () => {

const newConfig: AztecNodeConfig = {
...config,
peerIdPrivateKey: PEER_ID_PRIVATE_KEYS[publisherAddressIndex],
udpListenAddress: `0.0.0.0:${tcpListenPort}`,
tcpListenAddress: `0.0.0.0:${tcpListenPort}`,
tcpAnnounceAddress: `127.0.0.1:${tcpListenPort}`,
udpAnnounceAddress: `127.0.0.1:${tcpListenPort}`,
bootstrapNodes: [bootstrapNode],
minTxsPerBlock: NUM_TXS_PER_BLOCK,
maxTxsPerBlock: NUM_TXS_PER_BLOCK,
p2pEnabled: true,
p2pBlockCheckIntervalMS: 1000,
p2pL2QueueSize: 1,
transactionProtocol: '',
dataDirectory,
bootstrapNodes: bootstrapNode ? [bootstrapNode] : [],
};
return await AztecNodeService.createAndSync(newConfig);
};

// submits a set of transactions to the provided Private eXecution Environment (PXE)
const submitTxsTo = async (pxe: PXEService, account: AztecAddress, numTxs: number) => {
const txs: SentTx[] = [];
for (let i = 0; i < numTxs; i++) {
const tx = getSchnorrAccount(pxe, Fr.random(), GrumpkinScalar.random(), Fr.random()).deploy();
logger.info(`Tx sent with hash ${await tx.getTxHash()}`);
const receipt = await tx.getReceipt();
expect(receipt).toEqual(
expect.objectContaining({
status: TxStatus.PENDING,
error: '',
}),
);
logger.info(`Receipt received for ${await tx.getTxHash()}`);
txs.push(tx);
}
return txs;
return await AztecNodeService.createAndSync(newConfig, createDebugLogger(`aztec:node-${tcpListenPort}`));
};

// creates an instance of the PXE and submit a given number of transactions to it.
Expand All @@ -142,12 +218,44 @@ describe('e2e_p2p_network', () => {
const completeAddress = CompleteAddress.fromSecretKeyAndPartialAddress(secretKey, Fr.random());
await pxeService.registerAccount(secretKey, completeAddress.partialAddress);

const txs = await submitTxsTo(pxeService, completeAddress.address, numTxs);
const txs = await submitTxsTo(pxeService, numTxs);
return {
txs,
account: completeAddress.address,
pxeService,
node,
};
};

// submits a set of transactions to the provided Private eXecution Environment (PXE)
const submitTxsTo = async (pxe: PXEService, numTxs: number) => {
const txs: SentTx[] = [];
for (let i = 0; i < numTxs; i++) {
// const tx = getSchnorrAccount(pxe, Fr.random(), GrumpkinScalar.random(), Fr.random()).deploy();
const accountManager = getSchnorrAccount(pxe, Fr.random(), GrumpkinScalar.random(), Fr.random());
const deployMethod = await accountManager.getDeployMethod();
await deployMethod.create({
contractAddressSalt: accountManager.salt,
skipClassRegistration: true,
skipPublicDeployment: true,
universalDeploy: true,
});
await deployMethod.prove({});
const tx = deployMethod.send();

const txHash = await tx.getTxHash();

logger.info(`Tx sent with hash ${txHash}`);
const receipt = await tx.getReceipt();
expect(receipt).toEqual(
expect.objectContaining({
status: TxStatus.PENDING,
error: '',
}),
);
logger.info(`Receipt received for ${txHash}`);
txs.push(tx);
}
return txs;
};
});
10 changes: 4 additions & 6 deletions yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { type AztecKVStore } from '@aztec/kv-store';
import { P2PClient } from '../client/p2p_client.js';
import { type P2PConfig } from '../config.js';
import { DiscV5Service } from '../service/discV5_service.js';
import { DummyP2PService, DummyPeerDiscoveryService } from '../service/dummy_service.js';
import { DummyP2PService } from '../service/dummy_service.js';
import { LibP2PService, createLibP2PPeerId } from '../service/index.js';
import { type TxPool } from '../tx_pool/index.js';
import { getPublicIp, splitAddressPort } from '../util.js';
Expand All @@ -17,7 +17,6 @@ export const createP2PClient = async (
txPool: TxPool,
l2BlockSource: L2BlockSource,
) => {
let discv5Service;
let p2pService;

if (config.p2pEnabled) {
Expand All @@ -40,7 +39,7 @@ export const createP2PClient = async (
config.tcpAnnounceAddress = tcpAnnounceAddress;
} else {
throw new Error(
`Invalid announceTcpAddress provided: ${splitTcpAnnounceAddress}. Expected format: <addr>:<port>`,
`Invalid announceTcpAddress provided: ${configTcpAnnounceAddress}. Expected format: <addr>:<port>`,
);
}
}
Expand All @@ -59,11 +58,10 @@ export const createP2PClient = async (

// Create peer discovery service
const peerId = await createLibP2PPeerId(config.peerIdPrivateKey);
discv5Service = new DiscV5Service(peerId, config);
p2pService = await LibP2PService.new(config, discv5Service, peerId, txPool);
const discoveryService = new DiscV5Service(peerId, config);
p2pService = await LibP2PService.new(config, discoveryService, peerId, txPool, store);
} else {
p2pService = new DummyP2PService();
discv5Service = new DummyPeerDiscoveryService();
}
return new P2PClient(store, l2BlockSource, txPool, p2pService);
};
1 change: 0 additions & 1 deletion yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ describe('In-Memory P2P Client', () => {
start: jest.fn(),
stop: jest.fn(),
propagateTx: jest.fn(),
settledTxs: jest.fn(),
};

blockSource = new MockBlockSource();
Expand Down
3 changes: 1 addition & 2 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ export class P2PClient implements P2P {
this.log.debug('Stopped block downloader');
await this.runningPromise;
this.setCurrentState(P2PClientState.STOPPED);
this.log.info('P2P client stopped...');
this.log.info('P2P client stopped.');
}

/**
Expand Down Expand Up @@ -278,7 +278,6 @@ export class P2PClient implements P2P {
for (const block of blocks) {
const txHashes = block.body.txEffects.map(txEffect => txEffect.txHash);
await this.txPool.deleteTxs(txHashes);
this.p2pService.settledTxs(txHashes);
}
}

Expand Down
Loading