Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion yarn-project/archiver/src/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ export class Archiver extends ArchiverDataSourceBase implements L2BlockSink, Tra
* @param dataStore - An archiver data store for storage & retrieval of blocks, encrypted logs & contract data.
* @param config - Archiver configuration options.
* @param blobClient - Client for retrieving blob data.
* @param epochCache - Cache for epoch-related data.
* @param dateProvider - Provider for current date/time.
* @param instrumentation - Instrumentation for metrics and tracing.
* @param l1Constants - L1 rollup constants.
Expand Down
16 changes: 14 additions & 2 deletions yarn-project/aztec-node/src/sentinel/sentinel.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { EpochCache } from '@aztec/epoch-cache';
import { EpochCache } from '@aztec/epoch-cache';
import { BlockNumber, CheckpointNumber, EpochNumber, SlotNumber } from '@aztec/foundation/branded-types';
import { compactArray, times } from '@aztec/foundation/collection';
import { Secp256k1Signer } from '@aztec/foundation/crypto/secp256k1-signer';
Expand Down Expand Up @@ -79,7 +79,15 @@ describe('sentinel', () => {
rollupManaLimit: Number.MAX_SAFE_INTEGER,
};

epochCache.getEpochAndSlotNow.mockReturnValue({ epoch, slot, ts, nowMs: ts * 1000n });
epochCache.getEpochAndSlotNow.mockReturnValue({
epoch,
slot,
ts,
nowMs: ts * 1000n,
});
epochCache.getSlotNow.mockReturnValue(slot);
epochCache.getEpochNow.mockReturnValue(epoch);
epochCache.isProposerPipeliningEnabled.mockReturnValue(false);
epochCache.getL1Constants.mockReturnValue(l1Constants);

sentinel = new TestSentinel(epochCache, archiver, p2p, store, config, blockStream);
Expand Down Expand Up @@ -590,6 +598,10 @@ describe('sentinel', () => {
ts,
nowMs: ts * 1000n,
});
epochCache.getSlotNow.mockReturnValue(slot);
epochCache.getTargetSlot.mockReturnValue(slot);
epochCache.getEpochNow.mockReturnValue(epochNumber);
epochCache.getTargetEpoch.mockReturnValue(epochNumber);
archiver.getBlockHeader.calledWith(blockNumber).mockResolvedValue(mockBlock.header);
archiver.getL1Constants.mockResolvedValue(l1Constants);
epochCache.getL1Constants.mockReturnValue(l1Constants);
Expand Down
8 changes: 4 additions & 4 deletions yarn-project/aztec-node/src/sentinel/sentinel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme

/** Loads initial slot and initializes blockstream. We will not process anything at or before the initial slot. */
protected async init() {
this.initialSlot = this.epochCache.getEpochAndSlotNow().slot;
this.initialSlot = this.epochCache.getSlotNow();
const startingBlock = BlockNumber(await this.archiver.getBlockNumber());
this.logger.info(`Starting validator sentinel with initial slot ${this.initialSlot} and block ${startingBlock}`);
this.blockStream = new L2BlockStream(this.archiver, this.l2TipsStore, this, this.logger, { startingBlock });
Expand Down Expand Up @@ -264,7 +264,7 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme
* and we don't have that data if we were offline during the period.
*/
public async work() {
const { slot: currentSlot } = this.epochCache.getEpochAndSlotNow();
const currentSlot = this.epochCache.getSlotNow();
try {
// Manually sync the block stream to ensure we have the latest data.
// Note we never `start` the blockstream, so it loops at the same pace as we do.
Expand Down Expand Up @@ -436,7 +436,7 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme
? fromEntries(await Promise.all(validators.map(async v => [v.toString(), await this.store.getHistory(v)])))
: await this.store.getHistories();

const slotNow = this.epochCache.getEpochAndSlotNow().slot;
const slotNow = this.epochCache.getSlotNow();
fromSlot ??= SlotNumber(Math.max((this.lastProcessedSlot ?? slotNow) - this.store.getHistoryLength(), 0));
toSlot ??= this.lastProcessedSlot ?? slotNow;

Expand Down Expand Up @@ -464,7 +464,7 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme
return undefined;
}

const slotNow = this.epochCache.getEpochAndSlotNow().slot;
const slotNow = this.epochCache.getSlotNow();
const effectiveFromSlot =
fromSlot ?? SlotNumber(Math.max((this.lastProcessedSlot ?? slotNow) - this.store.getHistoryLength(), 0));
const effectiveToSlot = toSlot ?? this.lastProcessedSlot ?? slotNow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { CheckpointNumber, SlotNumber } from '@aztec/foundation/branded-types';
import { times, timesAsync } from '@aztec/foundation/collection';
import { SecretValue } from '@aztec/foundation/config';
import { retryUntil } from '@aztec/foundation/retry';
import { sleep } from '@aztec/foundation/sleep';
import { bufferToHex } from '@aztec/foundation/string';
import { executeTimeout } from '@aztec/foundation/timer';
import { TestContract } from '@aztec/noir-test-contracts.js/Test';
Expand Down Expand Up @@ -552,11 +553,20 @@ describe('e2e_epochs/epochs_mbps', () => {
});
await waitUntilL1Timestamp(test.l1Client, targetTimestamp, undefined, test.L2_SLOT_DURATION_IN_S * 3);

// Send both pre-proved txs simultaneously, waiting for them to be checkpointed.
// Send the deploy tx first and give it time to propagate to all validators,
// then send the call tx. Priority fees are a safety net, but arrival ordering
// ensures the deploy tx is in the pool before the call tx regardless of gossip timing.
const timeout = test.L2_SLOT_DURATION_IN_S * 5;
logger.warn(`Sending both txs and waiting for checkpointed receipts`);
logger.warn(`Sending deploy tx first, then call tx`);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix a flake

const deployTxHash = await deployTx.send({ wait: NO_WAIT });
await sleep(1000);
const callTxHash = await callTx.send({ wait: NO_WAIT });
const [deployReceipt, callReceipt] = await executeTimeout(
() => Promise.all([deployTx.send({ wait: { timeout } }), callTx.send({ wait: { timeout } })]),
() =>
Promise.all([
waitForTx(context.aztecNode, deployTxHash, { timeout }),
waitForTx(context.aztecNode, callTxHash, { timeout }),
]),
timeout * 1000,
);
logger.warn(`Both txs checkpointed`, {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
import type { Archiver } from '@aztec/archiver';
import type { AztecNodeService } from '@aztec/aztec-node';
import { AztecAddress, EthAddress } from '@aztec/aztec.js/addresses';
import { NO_WAIT } from '@aztec/aztec.js/contracts';
import { Fr } from '@aztec/aztec.js/fields';
import type { Logger } from '@aztec/aztec.js/log';
import { waitForTx } from '@aztec/aztec.js/node';
import { RollupContract } from '@aztec/ethereum/contracts';
import type { Operator } from '@aztec/ethereum/deploy-aztec-l1-contracts';
import { asyncMap } from '@aztec/foundation/async-map';
import { BlockNumber, CheckpointNumber, SlotNumber } from '@aztec/foundation/branded-types';
import { times, timesAsync } from '@aztec/foundation/collection';
import { SecretValue } from '@aztec/foundation/config';
import { bufferToHex } from '@aztec/foundation/string';
import { executeTimeout } from '@aztec/foundation/timer';
import { TestContract } from '@aztec/noir-test-contracts.js/Test';
import type { SequencerEvents } from '@aztec/sequencer-client';
import { getTimestampForSlot } from '@aztec/stdlib/epoch-helpers';

import { jest } from '@jest/globals';
import { privateKeyToAccount } from 'viem/accounts';

import { type EndToEndContext, getPrivateKeyFromIndex } from '../fixtures/utils.js';
import { TestWallet } from '../test-wallet/test_wallet.js';
import { proveInteraction } from '../test-wallet/utils.js';
import { EpochsTestContext } from './epochs_test.js';

// Global timeout for the whole suite: 20 minutes. The test waits on real L1/L2 slot
// timing (36s aztec slots, 4-slot epochs), so individual cases can legitimately take minutes.
jest.setTimeout(1000 * 60 * 20);

const NODE_COUNT = 4;
// NOTE(review): named "EXPECTED_BLOCKS_PER_CHECKPOINT" but set to 1, while the suite's
// stated purpose is multiple blocks per slot — confirm this is the intended minimum
// threshold for assertMultipleBlocksPerSlot and not a leftover from debugging.
const EXPECTED_BLOCKS_PER_CHECKPOINT = 1;

// Send enough transactions to trigger multiple blocks within a checkpoint assuming 2 txs per block.
const TX_COUNT = 10;

/**
 * E2E tests for proposer pipelining with Multiple Blocks Per Slot (MBPS).
 * Verifies that with pipelining enabled, the block proposer in slot N is the validator
 * scheduled on L1 for slot N+1 (the proposer view uses a +1 slot offset).
 */
describe('e2e_epochs/epochs_mbps_pipeline', () => {
  let context: EndToEndContext;
  let logger: Logger;
  let rollup: RollupContract;
  // Archiver of the first validator node; used by the assertion helpers below.
  let archiver: Archiver;

  let test: EpochsTestContext;
  let validators: (Operator & { privateKey: `0x${string}` })[];
  let nodes: AztecNodeService[];
  let contract: TestContract;
  let wallet: TestWallet;
  // Sender address for all test interactions (first funded account from setup).
  let from: AztecAddress;

  /**
   * Creates validators and sets up the test context with MBPS and proposer pipelining.
   *
   * @param opts.syncChainTip - Which chain tip the PXE syncs to ('proposed' or 'checkpointed').
   * @param opts.minTxsPerBlock - Optional lower bound on txs per block for the sequencers.
   * @param opts.maxTxsPerBlock - Optional upper bound on txs per block for the sequencers.
   */
  async function setupTest(opts: {
    syncChainTip: 'proposed' | 'checkpointed';
    minTxsPerBlock?: number;
    maxTxsPerBlock?: number;
  }) {
    // syncChainTip is routed into pxeOpts; the remaining options are spread into the
    // EpochsTestContext setup below (after the fixed defaults, so they can override them).
    const { syncChainTip = 'checkpointed', ...setupOpts } = opts;

    // Derive NODE_COUNT validator identities from deterministic fixture private keys,
    // offset by 3 so they do not collide with accounts used elsewhere in setup.
    // Each validator withdraws to its own attester address.
    validators = times(NODE_COUNT, i => {
      const privateKey = bufferToHex(getPrivateKeyFromIndex(i + 3)!);
      const attester = EthAddress.fromString(privateKeyToAccount(privateKey).address);
      return { attester, withdrawer: attester, privateKey, bn254SecretKey: new SecretValue(Fr.random().toBigInt()) };
    });

    test = await EpochsTestContext.setup({
      numberOfAccounts: 1,
      initialValidators: validators,
      enableProposerPipelining: true, // the feature under test: build in slot N, submit in slot N+1
      mockGossipSubNetwork: true,
      disableAnvilTestWatcher: true,
      startProverNode: true,
      aztecEpochDuration: 4,
      enforceTimeTable: true,
      ethereumSlotDuration: 4,
      aztecSlotDuration: 36,
      blockDurationMs: 8000,
      l1PublishingTime: 2,
      attestationPropagationTime: 0.5,
      aztecTargetCommitteeSize: 3,
      ...setupOpts,
      pxeOpts: { syncChainTip },
    });

    ({ context, logger, rollup } = test);
    wallet = context.wallet;
    from = context.accounts[0];

    // The initial node's sequencer would compete with the validator nodes; stop it so
    // only the NODE_COUNT validators created below propose blocks.
    logger.warn(`Stopping sequencer in initial aztec node.`);
    await context.sequencer!.stop();

    logger.warn(`Initial setup complete. Starting ${NODE_COUNT} validator nodes.`);
    // Clear inherited coinbase so each validator derives coinbase from its own attester key
    nodes = await asyncMap(validators, ({ privateKey }) =>
      test.createValidatorNode([privateKey], { dontStartSequencer: true, coinbase: undefined }),
    );
    logger.warn(`Started ${NODE_COUNT} validator nodes.`, { validators: validators.map(v => v.attester.toString()) });

    // Point the wallet at the first validator node and use its archiver for assertions.
    wallet.updateNode(nodes[0]);
    archiver = nodes[0].getBlockSource() as Archiver;

    contract = await test.registerTestContract(wallet);
    logger.warn(`Test setup completed.`, { validators: validators.map(v => v.attester.toString()) });
  }

  /** Retrieves all checkpoints from the archiver, checks that one has the target block count, and returns its number. */
  async function assertMultipleBlocksPerSlot(targetBlockCount: number, logger: Logger): Promise<CheckpointNumber> {
    const checkpoints = await archiver.getCheckpoints(CheckpointNumber(1), 50);
    logger.warn(`Retrieved ${checkpoints.length} checkpoints from archiver`, {
      checkpoints: checkpoints.map(pc => pc.checkpoint.getStats()),
    });

    // Block numbers must be contiguous across all checkpoints, starting from the
    // first block of the first checkpoint returned.
    let expectedBlockNumber = checkpoints[0].checkpoint.blocks[0].number;
    let multiBlockCheckpointNumber: CheckpointNumber | undefined;

    for (const checkpoint of checkpoints) {
      const blockCount = checkpoint.checkpoint.blocks.length;
      // Remember the FIRST checkpoint that reaches the target block count; this is
      // the one the caller later waits to see proven.
      if (blockCount >= targetBlockCount && multiBlockCheckpointNumber === undefined) {
        multiBlockCheckpointNumber = checkpoint.checkpoint.number;
      }
      logger.warn(`Checkpoint ${checkpoint.checkpoint.number} has ${blockCount} blocks`, {
        checkpoint: checkpoint.checkpoint.getStats(),
      });

      // Each block must know its position within its checkpoint, carry the right
      // checkpoint number, and continue the global block-number sequence.
      for (let i = 0; i < blockCount; i++) {
        const block = checkpoint.checkpoint.blocks[i];
        expect(block.indexWithinCheckpoint).toBe(i);
        expect(block.checkpointNumber).toBe(checkpoint.checkpoint.number);
        expect(block.number).toBe(expectedBlockNumber);
        expectedBlockNumber++;
      }
    }

    expect(multiBlockCheckpointNumber).toBeDefined();
    return multiBlockCheckpointNumber!;
  }

  /** Waits until a specific multi-block checkpoint is proven. */
  async function waitForProvenCheckpoint(targetCheckpoint: CheckpointNumber) {
    // Allow up to four epochs' worth of wall-clock time for the proof to land.
    const provenTimeout = test.L2_SLOT_DURATION_IN_S * test.epochDuration * 4;
    logger.warn(`Waiting for checkpoint ${targetCheckpoint} to be proven (timeout=${provenTimeout}s)`);
    await test.waitUntilProvenCheckpointNumber(targetCheckpoint, provenTimeout);
    logger.warn(`Proven checkpoint advanced to ${test.monitor.provenCheckpointNumber}`);
  }

  /**
   * Asserts pipelining by comparing the build slot (from block-proposed events) against
   * the submission slot (from block headers). With pipelining, the block is built in slot N
   * but its header carries submission slot N+1.
   */
  async function assertProposerPipelining(
    blockProposedEvents: { blockNumber: BlockNumber; slot: SlotNumber; buildSlot: SlotNumber }[],
    logger: Logger,
  ) {
    const checkpoints = await archiver.getCheckpoints(CheckpointNumber(1), 50);
    const allBlocks = checkpoints.flatMap(pc => pc.checkpoint.blocks);

    logger.warn(`assertProposerPipelining: ${allBlocks.length} blocks, ${blockProposedEvents.length} events`, {
      blockNumbers: allBlocks.map(b => b.number),
      eventBlockNumbers: blockProposedEvents.map(e => e.blockNumber),
    });

    // At least one block must demonstrate the +1 offset, otherwise the assertions in
    // the loop below could be vacuously skipped (e.g. if no events matched any block).
    let foundPipelining = false;

    for (const block of allBlocks) {
      const headerSlot = block.header.globalVariables.slotNumber; // submission slot (N+1)
      const coinbase = block.header.globalVariables.coinbase;

      // Find the block-proposed event for this block (use Number() for safe comparison)
      const event = blockProposedEvents.find(e => Number(e.blockNumber) === Number(block.number));
      // if there is no event, then it was probably block number one - which was proposed in setup
      if (!event) {
        continue;
      }

      const buildSlot = event.buildSlot; // build slot (N)

      // Verify the pipelining offset: block built in slot N, submitted in slot N+1
      expect(Number(headerSlot)).toBe(Number(buildSlot) + 1);
      foundPipelining = true;

      // Verify coinbase matches the expected proposer for the submission slot
      const expectedProposer = await rollup.getProposerAt(getTimestampForSlot(headerSlot, test.constants));
      expect(coinbase).toEqual(expectedProposer);

      logger.warn(`Block ${block.number}: buildSlot=${buildSlot}, submissionSlot=${headerSlot}, coinbase=${coinbase}`, {
        blockNumber: block.number,
        buildSlot,
        headerSlot,
        coinbase: coinbase.toString(),
        expectedProposer: expectedProposer.toString(),
      });
    }

    expect(foundPipelining).toBe(true);
    logger.warn(`Pipelining assertion passed for ${allBlocks.length} blocks`);
  }

  afterEach(async () => {
    jest.restoreAllMocks();
    // Optional chaining: teardown must not throw if setupTest failed before assigning `test`.
    await test?.teardown();
  });

  it('pipelining builds blocks using slot plus 1 proposer and proves them', async () => {
    await setupTest({ syncChainTip: 'checkpointed', minTxsPerBlock: 1, maxTxsPerBlock: 2 });

    // Subscribe to block-proposed events to capture build slots
    const blockProposedEvents: { blockNumber: BlockNumber; slot: SlotNumber; buildSlot: SlotNumber }[] = [];
    const sequencers = nodes.map(n => n.getSequencer()!);
    for (const sequencer of sequencers) {
      sequencer.getSequencer().on('block-proposed', (args: Parameters<SequencerEvents['block-proposed']>[0]) => {
        logger.warn(`block-proposed event: blockNumber=${args.blockNumber}, slot=${args.slot}`, args);
        blockProposedEvents.push({
          blockNumber: args.blockNumber,
          slot: args.slot,
          buildSlot: args.buildSlot,
        });
      });
    }

    const initialCheckpointNumber = await rollup.getCheckpointNumber();
    logger.warn(`Initial checkpoint number: ${initialCheckpointNumber}`);

    // Pre-prove and send transactions. Each tx emits a distinct nullifier (i + 1) so
    // none of them collide. All are sent before the sequencers start, so they queue
    // in the pool and get packed into blocks once proposing begins.
    const txs = await timesAsync(TX_COUNT, i =>
      proveInteraction(context.wallet, contract.methods.emit_nullifier(new Fr(i + 1)), { from }),
    );
    const txHashes = await Promise.all(txs.map(tx => tx.send({ wait: NO_WAIT })));
    logger.warn(`Sent ${txHashes.length} transactions`, { txs: txHashes });

    // Start the sequencers
    await Promise.all(sequencers.map(s => s.start()));
    logger.warn(`Started all sequencers`);

    // Wait until all txs are mined, bounding the whole wait by five L2 slots.
    const timeout = test.L2_SLOT_DURATION_IN_S * 5;
    await executeTimeout(
      () => Promise.all(txHashes.map(txHash => waitForTx(context.aztecNode, txHash, { timeout }))),
      timeout * 1000,
    );
    logger.warn(`All txs have been mined`);

    // Verify MBPS works with pipelining
    const multiBlockCheckpoint = await assertMultipleBlocksPerSlot(EXPECTED_BLOCKS_PER_CHECKPOINT, logger);

    // Verify the pipelining offset: build slot N vs submission slot N+1
    await assertProposerPipelining(blockProposedEvents, logger);

    // Verify proving still works end-to-end with pipelined proposers
    await waitForProvenCheckpoint(multiBlockCheckpoint);
  });
});
Loading
Loading