diff --git a/yarn-project/archiver/src/archiver.ts b/yarn-project/archiver/src/archiver.ts index 307a9674d505..b34c42661457 100644 --- a/yarn-project/archiver/src/archiver.ts +++ b/yarn-project/archiver/src/archiver.ts @@ -95,7 +95,6 @@ export class Archiver extends ArchiverDataSourceBase implements L2BlockSink, Tra * @param dataStore - An archiver data store for storage & retrieval of blocks, encrypted logs & contract data. * @param config - Archiver configuration options. * @param blobClient - Client for retrieving blob data. - * @param epochCache - Cache for epoch-related data. * @param dateProvider - Provider for current date/time. * @param instrumentation - Instrumentation for metrics and tracing. * @param l1Constants - L1 rollup constants. diff --git a/yarn-project/aztec-node/src/sentinel/sentinel.test.ts b/yarn-project/aztec-node/src/sentinel/sentinel.test.ts index f3b7e2f5ea3a..671de2cff77c 100644 --- a/yarn-project/aztec-node/src/sentinel/sentinel.test.ts +++ b/yarn-project/aztec-node/src/sentinel/sentinel.test.ts @@ -1,4 +1,4 @@ -import type { EpochCache } from '@aztec/epoch-cache'; +import { EpochCache } from '@aztec/epoch-cache'; import { BlockNumber, CheckpointNumber, EpochNumber, SlotNumber } from '@aztec/foundation/branded-types'; import { compactArray, times } from '@aztec/foundation/collection'; import { Secp256k1Signer } from '@aztec/foundation/crypto/secp256k1-signer'; @@ -79,7 +79,15 @@ describe('sentinel', () => { rollupManaLimit: Number.MAX_SAFE_INTEGER, }; - epochCache.getEpochAndSlotNow.mockReturnValue({ epoch, slot, ts, nowMs: ts * 1000n }); + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch, + slot, + ts, + nowMs: ts * 1000n, + }); + epochCache.getSlotNow.mockReturnValue(slot); + epochCache.getEpochNow.mockReturnValue(epoch); + epochCache.isProposerPipeliningEnabled.mockReturnValue(false); epochCache.getL1Constants.mockReturnValue(l1Constants); sentinel = new TestSentinel(epochCache, archiver, p2p, store, config, blockStream); @@ -590,6 +598,10 @@ describe('sentinel', () => { ts, nowMs: ts * 1000n, }); + epochCache.getSlotNow.mockReturnValue(slot); + epochCache.getTargetSlot.mockReturnValue(slot); + epochCache.getEpochNow.mockReturnValue(epochNumber); + epochCache.getTargetEpoch.mockReturnValue(epochNumber); archiver.getBlockHeader.calledWith(blockNumber).mockResolvedValue(mockBlock.header); archiver.getL1Constants.mockResolvedValue(l1Constants); epochCache.getL1Constants.mockReturnValue(l1Constants); diff --git a/yarn-project/aztec-node/src/sentinel/sentinel.ts b/yarn-project/aztec-node/src/sentinel/sentinel.ts index eed7d863513a..8c07e06056d3 100644 --- a/yarn-project/aztec-node/src/sentinel/sentinel.ts +++ b/yarn-project/aztec-node/src/sentinel/sentinel.ts @@ -88,7 +88,7 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme /** Loads initial slot and initializes blockstream. We will not process anything at or before the initial slot. */ protected async init() { - this.initialSlot = this.epochCache.getEpochAndSlotNow().slot; + this.initialSlot = this.epochCache.getSlotNow(); const startingBlock = BlockNumber(await this.archiver.getBlockNumber()); this.logger.info(`Starting validator sentinel with initial slot ${this.initialSlot} and block ${startingBlock}`); this.blockStream = new L2BlockStream(this.archiver, this.l2TipsStore, this, this.logger, { startingBlock }); @@ -264,7 +264,7 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme * and we don't have that data if we were offline during the period. */ public async work() { - const { slot: currentSlot } = this.epochCache.getEpochAndSlotNow(); + const currentSlot = this.epochCache.getSlotNow(); try { // Manually sync the block stream to ensure we have the latest data. // Note we never `start` the blockstream, so it loops at the same pace as we do. @@ -436,7 +436,7 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme ? fromEntries(await Promise.all(validators.map(async v => [v.toString(), await this.store.getHistory(v)]))) : await this.store.getHistories(); - const slotNow = this.epochCache.getEpochAndSlotNow().slot; + const slotNow = this.epochCache.getSlotNow(); fromSlot ??= SlotNumber(Math.max((this.lastProcessedSlot ?? slotNow) - this.store.getHistoryLength(), 0)); toSlot ??= this.lastProcessedSlot ?? slotNow; @@ -464,7 +464,7 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme return undefined; } - const slotNow = this.epochCache.getEpochAndSlotNow().slot; + const slotNow = this.epochCache.getSlotNow(); const effectiveFromSlot = fromSlot ?? SlotNumber(Math.max((this.lastProcessedSlot ?? slotNow) - this.store.getHistoryLength(), 0)); const effectiveToSlot = toSlot ?? this.lastProcessedSlot ?? slotNow; diff --git a/yarn-project/end-to-end/src/e2e_epochs/epochs_mbps.parallel.test.ts b/yarn-project/end-to-end/src/e2e_epochs/epochs_mbps.parallel.test.ts index 69aeef5670e0..0f293e34e4aa 100644 --- a/yarn-project/end-to-end/src/e2e_epochs/epochs_mbps.parallel.test.ts +++ b/yarn-project/end-to-end/src/e2e_epochs/epochs_mbps.parallel.test.ts @@ -15,6 +15,7 @@ import { CheckpointNumber, SlotNumber } from '@aztec/foundation/branded-types'; import { times, timesAsync } from '@aztec/foundation/collection'; import { SecretValue } from '@aztec/foundation/config'; import { retryUntil } from '@aztec/foundation/retry'; +import { sleep } from '@aztec/foundation/sleep'; import { bufferToHex } from '@aztec/foundation/string'; import { executeTimeout } from '@aztec/foundation/timer'; import { TestContract } from '@aztec/noir-test-contracts.js/Test'; @@ -552,11 +553,20 @@ describe('e2e_epochs/epochs_mbps', () => { }); await waitUntilL1Timestamp(test.l1Client, targetTimestamp, undefined, test.L2_SLOT_DURATION_IN_S * 3); - // Send both pre-proved txs simultaneously, waiting for them to be checkpointed. + // Send the deploy tx first and give it time to propagate to all validators, + // then send the call tx. Priority fees are a safety net, but arrival ordering + // ensures the deploy tx is in the pool before the call tx regardless of gossip timing. const timeout = test.L2_SLOT_DURATION_IN_S * 5; - logger.warn(`Sending both txs and waiting for checkpointed receipts`); + logger.warn(`Sending deploy tx first, then call tx`); + const deployTxHash = await deployTx.send({ wait: NO_WAIT }); + await sleep(1000); + const callTxHash = await callTx.send({ wait: NO_WAIT }); const [deployReceipt, callReceipt] = await executeTimeout( - () => Promise.all([deployTx.send({ wait: { timeout } }), callTx.send({ wait: { timeout } })]), + () => + Promise.all([ + waitForTx(context.aztecNode, deployTxHash, { timeout }), + waitForTx(context.aztecNode, callTxHash, { timeout }), + ]), timeout * 1000, ); logger.warn(`Both txs checkpointed`, { diff --git a/yarn-project/end-to-end/src/e2e_epochs/epochs_mbps.pipeline.parallel.test.ts b/yarn-project/end-to-end/src/e2e_epochs/epochs_mbps.pipeline.parallel.test.ts new file mode 100644 index 000000000000..6c135a0b0d63 --- /dev/null +++ b/yarn-project/end-to-end/src/e2e_epochs/epochs_mbps.pipeline.parallel.test.ts @@ -0,0 +1,254 @@ +import type { Archiver } from '@aztec/archiver'; +import type { AztecNodeService } from '@aztec/aztec-node'; +import { AztecAddress, EthAddress } from '@aztec/aztec.js/addresses'; +import { NO_WAIT } from '@aztec/aztec.js/contracts'; +import { Fr } from '@aztec/aztec.js/fields'; +import type { Logger } from '@aztec/aztec.js/log'; +import { waitForTx } from '@aztec/aztec.js/node'; +import { RollupContract } from '@aztec/ethereum/contracts'; +import type { Operator } from '@aztec/ethereum/deploy-aztec-l1-contracts'; +import { asyncMap } from '@aztec/foundation/async-map'; +import { BlockNumber, CheckpointNumber, SlotNumber } from '@aztec/foundation/branded-types'; +import { times, timesAsync } from '@aztec/foundation/collection'; +import { SecretValue } from '@aztec/foundation/config'; +import { bufferToHex } from '@aztec/foundation/string'; +import { executeTimeout } from '@aztec/foundation/timer'; +import { TestContract } from '@aztec/noir-test-contracts.js/Test'; +import type { SequencerEvents } from '@aztec/sequencer-client'; +import { getTimestampForSlot } from '@aztec/stdlib/epoch-helpers'; + +import { jest } from '@jest/globals'; +import { privateKeyToAccount } from 'viem/accounts'; + +import { type EndToEndContext, getPrivateKeyFromIndex } from '../fixtures/utils.js'; +import { TestWallet } from '../test-wallet/test_wallet.js'; +import { proveInteraction } from '../test-wallet/utils.js'; +import { EpochsTestContext } from './epochs_test.js'; + +jest.setTimeout(1000 * 60 * 20); + +const NODE_COUNT = 4; +const EXPECTED_BLOCKS_PER_CHECKPOINT = 1; + +// Send enough transactions to trigger multiple blocks within a checkpoint assuming 2 txs per block. +const TX_COUNT = 10; + +/** + * E2E tests for proposer pipelining with Multiple Blocks Per Slot (MBPS). + * Verifies that with pipelining enabled, the block proposer in slot N is the validator + * scheduled on L1 for slot N+1 (the proposer view uses a +1 slot offset). + */ +describe('e2e_epochs/epochs_mbps_pipeline', () => { + let context: EndToEndContext; + let logger: Logger; + let rollup: RollupContract; + let archiver: Archiver; + + let test: EpochsTestContext; + let validators: (Operator & { privateKey: `0x${string}` })[]; + let nodes: AztecNodeService[]; + let contract: TestContract; + let wallet: TestWallet; + let from: AztecAddress; + + /** Creates validators and sets up the test context with MBPS and proposer pipelining. */ + async function setupTest(opts: { + syncChainTip: 'proposed' | 'checkpointed'; + minTxsPerBlock?: number; + maxTxsPerBlock?: number; + }) { + const { syncChainTip = 'checkpointed', ...setupOpts } = opts; + + validators = times(NODE_COUNT, i => { + const privateKey = bufferToHex(getPrivateKeyFromIndex(i + 3)!); + const attester = EthAddress.fromString(privateKeyToAccount(privateKey).address); + return { attester, withdrawer: attester, privateKey, bn254SecretKey: new SecretValue(Fr.random().toBigInt()) }; + }); + + test = await EpochsTestContext.setup({ + numberOfAccounts: 1, + initialValidators: validators, + enableProposerPipelining: true, // <- yehaw + mockGossipSubNetwork: true, + disableAnvilTestWatcher: true, + startProverNode: true, + aztecEpochDuration: 4, + enforceTimeTable: true, + ethereumSlotDuration: 4, + aztecSlotDuration: 36, + blockDurationMs: 8000, + l1PublishingTime: 2, + attestationPropagationTime: 0.5, + aztecTargetCommitteeSize: 3, + ...setupOpts, + pxeOpts: { syncChainTip }, + }); + + ({ context, logger, rollup } = test); + wallet = context.wallet; + from = context.accounts[0]; + + logger.warn(`Stopping sequencer in initial aztec node.`); + await context.sequencer!.stop(); + + logger.warn(`Initial setup complete. Starting ${NODE_COUNT} validator nodes.`); + // Clear inherited coinbase so each validator derives coinbase from its own attester key + nodes = await asyncMap(validators, ({ privateKey }) => + test.createValidatorNode([privateKey], { dontStartSequencer: true, coinbase: undefined }), + ); + logger.warn(`Started ${NODE_COUNT} validator nodes.`, { validators: validators.map(v => v.attester.toString()) }); + + wallet.updateNode(nodes[0]); + archiver = nodes[0].getBlockSource() as Archiver; + + contract = await test.registerTestContract(wallet); + logger.warn(`Test setup completed.`, { validators: validators.map(v => v.attester.toString()) }); + } + + /** Retrieves all checkpoints from the archiver, checks that one has the target block count, and returns its number. */ + async function assertMultipleBlocksPerSlot(targetBlockCount: number, logger: Logger): Promise { + const checkpoints = await archiver.getCheckpoints(CheckpointNumber(1), 50); + logger.warn(`Retrieved ${checkpoints.length} checkpoints from archiver`, { + checkpoints: checkpoints.map(pc => pc.checkpoint.getStats()), + }); + + let expectedBlockNumber = checkpoints[0].checkpoint.blocks[0].number; + let multiBlockCheckpointNumber: CheckpointNumber | undefined; + + for (const checkpoint of checkpoints) { + const blockCount = checkpoint.checkpoint.blocks.length; + if (blockCount >= targetBlockCount && multiBlockCheckpointNumber === undefined) { + multiBlockCheckpointNumber = checkpoint.checkpoint.number; + } + logger.warn(`Checkpoint ${checkpoint.checkpoint.number} has ${blockCount} blocks`, { + checkpoint: checkpoint.checkpoint.getStats(), + }); + + for (let i = 0; i < blockCount; i++) { + const block = checkpoint.checkpoint.blocks[i]; + expect(block.indexWithinCheckpoint).toBe(i); + expect(block.checkpointNumber).toBe(checkpoint.checkpoint.number); + expect(block.number).toBe(expectedBlockNumber); + expectedBlockNumber++; + } + } + + expect(multiBlockCheckpointNumber).toBeDefined(); + return multiBlockCheckpointNumber!; + } + + /** Waits until a specific multi-block checkpoint is proven. */ + async function waitForProvenCheckpoint(targetCheckpoint: CheckpointNumber) { + const provenTimeout = test.L2_SLOT_DURATION_IN_S * test.epochDuration * 4; + logger.warn(`Waiting for checkpoint ${targetCheckpoint} to be proven (timeout=${provenTimeout}s)`); + await test.waitUntilProvenCheckpointNumber(targetCheckpoint, provenTimeout); + logger.warn(`Proven checkpoint advanced to ${test.monitor.provenCheckpointNumber}`); + } + + /** + * Asserts pipelining by comparing the build slot (from block-proposed events) against + * the submission slot (from block headers). With pipelining, the block is built in slot N + * but its header carries submission slot N+1. + */ + async function assertProposerPipelining( + blockProposedEvents: { blockNumber: BlockNumber; slot: SlotNumber; buildSlot: SlotNumber }[], + logger: Logger, + ) { + const checkpoints = await archiver.getCheckpoints(CheckpointNumber(1), 50); + const allBlocks = checkpoints.flatMap(pc => pc.checkpoint.blocks); + + logger.warn(`assertProposerPipelining: ${allBlocks.length} blocks, ${blockProposedEvents.length} events`, { + blockNumbers: allBlocks.map(b => b.number), + eventBlockNumbers: blockProposedEvents.map(e => e.blockNumber), + }); + + let foundPipelining = false; + + for (const block of allBlocks) { + const headerSlot = block.header.globalVariables.slotNumber; // submission slot (N+1) + const coinbase = block.header.globalVariables.coinbase; + + // Find the block-proposed event for this block (use Number() for safe comparison) + const event = blockProposedEvents.find(e => Number(e.blockNumber) === Number(block.number)); + // if there is no event, then it was probably block number one - which was proposed in setup + if (!event) { + continue; + } + + const buildSlot = event.buildSlot; // build slot (N) + + // Verify the pipelining offset: block built in slot N, submitted in slot N+1 + expect(Number(headerSlot)).toBe(Number(buildSlot) + 1); + foundPipelining = true; + + // Verify coinbase matches the expected proposer for the submission slot + const expectedProposer = await rollup.getProposerAt(getTimestampForSlot(headerSlot, test.constants)); + expect(coinbase).toEqual(expectedProposer); + + logger.warn(`Block ${block.number}: buildSlot=${buildSlot}, submissionSlot=${headerSlot}, coinbase=${coinbase}`, { + blockNumber: block.number, + buildSlot, + headerSlot, + coinbase: coinbase.toString(), + expectedProposer: expectedProposer.toString(), + }); + } + + expect(foundPipelining).toBe(true); + logger.warn(`Pipelining assertion passed for ${allBlocks.length} blocks`); + } + + afterEach(async () => { + jest.restoreAllMocks(); + await test?.teardown(); + }); + + it('pipelining builds blocks using slot plus 1 proposer and proves them', async () => { + await setupTest({ syncChainTip: 'checkpointed', minTxsPerBlock: 1, maxTxsPerBlock: 2 }); + + // Subscribe to block-proposed events to capture build slots + const blockProposedEvents: { blockNumber: BlockNumber; slot: SlotNumber; buildSlot: SlotNumber }[] = []; + const sequencers = nodes.map(n => n.getSequencer()!); + for (const sequencer of sequencers) { + sequencer.getSequencer().on('block-proposed', (args: Parameters[0]) => { + logger.warn(`block-proposed event: blockNumber=${args.blockNumber}, slot=${args.slot}`, args); + blockProposedEvents.push({ + blockNumber: args.blockNumber, + slot: args.slot, + buildSlot: args.buildSlot, + }); + }); + } + + const initialCheckpointNumber = await rollup.getCheckpointNumber(); + logger.warn(`Initial checkpoint number: ${initialCheckpointNumber}`); + + // Pre-prove and send transactions + const txs = await timesAsync(TX_COUNT, i => + proveInteraction(context.wallet, contract.methods.emit_nullifier(new Fr(i + 1)), { from }), + ); + const txHashes = await Promise.all(txs.map(tx => tx.send({ wait: NO_WAIT }))); + logger.warn(`Sent ${txHashes.length} transactions`, { txs: txHashes }); + + // Start the sequencers + await Promise.all(sequencers.map(s => s.start())); + logger.warn(`Started all sequencers`); + + // Wait until all txs are mined + const timeout = test.L2_SLOT_DURATION_IN_S * 5; + await executeTimeout( + () => Promise.all(txHashes.map(txHash => waitForTx(context.aztecNode, txHash, { timeout }))), + timeout * 1000, + ); + logger.warn(`All txs have been mined`); + + // Verify MBPS works with pipelining + const multiBlockCheckpoint = await assertMultipleBlocksPerSlot(EXPECTED_BLOCKS_PER_CHECKPOINT, logger); + + // Verify the pipelining offset: build slot N vs submission slot N+1 + await assertProposerPipelining(blockProposedEvents, logger); + + // Verify proving still works end-to-end with pipelined proposers + await waitForProvenCheckpoint(multiBlockCheckpoint); + }); +}); diff --git a/yarn-project/end-to-end/src/e2e_l1_publisher/e2e_l1_publisher.test.ts b/yarn-project/end-to-end/src/e2e_l1_publisher/e2e_l1_publisher.test.ts index 59a7dd5c6178..45f01b5834fc 100644 --- a/yarn-project/end-to-end/src/e2e_l1_publisher/e2e_l1_publisher.test.ts +++ b/yarn-project/end-to-end/src/e2e_l1_publisher/e2e_l1_publisher.test.ts @@ -274,6 +274,7 @@ describe('L1Publisher integration', () => { { l1ChainId: chainId, ethereumSlotDuration: config.ethereumSlotDuration, + aztecSlotDuration: config.aztecSlotDuration, }, { blobClient, @@ -606,7 +607,7 @@ describe('L1Publisher integration', () => { const checkpointAttestations = validators.map(v => makeCheckpointAttestationFromCheckpoint(checkpoint, v)); const attestations = orderAttestations(checkpointAttestations, committee!); - const canPropose = await publisher.canProposeAtNextEthBlock(new Fr(GENESIS_ARCHIVE_ROOT), proposer!); + const canPropose = await publisher.canProposeAt(new Fr(GENESIS_ARCHIVE_ROOT), proposer!); expect(canPropose?.slot).toEqual(block.header.getSlot()); await publisher.validateBlockHeader(checkpoint.header); @@ -630,7 +631,7 @@ describe('L1Publisher integration', () => { const attestations = orderAttestations(checkpointAttestations, committee!).reverse(); const attestationsAndSigners = new CommitteeAttestationsAndSigners(attestations); - const canPropose = await publisher.canProposeAtNextEthBlock(new Fr(GENESIS_ARCHIVE_ROOT), proposer!); + const canPropose = await publisher.canProposeAt(new Fr(GENESIS_ARCHIVE_ROOT), proposer!); expect(canPropose?.slot).toEqual(block.header.getSlot()); await publisher.validateBlockHeader(checkpoint.header); @@ -645,7 +646,7 @@ describe('L1Publisher integration', () => { const checkpointAttestations = validators.map(v => makeCheckpointAttestationFromCheckpoint(checkpoint, v)); const attestations = orderAttestations(checkpointAttestations, committee!); - const canPropose = await publisher.canProposeAtNextEthBlock(new Fr(GENESIS_ARCHIVE_ROOT), proposer!); + const canPropose = await publisher.canProposeAt(new Fr(GENESIS_ARCHIVE_ROOT), proposer!); expect(canPropose?.slot).toEqual(block.header.getSlot()); await publisher.validateBlockHeader(checkpoint.header); @@ -670,7 +671,7 @@ describe('L1Publisher integration', () => { const checkpointAttestations = validators.map(v => makeCheckpointAttestationFromCheckpoint(checkpoint, v)); const attestations = orderAttestations(checkpointAttestations, committee!); - const canPropose = await publisher.canProposeAtNextEthBlock(new Fr(GENESIS_ARCHIVE_ROOT), proposer!); + const canPropose = await publisher.canProposeAt(new Fr(GENESIS_ARCHIVE_ROOT), proposer!); expect(canPropose?.slot).toEqual(block.header.getSlot()); await publisher.validateBlockHeader(checkpoint.header); @@ -742,8 +743,8 @@ describe('L1Publisher integration', () => { // We cannot propose directly, we need to assume the previous checkpoint is invalidated const genesis = new Fr(GENESIS_ARCHIVE_ROOT); logger.warn(`Checking can propose at next eth block on top of genesis ${genesis}`); - expect(await publisher.canProposeAtNextEthBlock(genesis, proposer!)).toBeUndefined(); - const canPropose = await publisher.canProposeAtNextEthBlock(genesis, proposer!, { forcePendingCheckpointNumber }); + expect(await publisher.canProposeAt(genesis, proposer!)).toBeUndefined(); + const canPropose = await publisher.canProposeAt(genesis, proposer!, { forcePendingCheckpointNumber }); expect(canPropose?.slot).toEqual(block.header.getSlot()); // Same for validation diff --git a/yarn-project/end-to-end/src/e2e_synching.test.ts b/yarn-project/end-to-end/src/e2e_synching.test.ts index c93560f43064..dbf533d16e94 100644 --- a/yarn-project/end-to-end/src/e2e_synching.test.ts +++ b/yarn-project/end-to-end/src/e2e_synching.test.ts @@ -72,7 +72,11 @@ import { TestWallet } from './test-wallet/test_wallet.js'; const AZTEC_GENERATE_TEST_DATA = !!process.env.AZTEC_GENERATE_TEST_DATA; const START_TIME = 1893456000; // 2030 01 01 00 00 const RUN_THE_BIG_ONE = !!process.env.RUN_THE_BIG_ONE; -const ETHEREUM_SLOT_DURATION = getL1ContractsConfigEnvVars().ethereumSlotDuration; + +const l1ContractsEnvVars = getL1ContractsConfigEnvVars(); +const ETHEREUM_SLOT_DURATION = l1ContractsEnvVars.ethereumSlotDuration; +const AZTEC_SLOT_DURATION = l1ContractsEnvVars.aztecSlotDuration; + const MINT_AMOUNT = 1000n; enum TxComplexity { @@ -442,6 +446,7 @@ describe('e2e_synching', () => { { l1ChainId: 31337, ethereumSlotDuration: ETHEREUM_SLOT_DURATION, + aztecSlotDuration: AZTEC_SLOT_DURATION, }, { blobClient, diff --git a/yarn-project/epoch-cache/src/epoch_cache.test.ts b/yarn-project/epoch-cache/src/epoch_cache.test.ts index e1e3ffba22e1..7cc79f3aa96f 100644 --- a/yarn-project/epoch-cache/src/epoch_cache.test.ts +++ b/yarn-project/epoch-cache/src/epoch_cache.test.ts @@ -10,7 +10,7 @@ import { afterEach, beforeEach, describe, expect, it, jest } from '@jest/globals import { type MockProxy, mock } from 'jest-mock-extended'; import type { GetBlockReturnType } from 'viem'; -import { EpochCache, type EpochCommitteeInfo } from './epoch_cache.js'; +import { EpochCache, type EpochCommitteeInfo, PROPOSER_PIPELINING_SLOT_OFFSET } from './epoch_cache.js'; class TestEpochCache extends EpochCache { public seedCache(epoch: EpochNumber, committeeInfo: EpochCommitteeInfo): void { @@ -20,6 +20,10 @@ class TestEpochCache extends EpochCache { public setCacheSize(size: number): void { this.config.cacheSize = size; } + + public setProposerPipelining(enabled: boolean): void { + this.enableProposerPipelining = enabled; + } } describe('EpochCache', () => { @@ -164,9 +168,7 @@ describe('EpochCache', () => { // generate a random slot greater than `epochDuration` const targetSlot = BigInt(epochDuration) + BigInt(Math.floor(Math.random() * 1000)); - const targetEpoch = targetSlot / BigInt(epochDuration); - const epochStartSlot = targetEpoch * BigInt(epochDuration); - const epochStartTimestamp = l1GenesisTime + epochStartSlot * BigInt(slotDuration); + const slotTimestamp = l1GenesisTime + targetSlot * BigInt(slotDuration); const expectedCommittee = [EthAddress.fromString('0x000000000000000000000000000000000000BEEF')]; const expectedSeed = Buffer32.fromBigInt(999n); @@ -176,10 +178,10 @@ describe('EpochCache', () => { await epochCache.getCommittee(SlotNumber.fromBigInt(targetSlot)); expect(rollupContract.getCommitteeAt).toHaveBeenCalledTimes(1); - expect(rollupContract.getCommitteeAt).toHaveBeenCalledWith(epochStartTimestamp); + expect(rollupContract.getCommitteeAt).toHaveBeenCalledWith(slotTimestamp); expect(rollupContract.getSampleSeedAt).toHaveBeenCalledTimes(1); - expect(rollupContract.getSampleSeedAt).toHaveBeenCalledWith(epochStartTimestamp); + expect(rollupContract.getSampleSeedAt).toHaveBeenCalledWith(slotTimestamp); }); it('should cache multiple epochs', async () => { @@ -285,4 +287,84 @@ describe('EpochCache', () => { /Cannot query committee for future epoch.*with timestamp.*\(current L1 time is/, ); }); + + describe('proposer pipelining', () => { + it('getTargetSlot() returns slotNow when pipelining disabled', () => { + const initialTime = Number(l1GenesisTime) * 1000; + jest.setSystemTime(initialTime); + + expect(epochCache.isProposerPipeliningEnabled()).toBe(false); + expect(epochCache.getTargetSlot()).toEqual(epochCache.getSlotNow()); + }); + + it('getTargetSlot() returns slotNow + 1 when pipelining enabled', () => { + epochCache.setProposerPipelining(true); + const initialTime = Number(l1GenesisTime) * 1000; + jest.setSystemTime(initialTime); + + const slotNow = epochCache.getSlotNow(); + expect(epochCache.getTargetSlot()).toEqual(SlotNumber(slotNow + PROPOSER_PIPELINING_SLOT_OFFSET)); + }); + + it('getTargetEpoch() returns epoch for slotNow + 1 when pipelining enabled', () => { + epochCache.setProposerPipelining(true); + // Set time to mid-epoch 0 + const midEpochSlot = 5; + const initialTime = (Number(l1GenesisTime) + midEpochSlot * SLOT_DURATION) * 1000; + jest.setSystemTime(initialTime); + + // Target slot is midEpochSlot + 1, still within epoch 0 + expect(epochCache.getTargetEpoch()).toEqual(EpochNumber(0)); + }); + + it('getTargetEpochAndSlotInNextL1Slot() returns nextL1Slot + 1 when pipelining enabled', () => { + epochCache.setProposerPipelining(true); + const initialTime = Number(l1GenesisTime) * 1000; + jest.setSystemTime(initialTime); + + const baseResult = epochCache.getEpochAndSlotInNextL1Slot(); + const targetResult = epochCache.getTargetEpochAndSlotInNextL1Slot(); + + expect(targetResult.slot).toEqual(SlotNumber(baseResult.slot + PROPOSER_PIPELINING_SLOT_OFFSET)); + }); + + it('getTargetEpochAndSlotInNextL1Slot() handles epoch boundary', () => { + epochCache.setProposerPipelining(true); + // Set time to last slot of epoch 0 (slot EPOCH_DURATION - 1) + const lastSlot = EPOCH_DURATION - 1; + const initialTime = (Number(l1GenesisTime) + lastSlot * SLOT_DURATION) * 1000; + jest.setSystemTime(initialTime); + + const targetResult = epochCache.getTargetEpochAndSlotInNextL1Slot(); + + // The target slot should be at least EPOCH_DURATION (first slot of epoch 1) + expect(targetResult.slot).toBeGreaterThanOrEqual(EPOCH_DURATION); + expect(targetResult.epoch).toEqual(EpochNumber(1)); + }); + + it('getTargetAndNextSlot() returns same as getCurrentAndNextSlot when pipelining disabled', () => { + const initialTime = Number(l1GenesisTime) * 1000; + jest.setSystemTime(initialTime); + + expect(epochCache.isProposerPipeliningEnabled()).toBe(false); + + const { currentSlot, nextSlot: currentNext } = epochCache.getCurrentAndNextSlot(); + const { targetSlot, nextSlot: targetNext } = epochCache.getTargetAndNextSlot(); + + expect(targetSlot).toEqual(currentSlot); + expect(targetNext).toEqual(currentNext); + }); + + it('getTargetAndNextSlot() applies pipeline offset when enabled', () => { + epochCache.setProposerPipelining(true); + const initialTime = Number(l1GenesisTime) * 1000; + jest.setSystemTime(initialTime); + + const slotNow = epochCache.getSlotNow(); + const { targetSlot, nextSlot } = epochCache.getTargetAndNextSlot(); + + expect(targetSlot).toEqual(SlotNumber(slotNow + PROPOSER_PIPELINING_SLOT_OFFSET)); + expect(nextSlot).toEqual(epochCache.getTargetEpochAndSlotInNextL1Slot().slot); + }); + }); }); diff --git a/yarn-project/epoch-cache/src/epoch_cache.ts b/yarn-project/epoch-cache/src/epoch_cache.ts index e961706d815c..3ecb033c1f02 100644 --- a/yarn-project/epoch-cache/src/epoch_cache.ts +++ b/yarn-project/epoch-cache/src/epoch_cache.ts @@ -12,16 +12,19 @@ import { getSlotAtTimestamp, getSlotRangeForEpoch, getTimestampForSlot, - getTimestampRangeForEpoch, } from '@aztec/stdlib/epoch-helpers'; import { createPublicClient, encodeAbiParameters, keccak256 } from 'viem'; import { type EpochCacheConfig, getEpochCacheConfigEnvVars } from './config.js'; +/** When proposer pipelining is enabled, the proposer builds one slot ahead. */ +export const PROPOSER_PIPELINING_SLOT_OFFSET = 1; + +/** Flat return type for compound epoch/slot getters. */ export type EpochAndSlot = { - epoch: EpochNumber; slot: SlotNumber; + epoch: EpochNumber; ts: bigint; }; @@ -37,11 +40,21 @@ export type SlotTag = 'now' | 'next' | SlotNumber; export interface EpochCacheInterface { getCommittee(slot: SlotTag | undefined): Promise; + getSlotNow(): SlotNumber; + getTargetSlot(): SlotNumber; + getEpochNow(): EpochNumber; + getTargetEpoch(): EpochNumber; getEpochAndSlotNow(): EpochAndSlot & { nowMs: bigint }; - getEpochAndSlotInNextL1Slot(): EpochAndSlot & { now: bigint }; + getEpochAndSlotInNextL1Slot(): EpochAndSlot & { nowSeconds: bigint }; + /** Returns epoch/slot info for the next L1 slot with pipeline offset applied. */ + getTargetEpochAndSlotInNextL1Slot(): EpochAndSlot & { nowSeconds: bigint }; + isProposerPipeliningEnabled(): boolean; + isEscapeHatchOpen(epoch: EpochNumber): Promise; + isEscapeHatchOpenAtSlot(slot: SlotTag): Promise; getProposerIndexEncoding(epoch: EpochNumber, slot: SlotNumber, seed: bigint): `0x${string}`; computeProposerIndex(slot: SlotNumber, epoch: EpochNumber, seed: bigint, size: bigint): bigint; getCurrentAndNextSlot(): { currentSlot: SlotNumber; nextSlot: SlotNumber }; + getTargetAndNextSlot(): { targetSlot: SlotNumber; nextSlot: SlotNumber }; getProposerAttesterAddressInSlot(slot: SlotNumber): Promise; getRegisteredValidators(): Promise; isInCommittee(slot: SlotTag, validator: EthAddress): Promise; @@ -65,6 +78,8 @@ export class EpochCache implements EpochCacheInterface { private lastValidatorRefresh = 0; private readonly log: Logger = createLogger('epoch-cache'); + protected enableProposerPipelining: boolean; + constructor( private rollup: RollupContract, private readonly l1constants: L1RollupConstants & { @@ -72,10 +87,12 @@ export class EpochCache implements EpochCacheInterface { lagInEpochsForRandao: number; }, private readonly dateProvider: DateProvider = new DateProvider(), - protected readonly config = { cacheSize: 12, validatorRefreshIntervalSeconds: 60 }, + protected readonly config = { cacheSize: 12, validatorRefreshIntervalSeconds: 60, enableProposerPipelining: false }, ) { + this.enableProposerPipelining = this.config.enableProposerPipelining; this.log.debug(`Initialized EpochCache`, { l1constants, + enableProposerPipelining: this.enableProposerPipelining, }); } @@ -135,13 +152,39 @@ export class EpochCache implements EpochCacheInterface { rollupManaLimit: Number(rollupManaLimit), }; - return new EpochCache(rollup, l1RollupConstants, deps.dateProvider); + return new EpochCache(rollup, l1RollupConstants, deps.dateProvider, { + cacheSize: 12, + validatorRefreshIntervalSeconds: 60, + enableProposerPipelining: config.enableProposerPipelining, + }); } public getL1Constants(): L1RollupConstants { return this.l1constants; } + public isProposerPipeliningEnabled(): boolean { + return this.enableProposerPipelining; + } + + public getSlotNow(): SlotNumber { + return this.getEpochAndSlotNow().slot; + } + + public getTargetSlot(): SlotNumber { + const slotNow = this.getSlotNow(); + const offset = this.isProposerPipeliningEnabled() ? PROPOSER_PIPELINING_SLOT_OFFSET : 0; + return SlotNumber(slotNow + offset); + } + + public getEpochNow(): EpochNumber { + return this.getEpochAndSlotNow().epoch; + } + + public getTargetEpoch(): EpochNumber { + return getEpochAtSlot(this.getTargetSlot(), this.l1constants); + } + public getEpochAndSlotNow(): EpochAndSlot & { nowMs: bigint } { const nowMs = BigInt(this.dateProvider.now()); const nowSeconds = nowMs / 1000n; @@ -153,23 +196,33 @@ export class EpochCache implements EpochCacheInterface { } private getEpochAndSlotAtSlot(slot: SlotNumber): EpochAndSlot { - const epoch = getEpochAtSlot(slot, this.l1constants); - const ts = getTimestampRangeForEpoch(epoch, this.l1constants)[0]; - return { epoch, ts, slot }; + return this.getEpochAndSlotAtTimestamp(getTimestampForSlot(slot, this.l1constants)); } - public getEpochAndSlotInNextL1Slot(): EpochAndSlot & { now: bigint } { - const now = this.nowInSeconds(); - const nextSlotTs = now + BigInt(this.l1constants.ethereumSlotDuration); - return { ...this.getEpochAndSlotAtTimestamp(nextSlotTs), now }; + public getEpochAndSlotInNextL1Slot(): EpochAndSlot & { nowSeconds: bigint } { + const nowSeconds = this.nowInSeconds(); + const nextSlotTs = nowSeconds + BigInt(this.l1constants.ethereumSlotDuration); + return { ...this.getEpochAndSlotAtTimestamp(nextSlotTs), nowSeconds }; + } + + public getTargetEpochAndSlotInNextL1Slot(): EpochAndSlot & { nowSeconds: bigint } { + if (!this.isProposerPipeliningEnabled()) { + return this.getEpochAndSlotInNextL1Slot(); + } + + const result = this.getEpochAndSlotInNextL1Slot(); + const offset = PROPOSER_PIPELINING_SLOT_OFFSET; + const targetSlot = SlotNumber(result.slot + offset); + return { ...result, slot: targetSlot, epoch: getEpochAtSlot(targetSlot, this.l1constants) }; } private getEpochAndSlotAtTimestamp(ts: bigint): EpochAndSlot { const slot = getSlotAtTimestamp(ts, this.l1constants); + const epoch = getEpochNumberAtTimestamp(ts, this.l1constants); return { - epoch: getEpochNumberAtTimestamp(ts, this.l1constants), - ts: getTimestampForSlot(slot, this.l1constants), slot, + epoch, + ts: getTimestampForSlot(slot, this.l1constants), }; } @@ -202,7 +255,7 @@ export class EpochCache implements EpochCacheInterface { public async isEscapeHatchOpenAtSlot(slot: SlotTag = 'now'): Promise { const epoch = slot === 'now' - ? this.getEpochAndSlotNow().epoch + ? this.getEpochNow() : slot === 'next' ? this.getEpochAndSlotInNextL1Slot().epoch : getEpochAtSlot(slot, this.l1constants); @@ -237,7 +290,7 @@ export class EpochCache implements EpochCacheInterface { return epochData; } - private getEpochAndTimestamp(slot: SlotTag = 'now') { + private getEpochAndTimestamp(slot: SlotTag = 'now'): { epoch: EpochNumber; ts: bigint } { if (slot === 'now') { return this.getEpochAndSlotNow(); } else if (slot === 'next') { @@ -287,13 +340,24 @@ export class EpochCache implements EpochCacheInterface { return BigInt(keccak256(this.getProposerIndexEncoding(epoch, slot, seed))) % size; } - /** Returns the current and next L2 slot numbers. */ + /** Returns the current and next L2 slot in next eth L1 Slot. */ public getCurrentAndNextSlot(): { currentSlot: SlotNumber; nextSlot: SlotNumber } { - const current = this.getEpochAndSlotNow(); + const currentSlot = this.getSlotNow(); const next = this.getEpochAndSlotInNextL1Slot(); return { - currentSlot: current.slot, + currentSlot, + nextSlot: next.slot, + }; + } + + /** Returns the taget and next L2 slot in the next L1 slot */ + public getTargetAndNextSlot(): { targetSlot: SlotNumber; nextSlot: SlotNumber } { + const targetSlot = this.getTargetSlot(); + const next = this.getTargetEpochAndSlotInNextL1Slot(); + + return { + targetSlot, nextSlot: next.slot, }; } diff --git a/yarn-project/epoch-cache/src/test/test_epoch_cache.ts b/yarn-project/epoch-cache/src/test/test_epoch_cache.ts index ecb2b3b47c2b..b9e50a06128f 100644 --- a/yarn-project/epoch-cache/src/test/test_epoch_cache.ts +++ b/yarn-project/epoch-cache/src/test/test_epoch_cache.ts @@ -3,7 +3,13 @@ import { EthAddress } from '@aztec/foundation/eth-address'; import type { L1RollupConstants } from '@aztec/stdlib/epoch-helpers'; import { getEpochAtSlot, getSlotAtTimestamp, getTimestampRangeForEpoch } from '@aztec/stdlib/epoch-helpers'; -import type { EpochAndSlot, EpochCacheInterface, EpochCommitteeInfo, SlotTag } from '../epoch_cache.js'; +import { + type EpochAndSlot, + type EpochCacheInterface, + type EpochCommitteeInfo, + PROPOSER_PIPELINING_SLOT_OFFSET, + type SlotTag, +} from '../epoch_cache.js'; /** Default L1 constants for testing. */ const DEFAULT_L1_CONSTANTS: L1RollupConstants = { @@ -32,6 +38,7 @@ export class TestEpochCache implements EpochCacheInterface { private seed: bigint = 0n; private registeredValidators: EthAddress[] = []; private l1Constants: L1RollupConstants; + private proposerPipeliningEnabled = false; constructor(l1Constants: Partial = {}) { this.l1Constants = { ...DEFAULT_L1_CONSTANTS, ...l1Constants }; @@ -104,6 +111,10 @@ export class TestEpochCache implements EpochCacheInterface { return this.l1Constants; } + setProposerPipeliningEnabled(enabled: boolean): void { + this.proposerPipeliningEnabled = enabled; + } + getCommittee(_slot?: SlotTag): Promise { const epoch = getEpochAtSlot(this.currentSlot, this.l1Constants); return Promise.resolve({ @@ -114,19 +125,58 @@ export class TestEpochCache implements EpochCacheInterface { }); } + getSlotNow(): SlotNumber { + return this.currentSlot; + } + + getTargetSlot(): SlotNumber { + return this.proposerPipeliningEnabled + ? SlotNumber(this.currentSlot + PROPOSER_PIPELINING_SLOT_OFFSET) + : this.currentSlot; + } + + getEpochNow(): EpochNumber { + return getEpochAtSlot(this.currentSlot, this.l1Constants); + } + + getTargetEpoch(): EpochNumber { + return getEpochAtSlot(this.getTargetSlot(), this.l1Constants); + } + + isProposerPipeliningEnabled(): boolean { + return this.proposerPipeliningEnabled; + } + getEpochAndSlotNow(): EpochAndSlot & { nowMs: bigint } { - const epoch = getEpochAtSlot(this.currentSlot, this.l1Constants); - const ts = getTimestampRangeForEpoch(epoch, this.l1Constants)[0]; - return { epoch, slot: this.currentSlot, ts, nowMs: ts * 1000n }; + const epochNow = getEpochAtSlot(this.currentSlot, this.l1Constants); + const ts = getTimestampRangeForEpoch(epochNow, this.l1Constants)[0]; + return { + epoch: epochNow, + slot: this.currentSlot, + ts, + nowMs: ts * 1000n, + }; } - getEpochAndSlotInNextL1Slot(): EpochAndSlot & { now: bigint } { - const now = getTimestampRangeForEpoch(getEpochAtSlot(this.currentSlot, this.l1Constants), this.l1Constants)[0]; - const nextSlotTs = now + BigInt(this.l1Constants.ethereumSlotDuration); + getEpochAndSlotInNextL1Slot(): EpochAndSlot & { nowSeconds: bigint } { + const nowTs = getTimestampRangeForEpoch(getEpochAtSlot(this.currentSlot, this.l1Constants), this.l1Constants)[0]; + const nextSlotTs = nowTs + BigInt(this.l1Constants.ethereumSlotDuration); const nextSlot = getSlotAtTimestamp(nextSlotTs, this.l1Constants); - const epoch = getEpochAtSlot(nextSlot, this.l1Constants); - const ts = getTimestampRangeForEpoch(epoch, this.l1Constants)[0]; - return { epoch, slot: nextSlot, ts, now }; + const epochNow = getEpochAtSlot(nextSlot, this.l1Constants); + const ts = getTimestampRangeForEpoch(epochNow, this.l1Constants)[0]; + return { + epoch: epochNow, + slot: nextSlot, + ts, + nowSeconds: nowTs, + }; + } + + getTargetEpochAndSlotInNextL1Slot(): EpochAndSlot & { nowSeconds: bigint } { + const result = this.getEpochAndSlotInNextL1Slot(); + const offset = this.isProposerPipeliningEnabled() ? PROPOSER_PIPELINING_SLOT_OFFSET : 0; + const targetSlot = SlotNumber(result.slot + offset); + return { ...result, slot: targetSlot, epoch: getEpochAtSlot(targetSlot, this.l1Constants) }; } getProposerIndexEncoding(epoch: EpochNumber, slot: SlotNumber, seed: bigint): `0x${string}` { @@ -142,9 +192,22 @@ export class TestEpochCache implements EpochCacheInterface { } getCurrentAndNextSlot(): { currentSlot: SlotNumber; nextSlot: SlotNumber } { + const currentSlot = this.getSlotNow(); + const next = this.getEpochAndSlotInNextL1Slot(); + + return { + currentSlot, + nextSlot: next.slot, + }; + } + + getTargetAndNextSlot(): { targetSlot: SlotNumber; nextSlot: SlotNumber } { + const targetSlot = this.getTargetSlot(); + const next = this.getTargetEpochAndSlotInNextL1Slot(); + return { - currentSlot: this.currentSlot, - nextSlot: SlotNumber(this.currentSlot + 1), + targetSlot, + nextSlot: next.slot, }; } @@ -165,6 +228,10 @@ export class TestEpochCache implements EpochCacheInterface { return Promise.resolve(validators.filter(v => committeeSet.has(v.toString()))); } + isEscapeHatchOpen(_epoch: EpochNumber): Promise { + return Promise.resolve(this.escapeHatchOpen); + } + isEscapeHatchOpenAtSlot(_slot?: SlotTag): Promise { return Promise.resolve(this.escapeHatchOpen); } diff --git a/yarn-project/ethereum/src/contracts/rollup.ts b/yarn-project/ethereum/src/contracts/rollup.ts index d72085466cbd..96d1c553ee1b 100644 --- a/yarn-project/ethereum/src/contracts/rollup.ts +++ b/yarn-project/ethereum/src/contracts/rollup.ts @@ -778,14 +778,15 @@ export class RollupContract { * timestamp of the next L1 block * @throws otherwise */ - public async canProposeAtNextEthBlock( + public async canProposeAt( archive: Buffer, account: `0x${string}` | Account, - slotDuration: number, + slotDuration: bigint, + slotOffset: bigint, opts: { forcePendingCheckpointNumber?: CheckpointNumber } = {}, ): Promise<{ slot: SlotNumber; checkpointNumber: CheckpointNumber; timeOfNextL1Slot: bigint }> { const latestBlock = await this.client.getBlock(); - const timeOfNextL1Slot = latestBlock.timestamp + BigInt(slotDuration); + const timeOfNextL1Slot = latestBlock.timestamp + slotDuration + slotOffset; const who = typeof account === 'string' ? account : account.address; try { diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 159eba8cfb4a..9cc98c492272 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -205,6 +205,7 @@ export type EnvVar = | 'SENTINEL_ENABLED' | 'SENTINEL_HISTORY_LENGTH_IN_EPOCHS' | 'SENTINEL_HISTORIC_PROVEN_PERFORMANCE_LENGTH_IN_EPOCHS' + | 'SEQ_ENABLE_PROPOSER_PIPELINING' | 'SEQ_MAX_TX_PER_BLOCK' | 'SEQ_MAX_TX_PER_CHECKPOINT' | 'SEQ_MIN_TX_PER_BLOCK' @@ -219,7 +220,6 @@ export type EnvVar = | 'SEQ_PUBLISHER_ALLOW_INVALID_STATES' | 'SEQ_PUBLISHER_FORWARDER_ADDRESS' | 'SEQ_POLLING_INTERVAL_MS' - | 'SEQ_ENABLE_PROPOSER_PIPELINING' | 'SEQ_ENFORCE_TIME_TABLE' | 'SEQ_L1_PUBLISHING_TIME_ALLOWANCE_IN_SLOT' | 'SEQ_ATTESTATION_PROPAGATION_TIME' diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 98ff2f85d7fa..a72485990008 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -49,6 +49,7 @@ describe('P2P Client', () => { epochCache = mock(); epochCache.getCurrentAndNextSlot.mockReturnValue({ currentSlot: SlotNumber(0), nextSlot: SlotNumber(1) }); + epochCache.getTargetAndNextSlot.mockReturnValue({ targetSlot: SlotNumber(0), nextSlot: SlotNumber(1) }); attestationPool = await createTestAttestationPool(); diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index b9a49ac95ea8..7d0ba924746c 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -696,12 +696,14 @@ export class P2PClient extends WithTracer implements P2P { /** Checks if the slot has changed and calls prepareForSlot if so. */ private async maybeCallPrepareForSlot(): Promise { - const { currentSlot } = this.epochCache.getCurrentAndNextSlot(); - if (currentSlot <= this.lastSlotProcessed) { + // If we have a pending checkpoint available, we want to prepare the target slot - otherwise we prepare the current slot + // Knowledege of pending checkpoints is in the PR above + const { targetSlot } = this.epochCache.getTargetAndNextSlot(); + if (targetSlot <= this.lastSlotProcessed) { return; } - this.lastSlotProcessed = currentSlot; - await this.txPool.prepareForSlot(currentSlot); + this.lastSlotProcessed = targetSlot; + await this.txPool.prepareForSlot(targetSlot); } private async startServiceIfSynched() { diff --git a/yarn-project/p2p/src/client/test/p2p_client.integration_reqresp.test.ts b/yarn-project/p2p/src/client/test/p2p_client.integration_reqresp.test.ts index 78fc5ac35473..c6454f17a1d2 100644 --- a/yarn-project/p2p/src/client/test/p2p_client.integration_reqresp.test.ts +++ b/yarn-project/p2p/src/client/test/p2p_client.integration_reqresp.test.ts @@ -46,7 +46,9 @@ describe('p2p client integration reqresp', () => { logger = createLogger('p2p:test:integration-reqresp'); p2pBaseConfig = { ...emptyChainConfig, ...getP2PDefaultConfig() }; - epochCache.getEpochAndSlotInNextL1Slot.mockReturnValue({ ts: BigInt(0) } as EpochAndSlot & { now: bigint }); + epochCache.getEpochAndSlotInNextL1Slot.mockReturnValue({ ts: BigInt(0) } as EpochAndSlot & { + nowSeconds: bigint; + }); epochCache.getRegisteredValidators.mockResolvedValue([]); epochCache.getL1Constants.mockReturnValue({ l1StartBlock: 0n, diff --git a/yarn-project/p2p/src/client/test/tx_proposal_collector/proposal_tx_collector_worker.ts b/yarn-project/p2p/src/client/test/tx_proposal_collector/proposal_tx_collector_worker.ts index c756de610980..44ad0f6eccc1 100644 --- a/yarn-project/p2p/src/client/test/tx_proposal_collector/proposal_tx_collector_worker.ts +++ b/yarn-project/p2p/src/client/test/tx_proposal_collector/proposal_tx_collector_worker.ts @@ -1,4 +1,5 @@ import { MockL2BlockSource } from '@aztec/archiver/test'; +import type { EpochCache } from '@aztec/epoch-cache'; import { SecretValue } from '@aztec/foundation/config'; import { createLogger } from '@aztec/foundation/log'; import { sleep } from '@aztec/foundation/sleep'; @@ -14,6 +15,7 @@ import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-clien import type { PeerId } from '@libp2p/interface'; import { peerIdFromString } from '@libp2p/peer-id'; +import { mock } from 'jest-mock-extended'; import type { P2PConfig } from '../../../config.js'; import { BatchTxRequesterCollector, SendBatchRequestCollector } from '../../../services/index.js'; @@ -27,7 +29,6 @@ import { InMemoryTxPool, UNLIMITED_RATE_LIMIT_QUOTA, calculateInternalTimeout, - createMockEpochCache, createMockWorldStateSynchronizer, } from '../../../test-helpers/index.js'; import { createP2PClient } from '../../index.js'; @@ -98,7 +99,7 @@ function sendMessage(message: WorkerResponse): Promise { async function startClient(config: P2PConfig, clientIndex: number) { txPool = new InMemoryTxPool(); attestationPool = new InMemoryAttestationPool(); - const epochCache = createMockEpochCache(); + const epochCache = mock(); const worldState = createMockWorldStateSynchronizer(); const l2BlockSource = new MockL2BlockSource(); const proofVerifier = new AlwaysTrueCircuitVerifier(); diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts index cee420a796b4..41d1640ad42b 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts @@ -160,10 +160,10 @@ export interface TxPoolV2 extends TypedEventEmitter { handleMinedBlock(block: L2Block): Promise; /** - * Prepares the pool for a new slot. - * Unprotects transactions from earlier slots and validates them before - * returning to pending state. - * @param slotNumber - The slot number to prepare for + * Prepares the pool for a new slot by unprotecting transactions from earlier + * slots and re-validating them before returning to pending state. + * @param slotNumber - The pipeline slot we are building for (i.e. the slot + * the resulting blocks will target on L1). */ prepareForSlot(slotNumber: SlotNumber): Promise; diff --git a/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.test.ts b/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.test.ts index 426ede996e09..f69b53930c38 100644 --- a/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.test.ts +++ b/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.test.ts @@ -1,30 +1,29 @@ -import type { EpochCache } from '@aztec/epoch-cache'; +import type { EpochCacheInterface } from '@aztec/epoch-cache'; import { NoCommitteeError } from '@aztec/ethereum/contracts'; -import { SlotNumber } from '@aztec/foundation/branded-types'; +import { EpochNumber, SlotNumber } from '@aztec/foundation/branded-types'; import { Secp256k1Signer } from '@aztec/foundation/crypto/secp256k1-signer'; import { PeerErrorSeverity } from '@aztec/stdlib/p2p'; import { CheckpointHeader } from '@aztec/stdlib/rollup'; import { makeCheckpointAttestation } from '@aztec/stdlib/testing'; -import { mock } from 'jest-mock-extended'; +import { type MockProxy, mock } from 'jest-mock-extended'; import { CheckpointAttestationValidator } from './attestation_validator.js'; describe('CheckpointAttestationValidator', () => { - let epochCache: EpochCache; + let epochCache: MockProxy; let validator: CheckpointAttestationValidator; let proposer: Secp256k1Signer; let attester: Secp256k1Signer; beforeEach(() => { - epochCache = mock(); + epochCache = mock(); validator = new CheckpointAttestationValidator(epochCache); proposer = Secp256k1Signer.random(); attester = Secp256k1Signer.random(); }); it('returns high tolerance error if slot number is not current or next slot (outside clock tolerance)', async () => { - // Create an attestation for slot 97 (previous slot) const header = CheckpointHeader.random({ slotNumber: SlotNumber(97) }); const mockAttestation = makeCheckpointAttestation({ header, @@ -32,26 +31,24 @@ describe('CheckpointAttestationValidator', () => { proposerSigner: proposer, }); - // Mock epoch cache to return different slot numbers - (epochCache.getCurrentAndNextSlot as jest.Mock).mockReturnValue({ - currentSlot: SlotNumber(98), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(98), nextSlot: SlotNumber(99), }); - // Mock getEpochAndSlotNow to return time OUTSIDE clock tolerance (1000ms elapsed) - (epochCache.getEpochAndSlotNow as jest.Mock).mockReturnValue({ - epoch: 1, + epochCache.getTargetSlot.mockReturnValue(SlotNumber(98)); + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: EpochNumber(1), slot: SlotNumber(98), - ts: 1000n, // slot started at 1000 seconds + ts: 1000n, nowMs: 1001000n, // 1000ms elapsed, outside 500ms tolerance }); - (epochCache.isInCommittee as jest.Mock).mockResolvedValue(true); + epochCache.isInCommittee.mockResolvedValue(true); const result = await validator.validate(mockAttestation); expect(result).toEqual({ result: 'reject', severity: PeerErrorSeverity.HighToleranceError }); }); it('returns ignore if previous slot attestation is within clock tolerance', async () => { - // Create an attestation for slot 97 (previous slot) const header = CheckpointHeader.random({ slotNumber: SlotNumber(97) }); const mockAttestation = makeCheckpointAttestation({ header, @@ -59,27 +56,25 @@ describe('CheckpointAttestationValidator', () => { proposerSigner: proposer, }); - // Mock epoch cache - attestation is for previous slot (97) when current is 98 - (epochCache.getCurrentAndNextSlot as jest.Mock).mockReturnValue({ - currentSlot: SlotNumber(98), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(98), nextSlot: SlotNumber(99), }); - // Mock getEpochAndSlotNow to return time WITHIN clock tolerance (100ms elapsed) - (epochCache.getEpochAndSlotNow as jest.Mock).mockReturnValue({ - epoch: 1, + epochCache.getTargetSlot.mockReturnValue(SlotNumber(98)); + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: EpochNumber(1), slot: SlotNumber(98), - ts: 1000n, // slot started at 1000 seconds + ts: 1000n, nowMs: 1000100n, // 100ms elapsed, within 500ms tolerance }); - (epochCache.isInCommittee as jest.Mock).mockResolvedValue(true); - (epochCache.getProposerAttesterAddressInSlot as jest.Mock).mockResolvedValue(proposer.address); + epochCache.isInCommittee.mockResolvedValue(true); + epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(proposer.address); const result = await validator.validate(mockAttestation); expect(result).toEqual({ result: 'ignore' }); }); it('returns high tolerance error if attester is not in committee', async () => { - // The slot is correct, but the attester is not in the committee const header = CheckpointHeader.random({ slotNumber: SlotNumber(100) }); const mockAttestation = makeCheckpointAttestation({ header, @@ -87,19 +82,17 @@ describe('CheckpointAttestationValidator', () => { proposerSigner: proposer, }); - // Mock epoch cache to return matching slot number but invalid committee membership - (epochCache.getCurrentAndNextSlot as jest.Mock).mockReturnValue({ - currentSlot: SlotNumber(100), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(100), nextSlot: SlotNumber(101), }); - (epochCache.isInCommittee as jest.Mock).mockResolvedValue(false); + epochCache.isInCommittee.mockResolvedValue(false); const result = await validator.validate(mockAttestation); expect(result).toEqual({ result: 'reject', severity: PeerErrorSeverity.HighToleranceError }); }); - it('returns undefined if checkpoint attestation is valid (current slot)', async () => { - // Create an attestation for slot 100 + it('returns accept if checkpoint attestation is valid (current slot)', async () => { const header = CheckpointHeader.random({ slotNumber: SlotNumber(100) }); const mockAttestation = makeCheckpointAttestation({ header, @@ -107,20 +100,18 @@ describe('CheckpointAttestationValidator', () => { proposerSigner: proposer, }); - // Mock epoch cache for valid case with current slot - (epochCache.getCurrentAndNextSlot as jest.Mock).mockReturnValue({ - currentSlot: SlotNumber(100), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(100), nextSlot: SlotNumber(101), }); - (epochCache.isInCommittee as jest.Mock).mockResolvedValue(true); - (epochCache.getProposerAttesterAddressInSlot as jest.Mock).mockResolvedValue(proposer.address); + epochCache.isInCommittee.mockResolvedValue(true); + epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(proposer.address); const result = await validator.validate(mockAttestation); expect(result).toEqual({ result: 'accept' }); }); - it('returns undefined if checkpoint attestation is valid (next slot)', async () => { - // Setup attestation for next slot + it('returns accept if checkpoint attestation is valid (next slot)', async () => { const header = CheckpointHeader.random({ slotNumber: SlotNumber(101) }); const mockAttestation = makeCheckpointAttestation({ header, @@ -128,13 +119,12 @@ describe('CheckpointAttestationValidator', () => { proposerSigner: proposer, }); - // Mock epoch cache for valid case with next slot - (epochCache.getCurrentAndNextSlot as jest.Mock).mockReturnValue({ - currentSlot: SlotNumber(100), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(100), nextSlot: SlotNumber(101), }); - (epochCache.isInCommittee as jest.Mock).mockResolvedValue(true); - (epochCache.getProposerAttesterAddressInSlot as jest.Mock).mockResolvedValue(proposer.address); + epochCache.isInCommittee.mockResolvedValue(true); + epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(proposer.address); const result = await validator.validate(mockAttestation); expect(result).toEqual({ result: 'accept' }); @@ -149,19 +139,17 @@ describe('CheckpointAttestationValidator', () => { proposerSigner: wrongProposer, }); - // Mock epoch cache with different proposer - (epochCache.getCurrentAndNextSlot as jest.Mock).mockReturnValue({ - currentSlot: SlotNumber(100), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(100), nextSlot: SlotNumber(101), }); - (epochCache.isInCommittee as jest.Mock).mockResolvedValue(true); + epochCache.isInCommittee.mockResolvedValue(true); const result = await validator.validate(mockAttestation); expect(result).toEqual({ result: 'reject', severity: PeerErrorSeverity.HighToleranceError }); }); it('returns low tolerance error if no committee exists', async () => { - // Create an attestation const header = CheckpointHeader.random({ slotNumber: SlotNumber(100) }); const mockAttestation = makeCheckpointAttestation({ header, @@ -169,13 +157,12 @@ describe('CheckpointAttestationValidator', () => { proposerSigner: proposer, }); - // Mock epoch cache to throw NoCommitteeError - (epochCache.getCurrentAndNextSlot as jest.Mock).mockReturnValue({ - currentSlot: SlotNumber(100), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(100), nextSlot: SlotNumber(101), }); - (epochCache.isInCommittee as jest.Mock).mockReturnValue(true); - (epochCache.getProposerAttesterAddressInSlot as jest.Mock).mockRejectedValue(new NoCommitteeError()); + epochCache.isInCommittee.mockResolvedValue(true); + epochCache.getProposerAttesterAddressInSlot.mockRejectedValue(new NoCommitteeError()); const result = await validator.validate(mockAttestation); expect(result).toEqual({ result: 'reject', severity: PeerErrorSeverity.LowToleranceError }); diff --git a/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.ts b/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.ts index c5b05276deb2..0f6fbcab8b94 100644 --- a/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.ts +++ b/yarn-project/p2p/src/msg_validators/attestation_validator/attestation_validator.ts @@ -23,13 +23,14 @@ export class CheckpointAttestationValidator implements P2PValidator { - let epochCache: MockProxy; + let epochCache: MockProxy; let attestationPool: MockProxy; let validator: FishermanAttestationValidator; let proposer: Secp256k1Signer; let attester: Secp256k1Signer; beforeEach(() => { - epochCache = mock(); + epochCache = mock(); attestationPool = mock(); validator = new FishermanAttestationValidator(epochCache, attestationPool, getTelemetryClient()); proposer = Secp256k1Signer.random(); @@ -34,7 +34,6 @@ describe('FishermanAttestationValidator', () => { describe('base validation', () => { it('returns high tolerance error if slot number is not current or next slot (outside clock tolerance)', async () => { - // Create an attestation for slot 97 const header = CheckpointHeader.random({ slotNumber: SlotNumber(97) }); const mockAttestation = makeCheckpointAttestation({ header, @@ -42,16 +41,15 @@ describe('FishermanAttestationValidator', () => { proposerSigner: proposer, }); - // Mock epoch cache to return different slot numbers - epochCache.getCurrentAndNextSlot.mockReturnValue({ - currentSlot: SlotNumber(98), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(98), nextSlot: SlotNumber(99), }); - // Mock getEpochAndSlotNow to return time OUTSIDE clock tolerance (1000ms elapsed) + epochCache.getTargetSlot.mockReturnValue(SlotNumber(98)); epochCache.getEpochAndSlotNow.mockReturnValue({ - epoch: 1 as any, + epoch: EpochNumber(1), slot: SlotNumber(98), - ts: 1000n, // slot started at 1000 seconds + ts: 1000n, nowMs: 1001000n, // 1000ms elapsed, outside 500ms tolerance }); epochCache.isInCommittee.mockResolvedValue(true); @@ -72,8 +70,8 @@ describe('FishermanAttestationValidator', () => { proposerSigner: proposer, }); - epochCache.getCurrentAndNextSlot.mockReturnValue({ - currentSlot: SlotNumber(100), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(100), nextSlot: SlotNumber(101), }); epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(proposer.address); @@ -94,8 +92,8 @@ describe('FishermanAttestationValidator', () => { proposerSigner: wrongProposer, }); - epochCache.getCurrentAndNextSlot.mockReturnValue({ - currentSlot: SlotNumber(100), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(100), nextSlot: SlotNumber(101), }); epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(proposer.address); @@ -111,16 +109,15 @@ describe('FishermanAttestationValidator', () => { describe('fisherman payload validation', () => { beforeEach(() => { - // Setup valid base validation for all fisherman tests - epochCache.getCurrentAndNextSlot.mockReturnValue({ - currentSlot: SlotNumber(100), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(100), nextSlot: SlotNumber(101), }); epochCache.isInCommittee.mockResolvedValue(true); epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(proposer.address); }); - it('returns undefined if attestation payload matches proposal payload', async () => { + it('returns accept if attestation payload matches proposal payload', async () => { const checkpointHeader = makeCheckpointHeader(1, { slotNumber: SlotNumber(100) }); const blockHeader = makeBlockHeader(1); const archive = Fr.random(); @@ -131,7 +128,6 @@ describe('FishermanAttestationValidator', () => { archive, }); - // Create a matching checkpoint proposal with the same payload const mockProposal = await makeCheckpointProposal({ checkpointHeader, signer: proposer, @@ -144,13 +140,12 @@ describe('FishermanAttestationValidator', () => { const result = await validator.validate(mockAttestation); expect(result).toEqual({ result: 'accept' }); - // Should have checked the proposal expect(attestationPool.getCheckpointProposal).toHaveBeenCalledWith(mockAttestation.archive.toString()); }); it('returns low tolerance error if attestation payload does not match proposal payload', async () => { const checkpointHeader1 = makeCheckpointHeader(1, { slotNumber: SlotNumber(100) }); - const checkpointHeader2 = makeCheckpointHeader(2, { slotNumber: SlotNumber(100) }); // Different seed = different header + const checkpointHeader2 = makeCheckpointHeader(2, { slotNumber: SlotNumber(100) }); const blockHeader2 = makeBlockHeader(2); const mockAttestation = makeCheckpointAttestation({ @@ -159,7 +154,6 @@ describe('FishermanAttestationValidator', () => { proposerSigner: proposer, }); - // Create a proposal with a different payload const mockProposal = await makeCheckpointProposal({ checkpointHeader: checkpointHeader2, signer: proposer, @@ -171,11 +165,10 @@ describe('FishermanAttestationValidator', () => { const result = await validator.validate(mockAttestation); expect(result).toEqual({ result: 'reject', severity: PeerErrorSeverity.LowToleranceError }); - // Should have checked the proposal expect(attestationPool.getCheckpointProposal).toHaveBeenCalledWith(mockAttestation.archive.toString()); }); - it('returns undefined if proposal is not found yet (attestation arrived before proposal)', async () => { + it('returns accept if proposal is not found yet (attestation arrived before proposal)', async () => { const checkpointHeader = makeCheckpointHeader(1, { slotNumber: SlotNumber(100) }); const mockAttestation = makeCheckpointAttestation({ header: checkpointHeader, @@ -183,13 +176,11 @@ describe('FishermanAttestationValidator', () => { proposerSigner: proposer, }); - // Proposal not found in pool yet attestationPool.getCheckpointProposal.mockResolvedValue(undefined); const result = await validator.validate(mockAttestation); expect(result).toEqual({ result: 'accept' }); - // Should have tried to check the proposal expect(attestationPool.getCheckpointProposal).toHaveBeenCalledWith(mockAttestation.archive.toString()); }); @@ -202,11 +193,10 @@ describe('FishermanAttestationValidator', () => { proposerSigner: proposer, }); - // Create a proposal with the same header but different archive const mockProposal = await makeCheckpointProposal({ checkpointHeader, signer: proposer, - archiveRoot: Fr.random(), // Different archive + archiveRoot: Fr.random(), lastBlock: { blockHeader }, }); @@ -218,7 +208,7 @@ describe('FishermanAttestationValidator', () => { it('detects payload mismatch with different header hash', async () => { const checkpointHeader1 = makeCheckpointHeader(1, { slotNumber: SlotNumber(100) }); - const checkpointHeader2 = makeCheckpointHeader(2, { slotNumber: SlotNumber(100) }); // Same slot but different content + const checkpointHeader2 = makeCheckpointHeader(2, { slotNumber: SlotNumber(100) }); const blockHeader2 = makeBlockHeader(2); const mockAttestation = makeCheckpointAttestation({ @@ -227,7 +217,6 @@ describe('FishermanAttestationValidator', () => { proposerSigner: proposer, }); - // Create a proposal with a different header (different hash) const mockProposal = await makeCheckpointProposal({ checkpointHeader: checkpointHeader2, signer: proposer, @@ -236,7 +225,6 @@ describe('FishermanAttestationValidator', () => { attestationPool.getCheckpointProposal.mockResolvedValue(mockProposal); - // Headers are different, so payloads should be different const result = await validator.validate(mockAttestation); expect(result).toEqual({ result: 'reject', severity: PeerErrorSeverity.LowToleranceError }); }); @@ -244,9 +232,8 @@ describe('FishermanAttestationValidator', () => { describe('edge cases', () => { beforeEach(() => { - // Setup valid base validation - epochCache.getCurrentAndNextSlot.mockReturnValue({ - currentSlot: SlotNumber(100), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(100), nextSlot: SlotNumber(101), }); epochCache.isInCommittee.mockResolvedValue(true); @@ -261,7 +248,6 @@ describe('FishermanAttestationValidator', () => { proposerSigner: proposer, }); - // Simulate pool throwing an error attestationPool.getCheckpointProposal.mockRejectedValue(new Error('Pool error')); await expect(validator.validate(mockAttestation)).rejects.toThrow('Pool error'); diff --git a/yarn-project/p2p/src/msg_validators/clock_tolerance.test.ts b/yarn-project/p2p/src/msg_validators/clock_tolerance.test.ts index a2a172a7b040..cfdbf4bcd2bb 100644 --- a/yarn-project/p2p/src/msg_validators/clock_tolerance.test.ts +++ b/yarn-project/p2p/src/msg_validators/clock_tolerance.test.ts @@ -17,6 +17,8 @@ describe('clock_tolerance', () => { beforeEach(() => { epochCache = mock(); + // Default getTargetSlot to return SlotNumber(100) - tests override as needed + epochCache.getTargetSlot.mockReturnValue(SlotNumber(100)); }); it('returns true for previous slot message within tolerance window (100ms elapsed)', () => { @@ -182,5 +184,24 @@ describe('clock_tolerance', () => { expect(isWithinClockTolerance(messageSlot, currentSlot, epochCache)).toBe(true); }); + + it('returns false when getTargetSlot() does not match currentSlot argument (sanity check)', () => { + const currentSlot = SlotNumber(100); + const messageSlot = SlotNumber(99); // previous slot + + // Simulate a race: caller read target slot as 100, but epoch cache now returns 101 + // (e.g., pipelining was enabled between the two reads) + epochCache.getTargetSlot.mockReturnValue(SlotNumber(101)); + + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: 1 as any, + slot: currentSlot, + ts: 1000n, + nowMs: 1000000n, // 0ms elapsed, within tolerance + }); + + // Even though timing is within tolerance, the sanity check fails + expect(isWithinClockTolerance(messageSlot, currentSlot, epochCache)).toBe(false); + }); }); }); diff --git a/yarn-project/p2p/src/msg_validators/clock_tolerance.ts b/yarn-project/p2p/src/msg_validators/clock_tolerance.ts index 89282176469d..dc00e9e6ce2b 100644 --- a/yarn-project/p2p/src/msg_validators/clock_tolerance.ts +++ b/yarn-project/p2p/src/msg_validators/clock_tolerance.ts @@ -36,10 +36,11 @@ export function isWithinClockTolerance( } // Check how far we are into the current slot (in milliseconds) - const { ts: slotStartTs, nowMs, slot } = epochCache.getEpochAndSlotNow(); + const { ts: slotStartTs, nowMs } = epochCache.getEpochAndSlotNow(); + const targetSlot = epochCache.getTargetSlot(); - // Sanity check: ensure the epoch cache's current slot matches the expected current slot - if (slot !== currentSlot) { + // Sanity check: ensure the epoch cache's target slot matches the expected current slot + if (targetSlot !== currentSlot) { return false; } diff --git a/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.test.ts b/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.test.ts index 8df14cd951f0..4210645babbe 100644 --- a/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.test.ts +++ b/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.test.ts @@ -1,6 +1,6 @@ import type { EpochCacheInterface } from '@aztec/epoch-cache'; import { NoCommitteeError } from '@aztec/ethereum/contracts'; -import { SlotNumber } from '@aztec/foundation/branded-types'; +import { EpochNumber, SlotNumber } from '@aztec/foundation/branded-types'; import { Secp256k1Signer } from '@aztec/foundation/crypto/secp256k1-signer'; import { EthAddress } from '@aztec/foundation/eth-address'; import { PeerErrorSeverity } from '@aztec/stdlib/p2p'; @@ -42,7 +42,17 @@ describe('ProposalValidator', () => { beforeEach(() => { epochCache = mock(); validator = new ProposalValidator(epochCache, { txsPermitted: true, maxTxsPerBlock: undefined }, 'test'); - epochCache.getCurrentAndNextSlot.mockReturnValue({ currentSlot, nextSlot }); + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: EpochNumber(1), + slot: currentSlot, + ts: 0n, + nowMs: 0n, + }); + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: currentSlot, + nextSlot, + }); + epochCache.getTargetSlot.mockReturnValue(currentSlot); }); describe.each([ @@ -61,7 +71,7 @@ describe('ProposalValidator', () => { const proposal = await factory(previousSlot, Secp256k1Signer.random()); epochCache.getEpochAndSlotNow.mockReturnValue({ - epoch: 1 as any, + epoch: EpochNumber(1), slot: currentSlot, ts: 1000n, nowMs: 1001000n, // 1000ms elapsed, outside 500ms tolerance @@ -78,7 +88,7 @@ describe('ProposalValidator', () => { const proposal = await factory(previousSlot, signer); epochCache.getEpochAndSlotNow.mockReturnValue({ - epoch: 1 as any, + epoch: EpochNumber(1), slot: currentSlot, ts: 1000n, nowMs: 1000100n, // 100ms elapsed, within 500ms tolerance diff --git a/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.ts b/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.ts index 45c38dd61529..0f2c5d47c5bf 100644 --- a/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.ts +++ b/yarn-project/p2p/src/msg_validators/proposal_validator/proposal_validator.ts @@ -31,13 +31,14 @@ export class ProposalValidator { /** Validates header-level fields: slot, signature, and proposer. */ public async validate(proposal: BlockProposal | CheckpointProposalCore): Promise { try { - // Slot check - const { currentSlot, nextSlot } = this.epochCache.getCurrentAndNextSlot(); + // Slot check: use target slots since proposals target pipeline slots (slot + 1 when pipelining) + const { targetSlot, nextSlot } = this.epochCache.getTargetAndNextSlot(); + const slotNumber = proposal.slotNumber; - if (slotNumber !== currentSlot && slotNumber !== nextSlot) { + if (slotNumber !== targetSlot && slotNumber !== nextSlot) { // Check if message is for previous slot and within clock tolerance - if (!isWithinClockTolerance(slotNumber, currentSlot, this.epochCache)) { - this.logger.warn(`Penalizing peer for invalid slot number ${slotNumber}`, { currentSlot, nextSlot }); + if (!isWithinClockTolerance(slotNumber, targetSlot, this.epochCache)) { + this.logger.warn(`Penalizing peer for invalid slot number ${slotNumber}`, { targetSlot, nextSlot }); return { result: 'reject', severity: PeerErrorSeverity.HighToleranceError }; } this.logger.verbose(`Ignoring proposal for previous slot ${slotNumber} within clock tolerance`); diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts index a0e812112a30..f3330dd612b7 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts @@ -568,7 +568,7 @@ describe('LibP2PService', () => { let blockReceivedCallback: jest.Mock; let duplicateProposalCallback: jest.Mock; - const currentSlot = SlotNumber(100); + const targetSlot = SlotNumber(100); const nextSlot = SlotNumber(101); beforeEach(() => { @@ -578,14 +578,9 @@ describe('LibP2PService', () => { mockTxPool.protectTxs.mockResolvedValue([]); mockEpochCache = mock(); - mockEpochCache.getCurrentAndNextSlot.mockReturnValue({ currentSlot, nextSlot }); mockEpochCache.getProposerAttesterAddressInSlot.mockResolvedValue(signer.address); - mockEpochCache.getEpochAndSlotNow.mockReturnValue({ - epoch: 1 as any, - slot: currentSlot, - ts: 1000n, - nowMs: 1000100n, // 100ms elapsed, within tolerance - }); + mockEpochCache.getTargetAndNextSlot.mockReturnValue({ targetSlot: targetSlot, nextSlot }); + mockEpochCache.getTargetSlot.mockReturnValue(targetSlot); mockPeerManager = mock(); reportMessageValidationResultSpy = jest.fn(); @@ -605,7 +600,7 @@ describe('LibP2PService', () => { }); it('processes valid block: invokes callback and marks txs non-evictable', async () => { - const header = makeBlockHeader(1, { slotNumber: currentSlot }); + const header = makeBlockHeader(1, { slotNumber: targetSlot }); const proposal = await makeBlockProposal({ signer, blockHeader: header }); await service.processBlockFromPeer(proposal.toBuffer(), 'msg-1', mockPeerId); @@ -626,7 +621,7 @@ describe('LibP2PService', () => { }); it('equivocated block: re-broadcasts but does NOT process', async () => { - const header = makeBlockHeader(1, { slotNumber: currentSlot }); + const header = makeBlockHeader(1, { slotNumber: targetSlot }); const indexWithinCheckpoint = IndexWithinCheckpoint(0); // First proposal - should be processed normally @@ -660,14 +655,14 @@ describe('LibP2PService', () => { // Verify duplicate callback was invoked expect(duplicateProposalCallback).toHaveBeenCalledWith({ - slot: currentSlot, + slot: targetSlot, proposer: signer.address, type: 'block', }); }); it('duplicate exact block: returns Ignore, no processing', async () => { - const header = makeBlockHeader(1, { slotNumber: currentSlot }); + const header = makeBlockHeader(1, { slotNumber: targetSlot }); const proposal = await makeBlockProposal({ signer, blockHeader: header }); // First submission @@ -693,7 +688,7 @@ describe('LibP2PService', () => { }); it('cap exceeded: penalizes peer and rejects', async () => { - const header = makeBlockHeader(1, { slotNumber: currentSlot }); + const header = makeBlockHeader(1, { slotNumber: targetSlot }); const indexWithinCheckpoint = IndexWithinCheckpoint(0); // Add MAX_BLOCK_PROPOSALS_PER_POSITION proposals @@ -741,7 +736,7 @@ describe('LibP2PService', () => { }); it('duplicateProposalCallback invoked exactly once per equivocation event', async () => { - const header = makeBlockHeader(1, { slotNumber: currentSlot }); + const header = makeBlockHeader(1, { slotNumber: targetSlot }); const indexWithinCheckpoint = IndexWithinCheckpoint(0); // First proposal - callback NOT invoked @@ -764,7 +759,7 @@ describe('LibP2PService', () => { await service.processBlockFromPeer(proposal2.toBuffer(), 'msg-2', mockPeerId); expect(duplicateProposalCallback).toHaveBeenCalledTimes(1); expect(duplicateProposalCallback).toHaveBeenCalledWith({ - slot: currentSlot, + slot: targetSlot, proposer: signer.address, type: 'block', }); @@ -783,7 +778,7 @@ describe('LibP2PService', () => { }); it('validation failure penalizes peer with correct severity', async () => { - const header = makeBlockHeader(1, { slotNumber: currentSlot }); + const header = makeBlockHeader(1, { slotNumber: targetSlot }); // Create block signed by wrong signer const wrongSigner = Secp256k1Signer.random(); const proposal = await makeBlockProposal({ signer: wrongSigner, blockHeader: header }); @@ -807,7 +802,7 @@ describe('LibP2PService', () => { let checkpointReceivedCallback: jest.Mock; let duplicateProposalCallback: jest.Mock; - const currentSlot = SlotNumber(100); + const targetSlot = SlotNumber(100); const nextSlot = SlotNumber(101); beforeEach(() => { @@ -817,14 +812,9 @@ describe('LibP2PService', () => { mockTxPool.protectTxs.mockResolvedValue([]); mockEpochCache = mock(); - mockEpochCache.getCurrentAndNextSlot.mockReturnValue({ currentSlot, nextSlot }); mockEpochCache.getProposerAttesterAddressInSlot.mockResolvedValue(signer.address); - mockEpochCache.getEpochAndSlotNow.mockReturnValue({ - epoch: 1 as any, - slot: currentSlot, - ts: 1000n, - nowMs: 1000100n, - }); + mockEpochCache.getTargetAndNextSlot.mockReturnValue({ targetSlot, nextSlot }); + mockEpochCache.getTargetSlot.mockReturnValue(targetSlot); mockPeerManager = mock(); reportMessageValidationResultSpy = jest.fn(); @@ -846,7 +836,7 @@ describe('LibP2PService', () => { }); it('processes valid checkpoint: invokes callback and propagates attestations', async () => { - const checkpointHeader = makeCheckpointHeader(1, { slotNumber: currentSlot }); + const checkpointHeader = makeCheckpointHeader(1, { slotNumber: targetSlot }); const proposal = await makeCheckpointProposal({ signer, checkpointHeader }); await service.handleGossipedCheckpointProposal(proposal.toBuffer(), 'msg-1', mockPeerId); @@ -864,7 +854,7 @@ describe('LibP2PService', () => { }); it('equivocated checkpoint: re-broadcasts but does NOT process', async () => { - const checkpointHeader = makeCheckpointHeader(1, { slotNumber: currentSlot }); + const checkpointHeader = makeCheckpointHeader(1, { slotNumber: targetSlot }); // First checkpoint const checkpoint1 = await makeCheckpointProposal({ @@ -882,7 +872,7 @@ describe('LibP2PService', () => { // Second checkpoint at same slot (equivocation) const checkpoint2 = await makeCheckpointProposal({ signer, - checkpointHeader: makeCheckpointHeader(1, { slotNumber: currentSlot }), + checkpointHeader: makeCheckpointHeader(1, { slotNumber: targetSlot }), archiveRoot: Fr.random(), }); await service.handleGossipedCheckpointProposal(checkpoint2.toBuffer(), 'msg-2', mockPeerId); @@ -895,15 +885,15 @@ describe('LibP2PService', () => { // Verify duplicate callback was invoked expect(duplicateProposalCallback).toHaveBeenCalledWith({ - slot: currentSlot, + slot: targetSlot, proposer: signer.address, type: 'checkpoint', }); }); it('checkpoint with lastBlock: processes both when valid', async () => { - const checkpointHeader = makeCheckpointHeader(1, { slotNumber: currentSlot }); - const blockHeader = makeBlockHeader(1, { slotNumber: currentSlot }); + const checkpointHeader = makeCheckpointHeader(1, { slotNumber: targetSlot }); + const blockHeader = makeBlockHeader(1, { slotNumber: targetSlot }); const proposal = await makeCheckpointProposal({ signer, checkpointHeader, @@ -928,8 +918,8 @@ describe('LibP2PService', () => { }); it('lastBlock processed even when checkpoint cap exceeded', async () => { - const checkpointHeader = makeCheckpointHeader(1, { slotNumber: currentSlot }); - const blockHeader = makeBlockHeader(1, { slotNumber: currentSlot }); + const checkpointHeader = makeCheckpointHeader(1, { slotNumber: targetSlot }); + const blockHeader = makeBlockHeader(1, { slotNumber: targetSlot }); // Fill checkpoint slot to MAX_CHECKPOINT_PROPOSALS_PER_SLOT for (let i = 0; i < MAX_CHECKPOINT_PROPOSALS_PER_SLOT; i++) { @@ -937,7 +927,7 @@ describe('LibP2PService', () => { mockEpochCache.getProposerAttesterAddressInSlot.mockResolvedValue(individualSigner.address); const proposal = await makeCheckpointProposal({ signer: individualSigner, - checkpointHeader: makeCheckpointHeader(1, { slotNumber: currentSlot }), + checkpointHeader: makeCheckpointHeader(1, { slotNumber: targetSlot }), archiveRoot: Fr.random(), }); await service.handleGossipedCheckpointProposal(proposal.toBuffer(), `msg-${i}`, mockPeerId); @@ -986,8 +976,8 @@ describe('LibP2PService', () => { }); it('checkpoint rejected when lastBlock is equivocated', async () => { - const checkpointHeader = makeCheckpointHeader(1, { slotNumber: currentSlot }); - const blockHeader = makeBlockHeader(1, { slotNumber: currentSlot }); + const checkpointHeader = makeCheckpointHeader(1, { slotNumber: targetSlot }); + const blockHeader = makeBlockHeader(1, { slotNumber: targetSlot }); const indexWithinCheckpoint = IndexWithinCheckpoint(4); // Pre-add a block at same position @@ -1023,7 +1013,7 @@ describe('LibP2PService', () => { }); it('validation failure penalizes peer with correct severity', async () => { - const checkpointHeader = makeCheckpointHeader(1, { slotNumber: currentSlot }); + const checkpointHeader = makeCheckpointHeader(1, { slotNumber: targetSlot }); // Create checkpoint signed by wrong signer const wrongSigner = Secp256k1Signer.random(); const proposal = await makeCheckpointProposal({ signer: wrongSigner, checkpointHeader }); diff --git a/yarn-project/p2p/src/test-helpers/testbench-utils.ts b/yarn-project/p2p/src/test-helpers/testbench-utils.ts index b375e80df0ca..86903280deed 100644 --- a/yarn-project/p2p/src/test-helpers/testbench-utils.ts +++ b/yarn-project/p2p/src/test-helpers/testbench-utils.ts @@ -273,17 +273,41 @@ export class InMemoryAttestationPool { * Creates a mock EpochCache for testing. */ export function createMockEpochCache(): EpochCacheInterface { - return { + const cache: EpochCacheInterface = { getCommittee: () => Promise.resolve({ committee: [], seed: 1n, epoch: EpochNumber.ZERO, isEscapeHatchOpen: false }), getProposerIndexEncoding: () => '0x' as `0x${string}`, - getEpochAndSlotNow: () => ({ epoch: EpochNumber.ZERO, slot: SlotNumber.ZERO, ts: 0n, nowMs: 0n }), + getSlotNow: () => SlotNumber.ZERO, + getTargetSlot: () => SlotNumber.ZERO, + getEpochNow: () => EpochNumber.ZERO, + getTargetEpoch: () => EpochNumber.ZERO, + getEpochAndSlotNow: () => ({ + epoch: EpochNumber.ZERO, + slot: SlotNumber.ZERO, + ts: 0n, + nowMs: 0n, + }), + isProposerPipeliningEnabled: () => false, computeProposerIndex: () => 0n, getCurrentAndNextSlot: () => ({ currentSlot: SlotNumber.ZERO, nextSlot: SlotNumber.ZERO }), + getTargetAndNextSlot: () => ({ targetSlot: SlotNumber.ZERO, nextSlot: SlotNumber.ZERO }), getProposerAttesterAddressInSlot: () => Promise.resolve(undefined), - getEpochAndSlotInNextL1Slot: () => ({ epoch: EpochNumber.ZERO, slot: SlotNumber.ZERO, ts: 0n, now: 0n }), + getEpochAndSlotInNextL1Slot: () => ({ + epoch: EpochNumber.ZERO, + slot: SlotNumber.ZERO, + ts: 0n, + nowSeconds: 0n, + }), + getTargetEpochAndSlotInNextL1Slot: () => ({ + epoch: EpochNumber.ZERO, + slot: SlotNumber.ZERO, + ts: 0n, + nowSeconds: 0n, + }), isInCommittee: () => Promise.resolve(false), getRegisteredValidators: () => Promise.resolve([]), filterInCommittee: () => Promise.resolve([]), + isEscapeHatchOpen: () => Promise.resolve(false), + isEscapeHatchOpenAtSlot: () => Promise.resolve(false), getL1Constants: () => ({ l1StartBlock: 0n, l1GenesisTime: 0n, @@ -295,6 +319,7 @@ export function createMockEpochCache(): EpochCacheInterface { rollupManaLimit: Number.MAX_SAFE_INTEGER, }), }; + return cache; } /** diff --git a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts index 1d3224d94a33..14d733793413 100644 --- a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts +++ b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts @@ -4,7 +4,7 @@ * Used when running testbench commands. */ import { MockL2BlockSource } from '@aztec/archiver/test'; -import type { EpochCacheInterface } from '@aztec/epoch-cache'; +import type { EpochCache, EpochCacheInterface } from '@aztec/epoch-cache'; import { BlockNumber } from '@aztec/foundation/branded-types'; import { SecretValue } from '@aztec/foundation/config'; import { Secp256k1Signer } from '@aztec/foundation/crypto/secp256k1-signer'; @@ -28,6 +28,7 @@ import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-clien import type { Message, PeerId } from '@libp2p/interface'; import { TopicValidatorResult } from '@libp2p/interface'; import { peerIdFromString } from '@libp2p/peer-id'; +import { mock } from 'jest-mock-extended'; import type { P2PClient } from '../client/index.js'; import type { P2PConfig } from '../config.js'; @@ -49,7 +50,6 @@ import { InMemoryAttestationPool, InMemoryTxPool, UNLIMITED_RATE_LIMIT_QUOTA, - createMockEpochCache, createMockWorldStateSynchronizer, filterTxsByDistribution, } from '../test-helpers/index.js'; @@ -344,7 +344,7 @@ process.on('message', async msg => { workerConfig = config; workerTxPool = new InMemoryTxPool(); workerAttestationPool = new InMemoryAttestationPool(); - const epochCache = createMockEpochCache(); + const epochCache = mock(); const worldState = createMockWorldStateSynchronizer(); const l2BlockSource = new MockL2BlockSource(); diff --git a/yarn-project/sequencer-client/src/publisher/sequencer-publisher-factory.test.ts b/yarn-project/sequencer-client/src/publisher/sequencer-publisher-factory.test.ts index a06a0b62e2e7..131963b5defd 100644 --- a/yarn-project/sequencer-client/src/publisher/sequencer-publisher-factory.test.ts +++ b/yarn-project/sequencer-client/src/publisher/sequencer-publisher-factory.test.ts @@ -34,6 +34,7 @@ describe('SequencerPublisherFactory', () => { beforeEach(() => { mockConfig = { ethereumSlotDuration: 12, + aztecSlotDuration: 36, } as SequencerClientConfig; mockPublisherManager = mock>(); mockBlobClient = mock(); diff --git a/yarn-project/sequencer-client/src/publisher/sequencer-publisher.test.ts b/yarn-project/sequencer-client/src/publisher/sequencer-publisher.test.ts index 1665e6de07c0..75fdbb221715 100644 --- a/yarn-project/sequencer-client/src/publisher/sequencer-publisher.test.ts +++ b/yarn-project/sequencer-client/src/publisher/sequencer-publisher.test.ts @@ -118,11 +118,11 @@ describe('SequencerPublisher', () => { rollupAddress: EthAddress.ZERO.toString(), governanceProposerAddress: mockGovernanceProposerAddress, }, - + aztecSlotDuration: 36, ...defaultL1TxUtilsConfig, } as unknown as TxSenderConfig & PublisherConfig & - Pick & + Pick & L1TxUtilsConfig; rollup = mock(); @@ -138,7 +138,13 @@ describe('SequencerPublisher', () => { slashFactoryContract = mock(); const epochCache = mock(); - epochCache.getEpochAndSlotNow.mockReturnValue({ epoch: EpochNumber(1), slot: SlotNumber(2), ts: 3n, nowMs: 3000n }); + epochCache.getEpochAndSlotNow.mockReturnValue({ + epoch: EpochNumber(1), + slot: SlotNumber(2), + ts: 3n, + nowMs: 3000n, + }); + epochCache.getSlotNow.mockReturnValue(SlotNumber(2)); epochCache.getCommittee.mockResolvedValue({ committee: [], seed: 1n, @@ -320,6 +326,7 @@ describe('SequencerPublisher', () => { ts: 3n, nowMs: 3000n, }); + epochCache.getSlotNow.mockReturnValue(SlotNumber(2)); epochCache.getCommittee.mockResolvedValue({ committee: [], seed: 1n, @@ -327,19 +334,22 @@ describe('SequencerPublisher', () => { isEscapeHatchOpen: false, }); - rotatingPublisher = new SequencerPublisher({ ethereumSlotDuration: 12, l1ChainId: 1 } as any, { - blobClient, - rollupContract: rollup, - l1TxUtils, - epochCache, - slashingProposerContract, - governanceProposerContract, - slashFactoryContract, - dateProvider: new TestDateProvider(), - metrics: l1Metrics, - lastActions: {}, - getNextPublisher, - }); + rotatingPublisher = new SequencerPublisher( + { ethereumSlotDuration: 12, aztecSlotDuration: 36, l1ChainId: 1 } as any, + { + blobClient, + rollupContract: rollup, + l1TxUtils, + epochCache, + slashingProposerContract, + governanceProposerContract, + slashFactoryContract, + dateProvider: new TestDateProvider(), + metrics: l1Metrics, + lastActions: {}, + getNextPublisher, + }, + ); }); it('rotates to next publisher when forward throws and retries successfully', async () => { diff --git a/yarn-project/sequencer-client/src/publisher/sequencer-publisher.ts b/yarn-project/sequencer-client/src/publisher/sequencer-publisher.ts index 1baac9255c7b..971a338d8647 100644 --- a/yarn-project/sequencer-client/src/publisher/sequencer-publisher.ts +++ b/yarn-project/sequencer-client/src/publisher/sequencer-publisher.ts @@ -133,6 +133,7 @@ export class SequencerPublisher { protected log: Logger; protected ethereumSlotDuration: bigint; + protected aztecSlotDuration: bigint; private blobClient: BlobClientInterface; @@ -166,7 +167,7 @@ export class SequencerPublisher { constructor( private config: Pick & - Pick & { l1ChainId: number }, + Pick & { l1ChainId: number }, deps: { telemetry?: TelemetryClient; blobClient: BlobClientInterface; @@ -185,6 +186,7 @@ export class SequencerPublisher { ) { this.log = deps.log ?? createLogger('sequencer:publisher'); this.ethereumSlotDuration = BigInt(config.ethereumSlotDuration); + this.aztecSlotDuration = BigInt(config.aztecSlotDuration); this.epochCache = deps.epochCache; this.lastActions = deps.lastActions; @@ -286,7 +288,7 @@ export class SequencerPublisher { } public getCurrentL2Slot(): SlotNumber { - return this.epochCache.getEpochAndSlotNow().slot; + return this.epochCache.getSlotNow(); } /** @@ -596,20 +598,23 @@ export class SequencerPublisher { } /** - * @notice Will call `canProposeAtNextEthBlock` to make sure that it is possible to propose + * @notice Will call `canProposeAt` to make sure that it is possible to propose * @param tipArchive - The archive to check * @returns The slot and block number if it is possible to propose, undefined otherwise */ - public canProposeAtNextEthBlock( + public canProposeAt( tipArchive: Fr, msgSender: EthAddress, - opts: { forcePendingCheckpointNumber?: CheckpointNumber } = {}, + opts: { forcePendingCheckpointNumber?: CheckpointNumber; pipelined?: boolean } = {}, ) { // TODO: #14291 - should loop through multiple keys to check if any of them can propose const ignoredErrors = ['SlotAlreadyInChain', 'InvalidProposer', 'InvalidArchive']; + const pipelined = opts.pipelined ?? this.epochCache.isProposerPipeliningEnabled(); + const slotOffset = pipelined ? this.aztecSlotDuration : 0n; + return this.rollupContract - .canProposeAtNextEthBlock(tipArchive.toBuffer(), msgSender.toString(), Number(this.ethereumSlotDuration), { + .canProposeAt(tipArchive.toBuffer(), msgSender.toString(), this.ethereumSlotDuration, slotOffset, { forcePendingCheckpointNumber: opts.forcePendingCheckpointNumber, }) .catch(err => { @@ -623,6 +628,7 @@ export class SequencerPublisher { return undefined; }); } + /** * @notice Will simulate `validateHeader` to make sure that the block header is valid * @dev This is a convenience function that can be used by the sequencer to validate a "partial" header. @@ -811,7 +817,9 @@ export class SequencerPublisher { attestationsAndSignersSignature: Signature, options: { forcePendingCheckpointNumber?: CheckpointNumber }, ): Promise { - const ts = BigInt((await this.l1TxUtils.getBlock()).timestamp + this.ethereumSlotDuration); + // Anchor the simulation timestamp to the checkpoint's own slot start time + // rather than the current L1 block timestamp, which may overshoot into the next slot if the build ran late. + const ts = checkpoint.header.timestamp; const blobFields = checkpoint.toBlobFields(); const blobs = await getBlobsPerL1Block(blobFields); const blobInput = getPrefixedEthBlobCommitments(blobs); diff --git a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.test.ts b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.test.ts index cc62248bb632..5821f87fb080 100644 --- a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.test.ts +++ b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.test.ts @@ -1,4 +1,4 @@ -import type { EpochCache } from '@aztec/epoch-cache'; +import { EpochCache } from '@aztec/epoch-cache'; import { BlockNumber, CheckpointNumber, @@ -438,6 +438,42 @@ describe('CheckpointProposalJob', () => { expect(call.previousCheckpointOutHashes).toHaveLength(1); expect(call.previousCheckpointOutHashes[0]).toEqual(previousCheckpoint.getCheckpointOutHash()); }); + + it('uses targetEpoch for previousCheckpointOutHashes when pipelining crosses epoch boundary', async () => { + // Pipelining scenario: wall-clock is in epoch 0, but target slot is in epoch 1. + // The key fix: getCheckpointsDataForEpoch must be called with targetEpoch, not epochNow. + const epochNow = EpochNumber(0); + const targetEpoch = EpochNumber(1); + // Target slot is first slot of epoch 1 (epochDuration = 16) + const targetSlot = SlotNumber(l1Constants.epochDuration); + // Wall-clock slot is the last slot of epoch 0 + const slotNow = SlotNumber(l1Constants.epochDuration - 1); + + checkpointNumber = CheckpointNumber(2); + const previousCheckpoint = await Checkpoint.random(CheckpointNumber(1)); + + l2BlockSource.getCheckpointsDataForEpoch.mockResolvedValue([toCheckpointData(previousCheckpoint)]); + + job = createCheckpointProposalJob({ slotNow, targetSlot, epochNow, targetEpoch }); + job.setTimetable( + new SequencerTimetable({ + ethereumSlotDuration, + aztecSlotDuration: slotDuration, + l1PublishingTime: ethereumSlotDuration, + enforce: config.enforceTimeTable, + }), + ); + + // Build block successfully + const { txs, block } = await setupTxsAndBlock(p2p, globalVariables, 1, chainId); + checkpointBuilder.seedBlocks([block], [txs]); + validatorClient.collectAttestations.mockResolvedValue(getAttestations(block)); + + await job.execute(); + + // Verify getCheckpointsDataForEpoch was called with targetEpoch (1), not epochNow (0) + expect(l2BlockSource.getCheckpointsDataForEpoch).toHaveBeenCalledWith(targetEpoch); + }); }); /** @@ -512,13 +548,20 @@ describe('CheckpointProposalJob', () => { * Called in beforeEach to create the job, and tests can use job.updateConfig() * to modify config after creation. */ - function createCheckpointProposalJob(): TestCheckpointProposalJob { + function createCheckpointProposalJob(overrides?: { + slotNow?: SlotNumber; + targetSlot?: SlotNumber; + epochNow?: EpochNumber; + targetEpoch?: EpochNumber; + }): TestCheckpointProposalJob { const setStateFn = jest.fn(); const eventEmitter = new EventEmitter() as TypedEventEmitter; return new TestCheckpointProposalJob( - epoch, - SlotNumber(newSlotNumber), + overrides?.slotNow ?? SlotNumber(newSlotNumber), + overrides?.targetSlot ?? SlotNumber(newSlotNumber), + overrides?.epochNow ?? epoch, + overrides?.targetEpoch ?? epoch, checkpointNumber, lastBlockNumber, proposer, diff --git a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.timing.test.ts b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.timing.test.ts index bed1f5c8cca4..2a3bd79c7a9e 100644 --- a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.timing.test.ts +++ b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.timing.test.ts @@ -1,4 +1,4 @@ -import type { EpochCache } from '@aztec/epoch-cache'; +import { EpochCache } from '@aztec/epoch-cache'; import { BlockNumber, CheckpointNumber, EpochNumber, SlotNumber } from '@aztec/foundation/branded-types'; import { Secp256k1Signer } from '@aztec/foundation/crypto/secp256k1-signer'; import { Fr } from '@aztec/foundation/curves/bn254'; @@ -289,8 +289,10 @@ describe('CheckpointProposalJob Timing Tests', () => { return new TimingTestCheckpointProposalJob( dateProvider, getSecondsIntoSlot, - epoch, slotNumber, + slotNumber, + epoch, + epoch, checkpointNumber, BlockNumber.ZERO, proposer, diff --git a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts index 9e24324d937e..24026200d942 100644 --- a/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts +++ b/yarn-project/sequencer-client/src/sequencer/checkpoint_proposal_job.ts @@ -31,7 +31,7 @@ import { MaliciousCommitteeAttestationsAndSigners, } from '@aztec/stdlib/block'; import { type Checkpoint, validateCheckpoint } from '@aztec/stdlib/checkpoint'; -import { getSlotStartBuildTimestamp } from '@aztec/stdlib/epoch-helpers'; +import { getSlotStartBuildTimestamp, getTimestampForSlot } from '@aztec/stdlib/epoch-helpers'; import { Gas } from '@aztec/stdlib/gas'; import { type BlockBuilderOptions, @@ -77,8 +77,10 @@ export class CheckpointProposalJob implements Traceable { protected readonly log: Logger; constructor( - private readonly epoch: EpochNumber, - private readonly slot: SlotNumber, + private readonly slotNow: SlotNumber, + private readonly targetSlot: SlotNumber, + private readonly epochNow: EpochNumber, + private readonly targetEpoch: EpochNumber, private readonly checkpointNumber: CheckpointNumber, private readonly syncedToBlockNumber: BlockNumber, // TODO(palla/mbps): Can we remove the proposer in favor of attestorAddress? Need to check fisherman-node flows. @@ -106,7 +108,20 @@ export class CheckpointProposalJob implements Traceable { public readonly tracer: Tracer, bindings?: LoggerBindings, ) { - this.log = createLogger('sequencer:checkpoint-proposal', { ...bindings, instanceId: `slot-${slot}` }); + this.log = createLogger('sequencer:checkpoint-proposal', { + ...bindings, + instanceId: `slot-${this.slotNow}`, + }); + } + + /** The wall-clock slot during which the proposer builds. */ + private get slot(): SlotNumber { + return this.slotNow; + } + + /** The wall-clock epoch. */ + private get epoch(): EpochNumber { + return this.epochNow; } /** @@ -119,7 +134,7 @@ export class CheckpointProposalJob implements Traceable { // In fisherman mode, we simulate slashing but don't actually publish to L1 // These are constant for the whole slot, so we only enqueue them once const votesPromises = new CheckpointVoter( - this.slot, + this.targetSlot, this.publisher, this.attestorAddress, this.validatorClient, @@ -146,6 +161,29 @@ export class CheckpointProposalJob implements Traceable { return; } + // If pipelining, wait until the submission slot so L1 recognizes the pipelined proposer + if (this.epochCache.isProposerPipeliningEnabled()) { + const submissionSlotTimestamp = + getTimestampForSlot(this.targetSlot, this.l1Constants) - BigInt(this.l1Constants.ethereumSlotDuration); + this.log.info(`Waiting until submission slot ${this.targetSlot} for L1 submission`, { + slot: this.slot, + submissionSlot: this.targetSlot, + submissionSlotTimestamp, + }); + await sleepUntil(new Date(Number(submissionSlotTimestamp) * 1000), this.dateProvider.nowAsDate()); + + // After waking, verify the parent checkpoint wasn't pruned during the sleep. + // We check L1's pending tip directly instead of canProposeAt, which also validates the proposer + // identity and would fail because the timestamp resolves to a different slot's proposer. + const l1Tips = await this.publisher.rollupContract.getTips(); + if (l1Tips.pending < this.checkpointNumber - 1) { + this.log.warn( + `Parent checkpoint was pruned during pipelining sleep (L1 pending=${l1Tips.pending}, expected>=${this.checkpointNumber - 1}), skipping L1 submission for checkpoint ${this.checkpointNumber}`, + ); + return undefined; + } + } + // Then send everything to L1 const l1Response = await this.publisher.sendRequests(); const proposedAction = l1Response?.successfulActions.find(a => a === 'propose'); @@ -164,7 +202,7 @@ export class CheckpointProposalJob implements Traceable { return { // nullish operator needed for tests [Attributes.COINBASE]: this.validatorClient.getCoinbaseForAttestor(this.attestorAddress)?.toString(), - [Attributes.SLOT_NUMBER]: this.slot, + [Attributes.SLOT_NUMBER]: this.targetSlot, }; }) private async proposeCheckpoint(): Promise { @@ -174,8 +212,15 @@ export class CheckpointProposalJob implements Traceable { const feeRecipient = this.validatorClient.getFeeRecipientForAttestor(this.attestorAddress); // Start the checkpoint - this.setStateFn(SequencerState.INITIALIZING_CHECKPOINT, this.slot); - this.metrics.incOpenSlot(this.slot, this.proposer?.toString() ?? 'unknown'); + this.setStateFn(SequencerState.INITIALIZING_CHECKPOINT, this.targetSlot); + this.log.info(`Starting checkpoint proposal`, { + buildSlot: this.slot, + submissionSlot: this.targetSlot, + pipelining: this.epochCache.isProposerPipeliningEnabled(), + proposer: this.proposer?.toString(), + coinbase: coinbase.toString(), + }); + this.metrics.incOpenSlot(this.targetSlot, this.proposer?.toString() ?? 'unknown'); // Enqueues checkpoint invalidation (constant for the whole slot) if (this.invalidateCheckpoint && !this.config.skipInvalidateBlockAsProposer) { @@ -186,7 +231,7 @@ export class CheckpointProposalJob implements Traceable { const checkpointGlobalVariables = await this.globalsBuilder.buildCheckpointGlobalVariables( coinbase, feeRecipient, - this.slot, + this.targetSlot, ); // Collect L1 to L2 messages for the checkpoint and compute their hash @@ -194,7 +239,7 @@ export class CheckpointProposalJob implements Traceable { const inHash = computeInHashFromL1ToL2Messages(l1ToL2Messages); // Collect the out hashes of all the checkpoints before this one in the same epoch - const previousCheckpointOutHashes = (await this.l2BlockSource.getCheckpointsDataForEpoch(this.epoch)) + const previousCheckpointOutHashes = (await this.l2BlockSource.getCheckpointsDataForEpoch(this.targetEpoch)) .filter(c => c.checkpointNumber < this.checkpointNumber) .map(c => c.checkpointOutHash); @@ -251,8 +296,8 @@ export class CheckpointProposalJob implements Traceable { } if (blocksInCheckpoint.length === 0) { - this.log.warn(`No blocks were built for slot ${this.slot}`, { slot: this.slot }); - this.eventEmitter.emit('checkpoint-empty', { slot: this.slot }); + this.log.warn(`No blocks were built for slot ${this.targetSlot}`, { slot: this.targetSlot }); + this.eventEmitter.emit('checkpoint-empty', { slot: this.targetSlot }); return undefined; } @@ -260,14 +305,14 @@ export class CheckpointProposalJob implements Traceable { if (minBlocksForCheckpoint !== undefined && blocksInCheckpoint.length < minBlocksForCheckpoint) { this.log.warn( `Checkpoint has fewer blocks than minimum (${blocksInCheckpoint.length} < ${minBlocksForCheckpoint}), skipping proposal`, - { slot: this.slot, blocksBuilt: blocksInCheckpoint.length, minBlocksForCheckpoint }, + { slot: this.targetSlot, blocksBuilt: blocksInCheckpoint.length, minBlocksForCheckpoint }, ); return undefined; } // Assemble and broadcast the checkpoint proposal, including the last block that was not // broadcasted yet, and wait to collect the committee attestations. - this.setStateFn(SequencerState.ASSEMBLING_CHECKPOINT, this.slot); + this.setStateFn(SequencerState.ASSEMBLING_CHECKPOINT, this.targetSlot); const checkpoint = await checkpointBuilder.completeCheckpoint(); // Final validation: per-block limits are only checked if the operator set them explicitly. @@ -298,10 +343,10 @@ export class CheckpointProposalJob implements Traceable { // Do not collect attestations nor publish to L1 in fisherman mode if (this.config.fishermanMode) { this.log.info( - `Built checkpoint for slot ${this.slot} with ${blocksInCheckpoint.length} blocks. ` + + `Built checkpoint for slot ${this.targetSlot} with ${blocksInCheckpoint.length} blocks. ` + `Skipping proposal in fisherman mode.`, { - slot: this.slot, + slot: this.targetSlot, checkpoint: checkpoint.header.toInspect(), blocksBuilt: blocksInCheckpoint.length, }, @@ -330,7 +375,7 @@ export class CheckpointProposalJob implements Traceable { const blockProposedAt = this.dateProvider.now(); await this.p2pClient.broadcastCheckpointProposal(proposal); - this.setStateFn(SequencerState.COLLECTING_ATTESTATIONS, this.slot); + this.setStateFn(SequencerState.COLLECTING_ATTESTATIONS, this.targetSlot); const attestations = await this.waitForAttestations(proposal); const blockAttestedAt = this.dateProvider.now(); @@ -343,7 +388,7 @@ export class CheckpointProposalJob implements Traceable { attestationsSignature = await this.validatorClient.signAttestationsAndSigners( attestations, signer, - this.slot, + this.targetSlot, this.checkpointNumber, ); } catch (err) { @@ -356,10 +401,10 @@ export class CheckpointProposalJob implements Traceable { } // Enqueue publishing the checkpoint to L1 - this.setStateFn(SequencerState.PUBLISHING_CHECKPOINT, this.slot); + this.setStateFn(SequencerState.PUBLISHING_CHECKPOINT, this.targetSlot); const aztecSlotDuration = this.l1Constants.slotDuration; - const slotStartBuildTimestamp = this.getSlotStartBuildTimestamp(); - const txTimeoutAt = new Date((slotStartBuildTimestamp + aztecSlotDuration) * 1000); + const submissionSlotStart = Number(getTimestampForSlot(this.targetSlot, this.l1Constants)); + const txTimeoutAt = new Date((submissionSlotStart + aztecSlotDuration) * 1000); // If we have been configured to potentially skip publishing checkpoint then roll the dice here if ( @@ -408,7 +453,6 @@ export class CheckpointProposalJob implements Traceable { const blocksInCheckpoint: L2Block[] = []; const txHashesAlreadyIncluded = new Set(); const initialBlockNumber = BlockNumber(this.syncedToBlockNumber + 1); - const slot = this.slot; // Last block in the checkpoint will usually be flagged as pending broadcast, so we send it along with the checkpoint proposal let blockPendingBroadcast: { block: L2Block; txs: Tx[] } | undefined = undefined; @@ -422,7 +466,11 @@ export class CheckpointProposalJob implements Traceable { const timingInfo = this.timetable.canStartNextBlock(secondsIntoSlot); if (!timingInfo.canStart) { - this.log.debug(`Not enough time left in slot to start another block`, { slot, blocksBuilt, secondsIntoSlot }); + this.log.debug(`Not enough time left in slot to start another block`, { + slot: this.targetSlot, + blocksBuilt, + secondsIntoSlot, + }); break; } @@ -454,7 +502,11 @@ export class CheckpointProposalJob implements Traceable { } else if ('error' in buildResult) { // If there was an error building the block, just exit the loop and give up the rest of the slot if (!(buildResult.error instanceof SequencerInterruptedError)) { - this.log.warn(`Halting block building for slot ${slot}`, { slot, blocksBuilt, error: buildResult.error }); + this.log.warn(`Halting block building for slot ${this.targetSlot}`, { + slot: this.targetSlot, + blocksBuilt, + error: buildResult.error, + }); } break; } @@ -463,11 +515,15 @@ export class CheckpointProposalJob implements Traceable { blocksInCheckpoint.push(block); usedTxs.forEach(tx => txHashesAlreadyIncluded.add(tx.txHash.toString())); - // If this is the last block, send the proposed block to the archiver, - // and exit the loop now so we can build the checkpoint and start collecting attestations. + // If this is the last block, sync it to the archiver and exit the loop + // so we can build the checkpoint and start collecting attestations. if (timingInfo.isLastBlock) { await this.syncProposedBlockToArchiver(block); - this.log.verbose(`Completed final block ${blockNumber} for slot ${slot}`, { slot, blockNumber, blocksBuilt }); + this.log.verbose(`Completed final block ${blockNumber} for slot ${this.targetSlot}`, { + slot: this.targetSlot, + blockNumber, + blocksBuilt, + }); blockPendingBroadcast = { block, txs: usedTxs }; break; } @@ -490,8 +546,8 @@ export class CheckpointProposalJob implements Traceable { await this.waitUntilNextSubslot(timingInfo.deadline); } - this.log.verbose(`Block building loop completed for slot ${this.slot}`, { - slot: this.slot, + this.log.verbose(`Block building loop completed for slot ${this.targetSlot}`, { + slot: this.targetSlot, blocksBuilt: blocksInCheckpoint.length, }); @@ -523,8 +579,10 @@ export class CheckpointProposalJob implements Traceable { /** Sleeps until it is time to produce the next block in the slot */ @trackSpan('CheckpointProposalJob.waitUntilNextSubslot') private async waitUntilNextSubslot(nextSubslotStart: number) { - this.setStateFn(SequencerState.WAITING_UNTIL_NEXT_BLOCK, this.slot); - this.log.verbose(`Waiting until time for the next block at ${nextSubslotStart}s into slot`, { slot: this.slot }); + this.setStateFn(SequencerState.WAITING_UNTIL_NEXT_BLOCK, this.targetSlot); + this.log.verbose(`Waiting until time for the next block at ${nextSubslotStart}s into slot`, { + slot: this.targetSlot, + }); await this.waitUntilTimeInSlot(nextSubslotStart); } @@ -545,7 +603,7 @@ export class CheckpointProposalJob implements Traceable { opts; this.log.verbose( - `Preparing block ${blockNumber} index ${indexWithinCheckpoint} at checkpoint ${this.checkpointNumber} for slot ${this.slot}`, + `Preparing block ${blockNumber} index ${indexWithinCheckpoint} at checkpoint ${this.checkpointNumber} for slot ${this.targetSlot}`, { ...checkpointBuilder.getConstantData(), ...opts }, ); @@ -554,10 +612,10 @@ export class CheckpointProposalJob implements Traceable { const { availableTxs, canStartBuilding, minTxs } = await this.waitForMinTxs(opts); if (!canStartBuilding) { this.log.warn( - `Not enough txs to build block ${blockNumber} at index ${indexWithinCheckpoint} in slot ${this.slot} (got ${availableTxs} txs but needs ${minTxs})`, - { blockNumber, slot: this.slot, indexWithinCheckpoint }, + `Not enough txs to build block ${blockNumber} at index ${indexWithinCheckpoint} in slot ${this.targetSlot} (got ${availableTxs} txs but needs ${minTxs})`, + { blockNumber, slot: this.targetSlot, indexWithinCheckpoint }, ); - this.eventEmitter.emit('block-tx-count-check-failed', { minTxs, availableTxs, slot: this.slot }); + this.eventEmitter.emit('block-tx-count-check-failed', { minTxs, availableTxs, slot: this.targetSlot }); this.metrics.recordBlockProposalFailed('insufficient_txs'); return undefined; } @@ -570,10 +628,10 @@ export class CheckpointProposalJob implements Traceable { ); this.log.debug( - `Building block ${blockNumber} at index ${indexWithinCheckpoint} for slot ${this.slot} with ${availableTxs} available txs`, - { slot: this.slot, blockNumber, indexWithinCheckpoint }, + `Building block ${blockNumber} at index ${indexWithinCheckpoint} for slot ${this.targetSlot} with ${availableTxs} available txs`, + { slot: this.targetSlot, blockNumber, indexWithinCheckpoint }, ); - this.setStateFn(SequencerState.CREATING_BLOCK, this.slot); + this.setStateFn(SequencerState.CREATING_BLOCK, this.targetSlot); // Per-block limits are operator overrides (from SEQ_MAX_L2_BLOCK_GAS etc.) further capped // by remaining checkpoint-level budgets inside CheckpointBuilder before each block is built. @@ -608,16 +666,19 @@ export class CheckpointProposalJob implements Traceable { if (buildResult.status === 'insufficient-valid-txs') { this.log.warn( - `Block ${blockNumber} at index ${indexWithinCheckpoint} on slot ${this.slot} has too few valid txs to be proposed`, + `Block ${blockNumber} at index ${indexWithinCheckpoint} on slot ${this.targetSlot} has too few valid txs to be proposed`, { - slot: this.slot, + slot: this.targetSlot, blockNumber, numTxs: buildResult.processedCount, indexWithinCheckpoint, minValidTxs, }, ); - this.eventEmitter.emit('block-build-failed', { reason: `Insufficient valid txs`, slot: this.slot }); + this.eventEmitter.emit('block-build-failed', { + reason: `Insufficient valid txs`, + slot: this.targetSlot, + }); this.metrics.recordBlockProposalFailed('insufficient_valid_txs'); return undefined; } @@ -637,17 +698,24 @@ export class CheckpointProposalJob implements Traceable { const manaPerSec = block.header.totalManaUsed.toNumberUnsafe() / (blockBuildDuration / 1000); this.log.info( - `Built block ${block.number} at checkpoint ${this.checkpointNumber} for slot ${this.slot} with ${numTxs} txs`, + `Built block ${block.number} at checkpoint ${this.checkpointNumber} for slot ${this.targetSlot} with ${numTxs} txs`, { blockHash, txHashes, manaPerSec, ...blockStats }, ); - this.eventEmitter.emit('block-proposed', { blockNumber: block.number, slot: this.slot }); + this.eventEmitter.emit('block-proposed', { + blockNumber: block.number, + slot: this.targetSlot, + buildSlot: this.slotNow, + }); this.metrics.recordBuiltBlock(blockBuildDuration, block.header.totalManaUsed.toNumberUnsafe()); return { block, usedTxs }; } catch (err: any) { - this.eventEmitter.emit('block-build-failed', { reason: err.message, slot: this.slot }); - this.log.error(`Error building block`, err, { blockNumber, slot: this.slot }); + this.eventEmitter.emit('block-build-failed', { + reason: err.message, + slot: this.targetSlot, + }); + this.log.error(`Error building block`, err, { blockNumber, slot: this.targetSlot }); this.metrics.recordBlockProposalFailed(err.name || 'unknown_error'); this.metrics.recordFailedBlock(); return { error: err }; @@ -707,10 +775,10 @@ export class CheckpointProposalJob implements Traceable { } // Wait a bit before checking again - this.setStateFn(SequencerState.WAITING_FOR_TXS, this.slot); + this.setStateFn(SequencerState.WAITING_FOR_TXS, this.targetSlot); this.log.verbose( - `Waiting for enough txs to build block ${blockNumber} at index ${indexWithinCheckpoint} in slot ${this.slot} (have ${availableTxs} but need ${minTxs})`, - { blockNumber, slot: this.slot, indexWithinCheckpoint }, + `Waiting for enough txs to build block ${blockNumber} at index ${indexWithinCheckpoint} in slot ${this.targetSlot} (have ${availableTxs} but need ${minTxs})`, + { blockNumber, slot: this.targetSlot, indexWithinCheckpoint }, ); await this.waitForTxsPollingInterval(); availableTxs = await this.p2pClient.getPendingTxCount(); @@ -910,19 +978,19 @@ export class CheckpointProposalJob implements Traceable { private async handleCheckpointEndAsFisherman(checkpoint: Checkpoint | undefined) { // Perform L1 fee analysis before clearing requests // The callback is invoked asynchronously after the next block is mined - const feeAnalysis = await this.publisher.analyzeL1Fees(this.slot, analysis => + const feeAnalysis = await this.publisher.analyzeL1Fees(this.targetSlot, analysis => this.metrics.recordFishermanFeeAnalysis(analysis), ); if (checkpoint) { - this.log.info(`Validation checkpoint building SUCCEEDED for slot ${this.slot}`, { + this.log.info(`Validation checkpoint building SUCCEEDED for slot ${this.targetSlot}`, { ...checkpoint.toCheckpointInfo(), ...checkpoint.getStats(), feeAnalysisId: feeAnalysis?.id, }); } else { - this.log.warn(`Validation block building FAILED for slot ${this.slot}`, { - slot: this.slot, + this.log.warn(`Validation block building FAILED for slot ${this.targetSlot}`, { + slot: this.targetSlot, feeAnalysisId: feeAnalysis?.id, }); this.metrics.recordCheckpointProposalFailed('block_build_failed'); @@ -936,15 +1004,15 @@ export class CheckpointProposalJob implements Traceable { */ private handleHASigningError(err: any, errorContext: string): boolean { if (err instanceof DutyAlreadySignedError) { - this.log.info(`${errorContext} for slot ${this.slot} already signed by another HA node, yielding`, { - slot: this.slot, + this.log.info(`${errorContext} for slot ${this.targetSlot} already signed by another HA node, yielding`, { + slot: this.targetSlot, signedByNode: err.signedByNode, }); return true; } if (err instanceof SlashingProtectionError) { - this.log.info(`${errorContext} for slot ${this.slot} blocked by slashing protection, yielding`, { - slot: this.slot, + this.log.info(`${errorContext} for slot ${this.targetSlot} blocked by slashing protection, yielding`, { + slot: this.targetSlot, existingMessageHash: err.existingMessageHash, attemptedMessageHash: err.attemptedMessageHash, }); diff --git a/yarn-project/sequencer-client/src/sequencer/checkpoint_voter.ha.integration.test.ts b/yarn-project/sequencer-client/src/sequencer/checkpoint_voter.ha.integration.test.ts index 473b8ecade30..40d88b89a911 100644 --- a/yarn-project/sequencer-client/src/sequencer/checkpoint_voter.ha.integration.test.ts +++ b/yarn-project/sequencer-client/src/sequencer/checkpoint_voter.ha.integration.test.ts @@ -278,6 +278,7 @@ describe('CheckpointVoter HA Integration', () => { requiredConfirmations: 1, maxL1TxInclusionWaitPulseSeconds: 60, ethereumSlotDuration: DefaultL1ContractsConfig.ethereumSlotDuration, + aztecSlotDuration: TEST_L1_CONSTANTS.slotDuration, fishermanMode: false, l1ChainId: 1, }; @@ -292,6 +293,7 @@ describe('CheckpointVoter HA Integration', () => { ts: BigInt(Math.floor(Date.now() / 1000)), nowMs: BigInt(Date.now()), }); + epochCache.getSlotNow.mockReturnValue(slot); const slashFactoryContract = mock(); diff --git a/yarn-project/sequencer-client/src/sequencer/events.ts b/yarn-project/sequencer-client/src/sequencer/events.ts index 7c1e22cf5ca5..d91b00ee190a 100644 --- a/yarn-project/sequencer-client/src/sequencer/events.ts +++ b/yarn-project/sequencer-client/src/sequencer/events.ts @@ -13,7 +13,7 @@ export type SequencerEvents = { ['proposer-rollup-check-failed']: (args: { reason: string; slot: SlotNumber }) => void; ['block-tx-count-check-failed']: (args: { minTxs: number; availableTxs: number; slot: SlotNumber }) => void; ['block-build-failed']: (args: { reason: string; slot: SlotNumber }) => void; - ['block-proposed']: (args: { blockNumber: BlockNumber; slot: SlotNumber }) => void; + ['block-proposed']: (args: { blockNumber: BlockNumber; slot: SlotNumber; buildSlot: SlotNumber }) => void; ['checkpoint-empty']: (args: { slot: SlotNumber }) => void; ['checkpoint-publish-failed']: (args: { slot: SlotNumber; diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts index c30a93fb770e..d9ff3cb58919 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts @@ -171,14 +171,24 @@ describe('sequencer', () => { epoch: EpochNumber(1), slot: SlotNumber(1), ts: 1000n, - now: 1000n, + nowSeconds: 1000n, })); + epochCache.getTargetSlot.mockReturnValue(SlotNumber(1)); + epochCache.getTargetEpoch.mockReturnValue(EpochNumber(1)); + epochCache.getTargetEpochAndSlotInNextL1Slot.mockImplementation(() => ({ + epoch: EpochNumber(1), + slot: SlotNumber(1), + ts: 1000n, + nowSeconds: 1000n, + })); + epochCache.isProposerPipeliningEnabled.mockReturnValue(false); epochCache.getCommittee.mockResolvedValue({ committee, seed: 1n, epoch: EpochNumber(1), isEscapeHatchOpen: false, }); + epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(undefined); publisher = mockDeep(); publisher.epochCache = epochCache; @@ -187,7 +197,7 @@ describe('sequencer', () => { publisher.enqueueProposeCheckpoint.mockResolvedValue(undefined); publisher.enqueueGovernanceCastSignal.mockResolvedValue(true); publisher.enqueueSlashingActions.mockResolvedValue(true); - publisher.canProposeAtNextEthBlock.mockResolvedValue({ + publisher.canProposeAt.mockResolvedValue({ slot: SlotNumber(newSlotNumber), checkpointNumber: CheckpointNumber.fromBlockNumber(newBlockNumber), timeOfNextL1Slot: 1000n, @@ -302,6 +312,8 @@ describe('sequencer', () => { validatorClient.createBlockProposal.mockImplementation(() => Promise.resolve(createBlockProposal())); validatorClient.createCheckpointProposal.mockImplementation(() => Promise.resolve(createCheckpointProposal())); validatorClient.signAttestationsAndSigners.mockImplementation(() => Promise.resolve(getSignatures()[0].signature)); + validatorClient.getCoinbaseForAttestor.mockReturnValue(coinbase); + validatorClient.getFeeRecipientForAttestor.mockReturnValue(feeRecipient); slasherClient = mock(); slasherClient.getProposerActions.mockResolvedValue([]); @@ -352,21 +364,21 @@ describe('sequencer', () => { expect(checkpointBuilder.buildBlockCalls).toHaveLength(0); expect(publisher.enqueueProposeCheckpoint).not.toHaveBeenCalled(); - expect(publisher.canProposeAtNextEthBlock).not.toHaveBeenCalled(); + expect(publisher.canProposeAt).not.toHaveBeenCalled(); }); it('builds a checkpoint when it is their turn', async () => { await setupSingleTxBlock(); // Not your turn! canProposeAtNextEthBlock returns undefined - publisher.canProposeAtNextEthBlock.mockResolvedValue(undefined); + publisher.canProposeAt.mockResolvedValue(undefined); await sequencer.work(); // When it's not our turn, we should not build the checkpoint expect(checkpointBuilder.buildBlockCalls).toHaveLength(0); // Now it's our turn! - publisher.canProposeAtNextEthBlock.mockResolvedValue({ + publisher.canProposeAt.mockResolvedValue({ slot: block.header.globalVariables.slotNumber, checkpointNumber: CheckpointNumber.fromBlockNumber(block.header.globalVariables.blockNumber), timeOfNextL1Slot: 1000n, @@ -474,7 +486,7 @@ describe('sequencer', () => { pub.enqueueProposeCheckpoint.mockResolvedValue(undefined); pub.enqueueGovernanceCastSignal.mockResolvedValue(true); pub.enqueueSlashingActions.mockResolvedValue(true); - pub.canProposeAtNextEthBlock.mockResolvedValue({ + pub.canProposeAt.mockResolvedValue({ slot: SlotNumber(newSlotNumber + i), checkpointNumber: CheckpointNumber.fromBlockNumber(BlockNumber(newBlockNumber)), timeOfNextL1Slot: 1000n, @@ -491,8 +503,34 @@ describe('sequencer', () => { // Configure epoch cache to return different slots epochCache.getEpochAndSlotInNextL1Slot .mockReset() - .mockReturnValueOnce({ epoch: EpochNumber(1), slot: SlotNumber(1), ts: 1000n, now: 1000n }) - .mockReturnValueOnce({ epoch: EpochNumber(1), slot: SlotNumber(2), ts: 1000n, now: 1000n }); + .mockReturnValueOnce({ + epoch: EpochNumber(1), + slot: SlotNumber(1), + ts: 1000n, + nowSeconds: 1000n, + }) + .mockReturnValueOnce({ + epoch: EpochNumber(1), + slot: SlotNumber(2), + ts: 1000n, + nowSeconds: 1000n, + }); + epochCache.getTargetSlot.mockReset().mockReturnValueOnce(SlotNumber(1)).mockReturnValueOnce(SlotNumber(2)); + epochCache.getTargetEpoch.mockReturnValue(EpochNumber(1)); + epochCache.getTargetEpochAndSlotInNextL1Slot + .mockReset() + .mockReturnValueOnce({ + epoch: EpochNumber(1), + slot: SlotNumber(1), + ts: 1000n, + nowSeconds: 1000n, + }) + .mockReturnValueOnce({ + epoch: EpochNumber(1), + slot: SlotNumber(2), + ts: 1000n, + nowSeconds: 1000n, + }); sequencer.updateConfig({ enforceTimeTable: false, maxTxsPerBlock: 4 }); @@ -887,6 +925,35 @@ describe('sequencer', () => { expect(publisher.enqueueProposeCheckpoint).toHaveBeenCalled(); }); }); + + describe('view-based proposer lookup', () => { + it('passes target slot to getProposerAttesterAddressInSlot', async () => { + const proposer = signer.address; + validatorClient.getValidatorAddresses.mockReturnValue([proposer]); + epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(proposer); + + await sequencer.checkCanProposeForTest(SlotNumber(2)); + + expect(epochCache.getProposerAttesterAddressInSlot).toHaveBeenCalledWith(SlotNumber(2)); + }); + + it('when pipelining enabled, checkCanPropose receives target slot with pipeline offset', async () => { + const proposer = signer.address; + validatorClient.getValidatorAddresses.mockReturnValue([proposer]); + epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(proposer); + epochCache.isProposerPipeliningEnabled.mockReturnValue(true); + epochCache.getTargetEpochAndSlotInNextL1Slot.mockReturnValue({ + epoch: EpochNumber(1), + slot: SlotNumber(2), + ts: 1000n, + nowSeconds: 1000n, + }); + + await sequencer.checkCanProposeForTest(SlotNumber(2)); + + expect(epochCache.getProposerAttesterAddressInSlot).toHaveBeenCalledWith(SlotNumber(2)); + }); + }); }); class TestSequencer extends Sequencer { @@ -902,4 +969,8 @@ class TestSequencer extends Sequencer { this.setState(SequencerState.IDLE, undefined, { force: true }); return super.work(); } + + public checkCanProposeForTest(slot: SlotNumber) { + return this.checkCanPropose(slot); + } } diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.ts index d75788ea3cf4..1392a1be2ec7 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.ts @@ -192,10 +192,18 @@ export class Sequencer extends (EventEmitter as new () => TypedEventEmitter TypedEventEmitter this.lastEpochForStrategyComparison) + (this.lastEpochForStrategyComparison === undefined || targetEpoch > this.lastEpochForStrategyComparison) ) { - this.logStrategyComparison(epoch, checkpointProposalJob.getPublisher()); - this.lastEpochForStrategyComparison = epoch; + this.logStrategyComparison(targetEpoch, checkpointProposalJob.getPublisher()); + this.lastEpochForStrategyComparison = targetEpoch; } return checkpoint; @@ -227,43 +235,48 @@ export class Sequencer extends (EventEmitter as new () => TypedEventEmitter { - // Check we have not already processed this slot (cheapest check) + // Check we have not already processed this target slot (cheapest check) // We only check this if enforce timetable is set, since we want to keep processing the same slot if we are not // running against actual time (eg when we use sandbox-style automining) if ( this.lastSlotForCheckpointProposalJob && - this.lastSlotForCheckpointProposalJob >= slot && + this.lastSlotForCheckpointProposalJob >= targetSlot && this.config.enforceTimeTable ) { - this.log.trace(`Slot ${slot} has already been processed`); + this.log.trace(`Target slot ${targetSlot} has already been processed`); return undefined; } - // But if we have already proposed for this slot, the we definitely have to skip it, automining or not - if (this.lastCheckpointProposed && this.lastCheckpointProposed.header.slotNumber >= slot) { - this.log.trace(`Slot ${slot} has already been published as checkpoint ${this.lastCheckpointProposed.number}`); + // But if we have already proposed for this slot, then we definitely have to skip it, automining or not + if (this.lastCheckpointProposed && this.lastCheckpointProposed.header.slotNumber >= targetSlot) { + this.log.trace( + `Slot ${targetSlot} has already been published as checkpoint ${this.lastCheckpointProposed.number}`, + ); return undefined; } // Check all components are synced to latest as seen by the archiver (queries all subsystems) const syncedTo = await this.checkSync({ ts, slot }); if (!syncedTo) { - await this.tryVoteWhenSyncFails({ slot, ts }); + await this.tryVoteWhenSyncFails({ slot, targetSlot, ts }); return undefined; } - // If escape hatch is open for this epoch, do not start checkpoint proposal work and do not attempt invalidations. + // If escape hatch is open for the target epoch, do not start checkpoint proposal work and do not attempt invalidations. // Still perform governance/slashing voting (as proposer) once per slot. - const isEscapeHatchOpen = await this.epochCache.isEscapeHatchOpen(epoch); + // When pipelining, we check the target epoch (slot+1's epoch) since that's the epoch we're building for. + const isEscapeHatchOpen = await this.epochCache.isEscapeHatchOpen(targetEpoch); if (isEscapeHatchOpen) { this.setState(SequencerState.PROPOSER_CHECK, slot); - const [canPropose, proposer] = await this.checkCanPropose(slot); + const [canPropose, proposer] = await this.checkCanPropose(targetSlot); if (canPropose) { await this.tryVoteWhenEscapeHatchOpen({ slot, proposer }); } else { @@ -280,17 +293,18 @@ export class Sequencer extends (EventEmitter as new () => TypedEventEmitter TypedEventEmitter= slot) { + // Check that the target slot is not taken by a block already (should never happen, since only us can propose for this slot) + if (syncedTo.blockData && syncedTo.blockData.header.getSlot() >= targetSlot) { this.log.warn( - `Cannot propose block at next L2 slot ${slot} since that slot was taken by block ${syncedTo.blockNumber}`, + `Cannot propose block at target slot ${targetSlot} since that slot was taken by block ${syncedTo.blockNumber}`, { ...logCtx, block: syncedTo.blockData.header.toInspect() }, ); this.metrics.recordCheckpointPrecheckFailed('slot_already_taken'); @@ -325,13 +339,11 @@ export class Sequencer extends (EventEmitter as new () => TypedEventEmitter TypedEventEmitter TypedEventEmitter TypedEventEmitter TypedEventEmitter TypedEventEmitter { + protected async checkCanPropose(targetSlot: SlotNumber): Promise<[boolean, EthAddress | undefined]> { let proposer: EthAddress | undefined; try { - proposer = await this.epochCache.getProposerAttesterAddressInSlot(slot); + proposer = await this.epochCache.getProposerAttesterAddressInSlot(targetSlot); } catch (e) { if (e instanceof NoCommitteeError) { - if (this.lastSlotForNoCommitteeWarning !== slot) { - this.lastSlotForNoCommitteeWarning = slot; - this.log.warn(`Cannot propose at next L2 slot ${slot} since the committee does not exist on L1`); + if (this.lastSlotForNoCommitteeWarning !== targetSlot) { + this.lastSlotForNoCommitteeWarning = targetSlot; + this.log.warn(`Cannot propose at target slot ${targetSlot} since the committee does not exist on L1`); } return [false, undefined]; } - this.log.error(`Error getting proposer for slot ${slot}`, e); + this.log.error(`Error getting proposer for target slot ${targetSlot}`, e); return [false, undefined]; } @@ -578,10 +604,15 @@ export class Sequencer extends (EventEmitter as new () => TypedEventEmitter addr.equals(proposer)); if (!weAreProposer) { - this.log.debug(`Cannot propose at slot ${slot} since we are not a proposer`, { validatorAddresses, proposer }); + this.log.debug(`Cannot propose at target slot ${targetSlot} since we are not a proposer`, { + targetSlot, + validatorAddresses, + proposer, + }); return [false, proposer]; } + this.log.debug(`We are the proposer for target slot ${targetSlot}`, { targetSlot, proposer }); return [true, proposer]; } @@ -590,8 +621,8 @@ export class Sequencer extends (EventEmitter as new () => TypedEventEmitter ({ [Attributes.SLOT_NUMBER]: slot })) - protected async tryVoteWhenSyncFails(args: { slot: SlotNumber; ts: bigint }): Promise { - const { slot } = args; + protected async tryVoteWhenSyncFails(args: { slot: SlotNumber; targetSlot: SlotNumber; ts: bigint }): Promise { + const { slot, targetSlot } = args; // Prevent duplicate attempts in the same slot if (this.lastSlotForFallbackVote === slot) { @@ -619,7 +650,7 @@ export class Sequencer extends (EventEmitter as new () => TypedEventEmitter TypedEventEmitter { + getCommittee(_slot: SlotTag = 'now'): Promise { return Promise.resolve({ committee: undefined, seed: 0n, @@ -17,6 +17,22 @@ export class MockEpochCache implements EpochCacheInterface { }); } + getSlotNow(): SlotNumber { + return SlotNumber(0); + } + + getTargetSlot(): SlotNumber { + return SlotNumber(0); + } + + getEpochNow(): EpochNumber { + return EpochNumber.ZERO; + } + + getTargetEpoch(): EpochNumber { + return EpochNumber.ZERO; + } + getEpochAndSlotNow(): EpochAndSlot & { nowMs: bigint } { return { epoch: EpochNumber.ZERO, @@ -26,15 +42,23 @@ export class MockEpochCache implements EpochCacheInterface { }; } - getEpochAndSlotInNextL1Slot(): EpochAndSlot & { now: bigint } { + getEpochAndSlotInNextL1Slot(): EpochAndSlot & { nowSeconds: bigint } { return { epoch: EpochNumber.ZERO, slot: SlotNumber(0), ts: 0n, - now: 0n, + nowSeconds: 0n, }; } + getTargetEpochAndSlotInNextL1Slot(): EpochAndSlot & { nowSeconds: bigint } { + return this.getEpochAndSlotInNextL1Slot(); + } + + isProposerPipeliningEnabled(): boolean { + return false; + } + getProposerIndexEncoding(_epoch: EpochNumber, _slot: SlotNumber, _seed: bigint): `0x${string}` { return '0x00'; } @@ -50,6 +74,13 @@ export class MockEpochCache implements EpochCacheInterface { }; } + getTargetAndNextSlot(): { targetSlot: SlotNumber; nextSlot: SlotNumber } { + return { + targetSlot: SlotNumber(0), + nextSlot: SlotNumber(0), + }; + } + getProposerAttesterAddressInSlot(_slot: SlotNumber): Promise { return Promise.resolve(undefined); } @@ -66,6 +97,14 @@ export class MockEpochCache implements EpochCacheInterface { return Promise.resolve([]); } + isEscapeHatchOpen(_epoch: EpochNumber): Promise { + return Promise.resolve(false); + } + + isEscapeHatchOpenAtSlot(_slot: SlotTag): Promise { + return Promise.resolve(false); + } + getL1Constants(): L1RollupConstants { return EmptyL1RollupConstants; } diff --git a/yarn-project/validator-client/src/block_proposal_handler.ts b/yarn-project/validator-client/src/block_proposal_handler.ts index 642ec9410144..844c8adec1e4 100644 --- a/yarn-project/validator-client/src/block_proposal_handler.ts +++ b/yarn-project/validator-client/src/block_proposal_handler.ts @@ -166,11 +166,15 @@ export class BlockProposalHandler { // since a pending checkpoint prune may remove blocks we'd otherwise find. // This affects mostly the block_number_already_exists check, since a pending // checkpoint prune could remove a block that would conflict with this proposal. - // TODO(@Maddiaa0): This may break staggered slots. - const blockSourceSync = await this.waitForBlockSourceSync(slotNumber); - if (!blockSourceSync) { - this.log.warn(`Block source is not synced, skipping processing`, proposalInfo); - return { isValid: false, reason: 'block_source_not_synced' }; + // When pipelining is enabled, the proposer builds ahead of L1 submission, so the + // block source won't have synced to the proposed slot yet. Skip the sync wait to + // avoid eating into the attestation window. + if (!this.epochCache.isProposerPipeliningEnabled()) { + const blockSourceSync = await this.waitForBlockSourceSync(slotNumber); + if (!blockSourceSync) { + this.log.warn(`Block source is not synced, skipping processing`, proposalInfo); + return { isValid: false, reason: 'block_source_not_synced' }; + } } // Check that the parent proposal is a block we know, otherwise reexecution would fail. diff --git a/yarn-project/validator-client/src/validator.ha.integration.test.ts b/yarn-project/validator-client/src/validator.ha.integration.test.ts index 1bf1fbd5229b..07e997c25709 100644 --- a/yarn-project/validator-client/src/validator.ha.integration.test.ts +++ b/yarn-project/validator-client/src/validator.ha.integration.test.ts @@ -5,7 +5,7 @@ * rather than mocks to verify the HA coordination works correctly. */ import type { BlobClientInterface } from '@aztec/blob-client/client'; -import type { EpochCache } from '@aztec/epoch-cache'; +import { EpochCache } from '@aztec/epoch-cache'; import { IndexWithinCheckpoint } from '@aztec/foundation/branded-types'; import { SecretValue } from '@aztec/foundation/config'; import { Fr } from '@aztec/foundation/curves/bn254'; diff --git a/yarn-project/validator-client/src/validator.test.ts b/yarn-project/validator-client/src/validator.test.ts index 804869df8474..b76ac2bce67c 100644 --- a/yarn-project/validator-client/src/validator.test.ts +++ b/yarn-project/validator-client/src/validator.test.ts @@ -114,10 +114,12 @@ describe('ValidatorClient', () => { }); worldState = mock(); epochCache = mock(); + epochCache.filterInCommittee.mockImplementation((_slot, addresses) => Promise.resolve(addresses)); epochCache.getL1Constants.mockReturnValue({ epochDuration: 8 } satisfies Parameters< typeof getEpochAtSlot >[1] as any); + blockSource = mock(); blockSource.getCheckpointedBlocksForEpoch.mockResolvedValue([]); blockSource.getCheckpointsDataForEpoch.mockResolvedValue([]); @@ -341,8 +343,8 @@ describe('ValidatorClient', () => { ); epochCache.isInCommittee.mockResolvedValue(true); - epochCache.getCurrentAndNextSlot.mockReturnValue({ - currentSlot: proposal.slotNumber, + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: proposal.slotNumber, nextSlot: SlotNumber(proposal.slotNumber + 1), }); epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(proposal.getSender()); @@ -671,8 +673,8 @@ describe('ValidatorClient', () => { it('should return false if the proposer is not the current proposer', async () => { epochCache.getProposerAttesterAddressInSlot.mockImplementation(_ => Promise.resolve(EthAddress.random())); - epochCache.getCurrentAndNextSlot.mockReturnValue({ - currentSlot: proposal.slotNumber, + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: proposal.slotNumber, nextSlot: SlotNumber(proposal.slotNumber + 1), }); @@ -691,8 +693,8 @@ describe('ValidatorClient', () => { it('should return false if the proposal is not for the current or next slot', async () => { epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(proposal.getSender()); - epochCache.getCurrentAndNextSlot.mockReturnValue({ - currentSlot: SlotNumber(proposal.slotNumber + 20), + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: SlotNumber(proposal.slotNumber + 20), nextSlot: SlotNumber(proposal.slotNumber + 21), }); @@ -753,8 +755,8 @@ describe('ValidatorClient', () => { // Update epochCache mock for the new proposal epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(nonFirstBlockProposal.getSender()); - epochCache.getCurrentAndNextSlot.mockReturnValue({ - currentSlot: nonFirstBlockProposal.slotNumber, + epochCache.getTargetAndNextSlot.mockReturnValue({ + targetSlot: nonFirstBlockProposal.slotNumber, nextSlot: SlotNumber(nonFirstBlockProposal.slotNumber + 1), }); diff --git a/yarn-project/validator-client/src/validator.ts b/yarn-project/validator-client/src/validator.ts index 2691c7a749e7..e699d0bdf0bc 100644 --- a/yarn-project/validator-client/src/validator.ts +++ b/yarn-project/validator-client/src/validator.ts @@ -484,26 +484,26 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) proposal: CheckpointProposalCore, _proposalSender: PeerId, ): Promise { - const slotNumber = proposal.slotNumber; + const proposalSlotNumber = proposal.slotNumber; const proposer = proposal.getSender(); // If escape hatch is open for this slot's epoch, do not attest. - if (await this.epochCache.isEscapeHatchOpenAtSlot(slotNumber)) { - this.log.warn(`Escape hatch open for slot ${slotNumber}, skipping checkpoint attestation handling`); + if (await this.epochCache.isEscapeHatchOpenAtSlot(proposalSlotNumber)) { + this.log.warn(`Escape hatch open for slot ${proposalSlotNumber}, skipping checkpoint attestation handling`); return undefined; } // Reject proposals with invalid signatures if (!proposer) { - this.log.warn(`Received checkpoint proposal with invalid signature for slot ${slotNumber}`); + this.log.warn(`Received checkpoint proposal with invalid signature for proposal slot ${proposalSlotNumber}`); return undefined; } // Ignore proposals from ourselves (may happen in HA setups) if (this.getValidatorAddresses().some(addr => addr.equals(proposer))) { - this.log.debug(`Ignoring block proposal from self for slot ${slotNumber}`, { + this.log.debug(`Ignoring block proposal from self for slot ${proposalSlotNumber}`, { proposer: proposer.toString(), - slotNumber, + proposalSlotNumber, }); return undefined; } @@ -511,28 +511,28 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) // Validate fee asset price modifier is within allowed range if (!validateFeeAssetPriceModifier(proposal.feeAssetPriceModifier)) { this.log.warn( - `Received checkpoint proposal with invalid feeAssetPriceModifier ${proposal.feeAssetPriceModifier} for slot ${slotNumber}`, + `Received checkpoint proposal with invalid feeAssetPriceModifier ${proposal.feeAssetPriceModifier} for slot ${proposalSlotNumber}`, ); return undefined; } - // Check that I have any address in current committee before attesting - const inCommittee = await this.epochCache.filterInCommittee(slotNumber, this.getValidatorAddresses()); + // Check that I have any address in the committee where this checkpoint will land before attesting + const inCommittee = await this.epochCache.filterInCommittee(proposalSlotNumber, this.getValidatorAddresses()); const partOfCommittee = inCommittee.length > 0; const proposalInfo = { - slotNumber, + proposalSlotNumber, archive: proposal.archive.toString(), proposer: proposer.toString(), }; - this.log.info(`Received checkpoint proposal for slot ${slotNumber}`, { + this.log.info(`Received checkpoint proposal for slot ${proposalSlotNumber}`, { ...proposalInfo, fishermanMode: this.config.fishermanMode || false, }); // Validate the checkpoint proposal before attesting (unless skipCheckpointProposalValidation is set) if (this.config.skipCheckpointProposalValidation) { - this.log.warn(`Skipping checkpoint proposal validation for slot ${slotNumber}`, proposalInfo); + this.log.warn(`Skipping checkpoint proposal validation for slot ${proposalSlotNumber}`, proposalInfo); } else { const validationResult = await this.validateCheckpointProposal(proposal, proposalInfo); if (!validationResult.isValid) { @@ -554,16 +554,19 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) } // Provided all of the above checks pass, we can attest to the proposal - this.log.info(`${partOfCommittee ? 'Attesting to' : 'Validated'} checkpoint proposal for slot ${slotNumber}`, { - ...proposalInfo, - inCommittee: partOfCommittee, - fishermanMode: this.config.fishermanMode || false, - }); + this.log.info( + `${partOfCommittee ? 'Attesting to' : 'Validated'} checkpoint proposal for slot ${proposalSlotNumber}`, + { + ...proposalInfo, + inCommittee: partOfCommittee, + fishermanMode: this.config.fishermanMode || false, + }, + ); this.metrics.incSuccessfulAttestations(inCommittee.length); // Track epoch participation per attester: count each (attester, epoch) pair at most once - const proposalEpoch = getEpochAtSlot(slotNumber, this.epochCache.getL1Constants()); + const proposalEpoch = getEpochAtSlot(proposalSlotNumber, this.epochCache.getL1Constants()); for (const attester of inCommittee) { const key = attester.toString(); const lastEpoch = this.lastAttestedEpochByAttester.get(key); @@ -591,7 +594,7 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) if (this.config.fishermanMode) { // bail out early and don't save attestations to the pool in fisherman mode - this.log.info(`Creating checkpoint attestations for slot ${slotNumber}`, { + this.log.info(`Creating checkpoint attestations for slot ${proposalSlotNumber}`, { ...proposalInfo, attestors: attestors.map(a => a.toString()), });