diff --git a/yarn-project/archiver/src/archiver/archiver.ts b/yarn-project/archiver/src/archiver/archiver.ts index 015d8d131646..c2c796bdc537 100644 --- a/yarn-project/archiver/src/archiver/archiver.ts +++ b/yarn-project/archiver/src/archiver/archiver.ts @@ -18,7 +18,15 @@ import { } from '@aztec/protocol-contracts/instance-deployer'; import type { FunctionSelector } from '@aztec/stdlib/abi'; import type { AztecAddress } from '@aztec/stdlib/aztec-address'; -import type { InBlock, L2Block, L2BlockId, L2BlockSource, L2Tips, NullifierWithBlockSource } from '@aztec/stdlib/block'; +import { + type InBlock, + type L2Block, + type L2BlockId, + type L2BlockSource, + L2BlockSourceEvents, + type L2Tips, + type NullifierWithBlockSource, +} from '@aztec/stdlib/block'; import { type ContractClassPublic, type ContractDataSource, @@ -32,6 +40,7 @@ import { } from '@aztec/stdlib/contract'; import { type L1RollupConstants, + getEpochAtSlot, getEpochNumberAtTimestamp, getSlotAtTimestamp, getSlotRangeForEpoch, @@ -44,6 +53,7 @@ import type { InboxLeaf, L1ToL2MessageSource } from '@aztec/stdlib/messaging'; import { type BlockHeader, TxEffect, TxHash, TxReceipt } from '@aztec/stdlib/tx'; import { Attributes, type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; +import { EventEmitter } from 'events'; import groupBy from 'lodash.groupby'; import { type GetContractReturnType, createPublicClient, fallback, getContract, http } from 'viem'; @@ -69,7 +79,7 @@ export type ArchiveSource = L2BlockSource & * Responsible for handling robust L1 polling so that other components do not need to * concern themselves with it. */ -export class Archiver implements ArchiveSource, Traceable { +export class Archiver extends EventEmitter implements ArchiveSource, Traceable { /** * A promise in which we will be continually fetching new L2 blocks. */ @@ -105,6 +115,8 @@ export class Archiver implements ArchiveSource, Traceable { private readonly l1constants: L1RollupConstants, private readonly log: Logger = createLogger('archiver'), ) { + super(); + this.tracer = instrumentation.tracer; this.store = new ArchiverStoreHelper(dataStore); @@ -299,6 +311,17 @@ export class Archiver implements ArchiveSource, Traceable { const canPrune = localPendingBlockNumber > provenBlockNumber && (await this.canPrune(currentL1BlockNumber)); if (canPrune) { + const localPendingSlotNumber = await this.getL2SlotNumber(); + const localPendingEpochNumber = getEpochAtSlot(localPendingSlotNumber, this.l1constants); + + // Emit an event for listening services to react to the chain prune + this.emit(L2BlockSourceEvents.L2PruneDetected, { + type: L2BlockSourceEvents.L2PruneDetected, + blockNumber: localPendingBlockNumber, + slotNumber: localPendingSlotNumber, + epochNumber: localPendingEpochNumber, + }); + const blocksToUnwind = localPendingBlockNumber - provenBlockNumber; this.log.debug( `L2 prune from ${provenBlockNumber + 1n} to ${localPendingBlockNumber} will occur on next block submission.`, @@ -846,9 +869,18 @@ export class Archiver implements ArchiveSource, Traceable { const provenBlockHeaderHash = await provenBlockHeader?.hash(); const finalizedBlockHeaderHash = await provenBlockHeader?.hash(); return { - latest: { number: latestBlockNumber, hash: latestBlockHeaderHash?.toString() } as L2BlockId, - proven: { number: provenBlockNumber, hash: provenBlockHeaderHash?.toString() } as L2BlockId, - finalized: { number: provenBlockNumber, hash: finalizedBlockHeaderHash?.toString() } as L2BlockId, + latest: { + number: latestBlockNumber, + hash: latestBlockHeaderHash?.toString(), + } as L2BlockId, + proven: { + number: provenBlockNumber, + hash: provenBlockHeaderHash?.toString(), + } as L2BlockId, + finalized: { + number: provenBlockNumber, + hash: finalizedBlockHeaderHash?.toString(), + } as L2BlockId, }; } } diff --git a/yarn-project/archiver/src/factory.ts b/yarn-project/archiver/src/factory.ts index 299654fc31f4..f292e0ba15f4 100644 --- a/yarn-project/archiver/src/factory.ts +++ b/yarn-project/archiver/src/factory.ts @@ -1,6 +1,5 @@ import type { BlobSinkClientInterface } from '@aztec/blob-sink/client'; import { createLogger } from '@aztec/foundation/log'; -import type { Maybe } from '@aztec/foundation/types'; import type { DataStoreConfig } from '@aztec/kv-store/config'; import { createStore } from '@aztec/kv-store/lmdb-v2'; import { TokenContractArtifact } from '@aztec/noir-contracts.js/Token'; @@ -9,6 +8,7 @@ import { getVKTreeRoot } from '@aztec/noir-protocol-circuits-types/vk-tree'; import { protocolContractNames, protocolContractTreeRoot } from '@aztec/protocol-contracts'; import { BundledProtocolContractsProvider } from '@aztec/protocol-contracts/providers/bundle'; import { FunctionType, decodeFunctionSignature } from '@aztec/stdlib/abi'; +import type { L2BlockSourceEventEmitter } from '@aztec/stdlib/block'; import { type ContractClassPublic, computePublicBytecodeCommitment, @@ -23,24 +23,41 @@ import type { ArchiverConfig } from './archiver/config.js'; import { KVArchiverDataStore } from './archiver/index.js'; import { createArchiverClient } from './rpc/index.js'; +/** + * Creates a local archiver. + * @param config - The archiver configuration. + * @param blobSinkClient - The blob sink client. + * @param opts - The options. + * @param telemetry - The telemetry client. + * @returns The local archiver. + */ export async function createArchiver( config: ArchiverConfig & DataStoreConfig, blobSinkClient: BlobSinkClientInterface, opts: { blockUntilSync: boolean } = { blockUntilSync: true }, telemetry: TelemetryClient = getTelemetryClient(), -): Promise> { +): Promise { + const store = await createStore('archiver', config, createLogger('archiver:lmdb')); + const archiverStore = new KVArchiverDataStore(store, config.maxLogs); + await registerProtocolContracts(archiverStore); + await registerCommonContracts(archiverStore); + return Archiver.createAndSync(config, archiverStore, { telemetry, blobSinkClient }, opts.blockUntilSync); +} + +/** + * Creates a remote archiver client. + * @param config - The archiver configuration. + * @returns The remote archiver client. + */ +export function createRemoteArchiver(config: ArchiverConfig): ArchiverApi { if (!config.archiverUrl) { - const store = await createStore('archiver', config, createLogger('archiver:lmdb')); - const archiverStore = new KVArchiverDataStore(store, config.maxLogs); - await registerProtocolContracts(archiverStore); - await registerCommonContracts(archiverStore); - return Archiver.createAndSync(config, archiverStore, { telemetry, blobSinkClient }, opts.blockUntilSync); - } else { - return createArchiverClient( - config.archiverUrl, - getComponentsVersionsFromConfig(config, protocolContractTreeRoot, getVKTreeRoot()), - ); + throw new Error('Archiver URL is required'); } + + return createArchiverClient( + config.archiverUrl, + getComponentsVersionsFromConfig(config, protocolContractTreeRoot, getVKTreeRoot()), + ); } async function registerProtocolContracts(store: KVArchiverDataStore) { diff --git a/yarn-project/archiver/src/test/mock_l2_block_source.ts b/yarn-project/archiver/src/test/mock_l2_block_source.ts index 3995a7b77596..8695a46b0432 100644 --- a/yarn-project/archiver/src/test/mock_l2_block_source.ts +++ b/yarn-project/archiver/src/test/mock_l2_block_source.ts @@ -164,9 +164,18 @@ export class MockL2BlockSource implements L2BlockSource { const finalizedBlock = this.l2Blocks[finalized - 1]; return { - latest: { number: latest, hash: (await latestBlock?.hash())?.toString() }, - proven: { number: proven, hash: (await provenBlock?.hash())?.toString() }, - finalized: { number: finalized, hash: (await finalizedBlock?.hash())?.toString() }, + latest: { + number: latest, + hash: (await latestBlock?.hash())?.toString(), + }, + proven: { + number: proven, + hash: (await provenBlock?.hash())?.toString(), + }, + finalized: { + number: finalized, + hash: (await finalizedBlock?.hash())?.toString(), + }, }; } diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index f2a9e02b9a53..e73fa04e14b7 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -182,7 +182,7 @@ export class AztecNodeService implements AztecNode, Traceable { telemetry, ); - const slasherClient = await createSlasherClient(config, archiver, telemetry); + const slasherClient = createSlasherClient(config, archiver, telemetry); // start both and wait for them to sync from the block source await Promise.all([p2pClient.start(), worldStateSynchronizer.start(), slasherClient.start()]); diff --git a/yarn-project/end-to-end/scripts/run_test.sh b/yarn-project/end-to-end/scripts/run_test.sh index 8218bb53615c..fdfaa84a8840 100755 --- a/yarn-project/end-to-end/scripts/run_test.sh +++ b/yarn-project/end-to-end/scripts/run_test.sh @@ -32,6 +32,7 @@ case "$type" in --mount type=tmpfs,target=/tmp-jest,tmpfs-size=512m \ -e JEST_CACHE_DIR=/tmp-jest \ -e FAKE_PROOFS \ + -e LOG_LEVEL \ --workdir /root/aztec-packages/yarn-project/end-to-end \ aztecprotocol/build:3.0 ./scripts/test_simple.sh $TEST ;; diff --git a/yarn-project/end-to-end/src/e2e_p2p/reqresp.test.ts b/yarn-project/end-to-end/src/e2e_p2p/reqresp.test.ts index b1d6737bd090..986a8a5033be 100644 --- a/yarn-project/end-to-end/src/e2e_p2p/reqresp.test.ts +++ b/yarn-project/end-to-end/src/e2e_p2p/reqresp.test.ts @@ -116,7 +116,7 @@ describe('e2e_p2p_reqresp_tx', () => { contexts.flatMap((context, i) => context.txs.map(async (tx, j) => { t.logger.info(`Waiting for tx ${i}-${j}: ${await tx.getTxHash()} to be mined`); - await tx.wait({ timeout: WAIT_FOR_TX_TIMEOUT }); + await tx.wait({ timeout: WAIT_FOR_TX_TIMEOUT * 1.5 }); // more transactions in this test so allow more time t.logger.info(`Tx ${i}-${j}: ${await tx.getTxHash()} has been mined`); return await tx.getTxHash(); }), diff --git a/yarn-project/end-to-end/src/e2e_p2p/slashing.test.ts b/yarn-project/end-to-end/src/e2e_p2p/slashing.test.ts index 16bc1b94cb4b..1ce8ed0f0aea 100644 --- a/yarn-project/end-to-end/src/e2e_p2p/slashing.test.ts +++ b/yarn-project/end-to-end/src/e2e_p2p/slashing.test.ts @@ -11,7 +11,6 @@ import { getAddress, getContract, parseEventLogs } from 'viem'; import { shouldCollectMetrics } from '../fixtures/fixtures.js'; import { createNodes } from '../fixtures/setup_p2p_test.js'; import { P2PNetworkTest } from './p2p_network.js'; -import { createPXEServiceAndSubmitTransactions } from './shared.js'; jest.setTimeout(1000000); @@ -38,6 +37,8 @@ describe('e2e_p2p_slashing', () => { metricsPort: shouldCollectMetrics(), initialConfig: { aztecEpochDuration: 1, + ethereumSlotDuration: 4, + aztecSlotDuration: 12, aztecProofSubmissionWindow: 1, slashingQuorum, slashingRoundSize, @@ -110,6 +111,7 @@ describe('e2e_p2p_slashing', () => { }; t.ctx.aztecNodeConfig.validatorReexecute = false; + t.ctx.aztecNodeConfig.minTxsPerBlock = 0; // create our network of nodes and submit txs into each of them // the number of txs per node and the number of txs per rollup @@ -159,29 +161,31 @@ describe('e2e_p2p_slashing', () => { } const sequencer = (seqClient as any).sequencer; const slasher = (sequencer as any).slasherClient; + let slashEvents: any[] = []; t.logger.info(`Producing blocks until we hit a pruning event`); // Run for up to the slashing round size, or as long as needed to get a slash event // Variable because sometimes hit race-condition issues with attestations. for (let i = 0; i < slashingRoundSize; i++) { - t.logger.info('Submitting transactions'); const bn = await nodes[0].getBlockNumber(); - await createPXEServiceAndSubmitTransactions(t.logger, nodes[0], 1, t.fundedAccount); t.logger.info(`Waiting for block number to change`); while (bn === (await nodes[0].getBlockNumber())) { await sleep(1000); } - if (slasher.slashEvents.length > 0) { + // Create a clone of slasher.slashEvents to prevent race conditions + // The validator client can remove elements from the original array + slashEvents = [...slasher.slashEvents]; + t.logger.info(`Slash events: ${slashEvents.length}`); + if (slashEvents.length > 0) { t.logger.info(`We have a slash event ${i}`); break; } } - expect(slasher.slashEvents.length).toBeGreaterThan(0); - + expect(slashEvents.length).toBeGreaterThan(0); // We should push us to land exactly at the next round await jumpToNextRound(); @@ -189,12 +193,12 @@ describe('e2e_p2p_slashing', () => { // Stop early if we have enough votes. t.logger.info(`Waiting for votes to be cast`); for (let i = 0; i < slashingRoundSize; i++) { - t.logger.info('Waiting for slot number to change and votes to be cast'); - const slotNumber = await rollup.read.getCurrentSlot(); t.logger.info(`Waiting for block number to change`); + const slotNumber = await rollup.read.getCurrentSlot(); while (slotNumber === (await rollup.read.getCurrentSlot())) { await sleep(1000); } + sInfo = await slashingInfo(); t.logger.info(`We have ${sInfo.leaderVotes} votes in round ${sInfo.roundNumber} on ${sInfo.info[1]}`); if (sInfo.leaderVotes > votesNeeded) { @@ -204,7 +208,7 @@ describe('e2e_p2p_slashing', () => { } t.logger.info('Deploy the actual payload for slashing!'); - const slashEvent = slasher.slashEvents[0]; + const slashEvent = slashEvents[0]; await t.ctx.deployL1ContractsValues.publicClient.waitForTransactionReceipt({ hash: await slashFactory.write.createSlashPayload([slashEvent.epoch, slashEvent.amount], { account: t.ctx.deployL1ContractsValues.walletClient.account, diff --git a/yarn-project/kv-store/src/stores/l2_tips_store.ts b/yarn-project/kv-store/src/stores/l2_tips_store.ts index 61faeb73e998..55718faad2f3 100644 --- a/yarn-project/kv-store/src/stores/l2_tips_store.ts +++ b/yarn-project/kv-store/src/stores/l2_tips_store.ts @@ -41,6 +41,7 @@ export class L2TipsStore implements L2BlockStreamEventHandler, L2BlockStreamLoca if (!blockHash) { throw new Error(`Block hash not found for block number ${blockNumber}`); } + return { number: blockNumber, hash: blockHash }; } diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts index bcee4e979d31..a241ce8f97e7 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts @@ -170,7 +170,13 @@ export class ConnectionSampler { */ async close(streamId: string): Promise { try { - const { stream, peerId } = this.streams.get(streamId)!; + const streamAndPeerId = this.streams.get(streamId); + if (!streamAndPeerId) { + this.logger.warn(`Stream ${streamId} not found`); + return; + } + + const { stream, peerId } = streamAndPeerId; const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 1) - 1; this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount); @@ -184,7 +190,7 @@ export class ConnectionSampler { await stream?.close(); } catch (error) { - this.logger.warn(`Failed to close connection to peer with stream id ${streamId}`); + this.logger.error(`Failed to close connection to peer with stream id ${streamId}`, error); } finally { this.streams.delete(streamId); } diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.ts index f4eee659d23b..3a3f5c74da30 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.ts @@ -193,7 +193,7 @@ export class Sequencer { this.log.debug(`Stopping sequencer`); await this.validatorClient?.stop(); await this.runningPromise?.stop(); - await this.slasherClient?.stop(); + this.slasherClient.stop(); this.publisher.interrupt(); this.setState(SequencerState.STOPPED, 0n, true /** force */); this.log.info('Stopped sequencer'); diff --git a/yarn-project/sequencer-client/src/slasher/factory.ts b/yarn-project/sequencer-client/src/slasher/factory.ts index c972e6fdee31..d93737770a71 100644 --- a/yarn-project/sequencer-client/src/slasher/factory.ts +++ b/yarn-project/sequencer-client/src/slasher/factory.ts @@ -1,21 +1,15 @@ import type { L1ContractsConfig, L1ReaderConfig } from '@aztec/ethereum'; -import { createLogger } from '@aztec/foundation/log'; -import type { AztecAsyncKVStore } from '@aztec/kv-store'; -import type { DataStoreConfig } from '@aztec/kv-store/config'; -import { createStore } from '@aztec/kv-store/lmdb-v2'; -import type { L2BlockSource } from '@aztec/stdlib/block'; +import type { L2BlockSourceEventEmitter } from '@aztec/stdlib/block'; import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; import { SlasherClient } from './slasher_client.js'; import type { SlasherConfig } from './slasher_client.js'; -export const createSlasherClient = async ( - _config: SlasherConfig & DataStoreConfig & L1ContractsConfig & L1ReaderConfig, - l2BlockSource: L2BlockSource, +export const createSlasherClient = ( + _config: SlasherConfig & L1ContractsConfig & L1ReaderConfig, + l2BlockSource: L2BlockSourceEventEmitter, telemetry: TelemetryClient = getTelemetryClient(), - deps: { store?: AztecAsyncKVStore } = {}, ) => { const config = { ..._config }; - const store = deps.store ?? (await createStore('slasher', config, createLogger('slasher:lmdb'))); - return new SlasherClient(config, store, l2BlockSource, telemetry); + return new SlasherClient(config, l2BlockSource, telemetry); }; diff --git a/yarn-project/sequencer-client/src/slasher/slasher_client.test.ts b/yarn-project/sequencer-client/src/slasher/slasher_client.test.ts deleted file mode 100644 index fbbbdcf37418..000000000000 --- a/yarn-project/sequencer-client/src/slasher/slasher_client.test.ts +++ /dev/null @@ -1,127 +0,0 @@ -import { MockL2BlockSource } from '@aztec/archiver/test'; -import { - type L1ContractAddresses, - type L1ContractsConfig, - type L1ReaderConfig, - getL1ContractsConfigEnvVars, -} from '@aztec/ethereum'; -import { EthAddress } from '@aztec/foundation/eth-address'; -import { retryUntil } from '@aztec/foundation/retry'; -import { sleep } from '@aztec/foundation/sleep'; -import type { AztecAsyncKVStore } from '@aztec/kv-store'; -import { openStoreAt, openTmpStore } from '@aztec/kv-store/lmdb-v2'; -import { L2Block } from '@aztec/stdlib/block'; - -import { expect } from '@jest/globals'; -import { rm } from 'fs/promises'; - -import { SlasherClient, type SlasherConfig } from './slasher_client.js'; - -// Most of this test are directly copied from the P2P client test. -describe('In-Memory Slasher Client', () => { - let blockSource: MockL2BlockSource; - let kvStore: AztecAsyncKVStore; - let client: SlasherClient; - let config: SlasherConfig & L1ContractsConfig & L1ReaderConfig; - let tmpDir: string; - - beforeEach(async () => { - blockSource = new MockL2BlockSource(); - await blockSource.createBlocks(100); - - const l1Config = getL1ContractsConfigEnvVars(); - - // Need some configuration here. Can be a basic bitch config really. - config = { - ...l1Config, - blockCheckIntervalMS: 100, - blockRequestBatchSize: 20, - l1Contracts: { - slashFactoryAddress: EthAddress.ZERO, - } as unknown as L1ContractAddresses, - l1RpcUrls: ['http://127.0.0.1:8545'], - l1ChainId: 1, - viemPollingIntervalMS: 1000, - }; - - // ephemeral false so that we can close and re-open during tests - const store = await openTmpStore('test', false); - kvStore = store; - tmpDir = store.dataDirectory; - client = new SlasherClient(config, kvStore, blockSource); - }); - - const advanceToProvenBlock = async (getProvenBlockNumber: number, provenEpochNumber = getProvenBlockNumber) => { - blockSource.setProvenBlockNumber(getProvenBlockNumber); - blockSource.setProvenEpochNumber(provenEpochNumber); - await retryUntil(async () => (await client.getSyncedProvenBlockNum()) >= getProvenBlockNumber, 'synced', 10, 0.1); - }; - - afterEach(async () => { - if (client.isReady()) { - await client.stop(); - } - - await rm(tmpDir, { recursive: true, force: true, maxRetries: 3 }); - }); - - it('can start & stop', async () => { - expect(client.isReady()).toEqual(false); - - await client.start(); - expect(client.isReady()).toEqual(true); - - await client.stop(); - expect(client.isReady()).toEqual(false); - }); - - it('restores the previous block number it was at', async () => { - await client.start(); - const synchedBlock = await client.getSyncedLatestBlockNum(); - await client.stop(); - - const reopenedStore = await openStoreAt(tmpDir); - const client2 = new SlasherClient(config, reopenedStore, blockSource); - expect(await client2.getSyncedLatestBlockNum()).toEqual(synchedBlock); - await client2.stop(); - }); - - describe('Chain prunes', () => { - it('moves the tips on a chain reorg', async () => { - const timeToReact = 1000; - - blockSource.setProvenBlockNumber(0); - await client.start(); - - await advanceToProvenBlock(90); - - await expect(client.getL2Tips()).resolves.toEqual({ - latest: { number: 100, hash: expect.any(String) }, - proven: { number: 90, hash: expect.any(String) }, - finalized: { number: 90, hash: expect.any(String) }, - }); - - blockSource.removeBlocks(10); - - // give the client a chance to react to the reorg - await sleep(timeToReact); - - await expect(client.getL2Tips()).resolves.toEqual({ - latest: { number: 90, hash: expect.any(String) }, - proven: { number: 90, hash: expect.any(String) }, - finalized: { number: 90, hash: expect.any(String) }, - }); - - blockSource.addBlocks([await L2Block.random(91), await L2Block.random(92)]); - - // give the client a chance to react to the new blocks - await sleep(timeToReact); - - await expect(client.getL2Tips()).resolves.toEqual({ - latest: { number: 92, hash: expect.any(String) }, - proven: { number: 90, hash: expect.any(String) }, - finalized: { number: 90, hash: expect.any(String) }, - }); - }); - }); -}); diff --git a/yarn-project/sequencer-client/src/slasher/slasher_client.ts b/yarn-project/sequencer-client/src/slasher/slasher_client.ts index 847e8e335151..bbc5e592b8ff 100644 --- a/yarn-project/sequencer-client/src/slasher/slasher_client.ts +++ b/yarn-project/sequencer-client/src/slasher/slasher_client.ts @@ -1,4 +1,3 @@ -import { INITIAL_L2_BLOCK_NUM } from '@aztec/constants'; import { type L1ContractsConfig, type L1ReaderConfig, @@ -7,15 +6,12 @@ import { } from '@aztec/ethereum'; import { EthAddress } from '@aztec/foundation/eth-address'; import { createLogger } from '@aztec/foundation/log'; -import type { AztecAsyncKVStore, AztecAsyncMap, AztecAsyncSingleton } from '@aztec/kv-store'; import { SlashFactoryAbi } from '@aztec/l1-artifacts'; import { - type L2Block, type L2BlockId, - type L2BlockSource, - L2BlockStream, - type L2BlockStreamEvent, - type L2Tips, + type L2BlockSourceEvent, + type L2BlockSourceEventEmitter, + L2BlockSourceEvents, } from '@aztec/stdlib/block'; import { type TelemetryClient, WithTracer, getTelemetryClient } from '@aztec/telemetry-client'; @@ -26,7 +22,6 @@ import { type GetContractReturnType, createPublicClient, fallback, getAddress, g */ export enum SlasherClientState { IDLE, - SYNCHING, RUNNING, STOPPED, } @@ -82,18 +77,6 @@ type SlashEvent = { * slashing only the first, because the "lifetime" of the second would have passed after that vote */ export class SlasherClient extends WithTracer { - private currentState = SlasherClientState.IDLE; - private syncPromise = Promise.resolve(); - private syncResolve?: () => void = undefined; - private latestBlockNumberAtStart = -1; - private provenBlockNumberAtStart = -1; - - private synchedBlockHashes: AztecAsyncMap; - private synchedLatestBlockNumber: AztecAsyncSingleton; - private synchedProvenBlockNumber: AztecAsyncSingleton; - - private blockStream; - private slashEvents: SlashEvent[] = []; protected slashFactoryContract?: GetContractReturnType = undefined; @@ -105,22 +88,12 @@ export class SlasherClient extends WithTracer { constructor( private config: SlasherConfig & L1ContractsConfig & L1ReaderConfig, - private store: AztecAsyncKVStore, - private l2BlockSource: L2BlockSource, + private l2BlockSource: L2BlockSourceEventEmitter, telemetry: TelemetryClient = getTelemetryClient(), private log = createLogger('slasher'), ) { super(telemetry, 'slasher'); - this.blockStream = new L2BlockStream(l2BlockSource, this, this, createLogger('slasher:block_stream'), { - batchSize: config.blockRequestBatchSize, - pollIntervalMS: config.blockCheckIntervalMS, - }); - - this.synchedBlockHashes = store.openMap('slasher_block_hashes'); - this.synchedLatestBlockNumber = store.openSingleton('slasher_last_l2_block'); - this.synchedProvenBlockNumber = store.openSingleton('slasher_last_proven_l2_block'); - if (config.l1Contracts.slashFactoryAddress && config.l1Contracts.slashFactoryAddress !== EthAddress.ZERO) { const chain = createEthereumChain(config.l1RpcUrls, config.l1ChainId); const publicClient = createPublicClient({ @@ -141,6 +114,11 @@ export class SlasherClient extends WithTracer { this.log.info(`Slasher client initialized`); } + public start() { + this.log.info('Starting Slasher client...'); + this.l2BlockSource.on(L2BlockSourceEvents.L2PruneDetected, this.handlePruneL2Blocks.bind(this)); + } + // This is where we should put a bunch of the improvements mentioned earlier. public async getSlashPayload(slotNumber: bigint): Promise { if (!this.slashFactoryContract) { @@ -172,198 +150,32 @@ export class SlasherClient extends WithTracer { return EthAddress.fromString(payloadAddress); } - public getL2BlockHash(number: number): Promise { - return this.synchedBlockHashes.getAsync(number); - } - - public async getL2Tips(): Promise { - const latestBlockNumber = await this.getSyncedLatestBlockNum(); - let latestBlockHash: string | undefined; - const provenBlockNumber = await this.getSyncedProvenBlockNum(); - let provenBlockHash: string | undefined; - - if (latestBlockNumber > 0) { - latestBlockHash = await this.synchedBlockHashes.getAsync(latestBlockNumber); - if (typeof latestBlockHash === 'undefined') { - this.log.warn(`Block hash for latest block ${latestBlockNumber} not found`); - throw new Error(); - } - } - - if (provenBlockNumber > 0) { - provenBlockHash = await this.synchedBlockHashes.getAsync(provenBlockNumber); - if (typeof provenBlockHash === 'undefined') { - this.log.warn(`Block hash for proven block ${provenBlockNumber} not found`); - throw new Error(); - } - } - - return Promise.resolve({ - latest: { hash: latestBlockHash!, number: latestBlockNumber }, - proven: { hash: provenBlockHash!, number: provenBlockNumber }, - finalized: { hash: provenBlockHash!, number: provenBlockNumber }, - }); - } - - public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { + public handleBlockStreamEvent(event: L2BlockSourceEvent): Promise { this.log.debug(`Handling block stream event ${event.type}`); switch (event.type) { - case 'blocks-added': - await this.handleLatestL2Blocks(event.blocks); - break; - case 'chain-finalized': - // TODO (alexg): I think we can prune the block hashes map here - break; - case 'chain-proven': { - const from = (await this.getSyncedProvenBlockNum()) + 1; - const limit = event.blockNumber - from + 1; - await this.handleProvenL2Blocks(await this.l2BlockSource.getBlocks(from, limit)); - break; - } - case 'chain-pruned': - await this.handlePruneL2Blocks(event.blockNumber); + case L2BlockSourceEvents.L2PruneDetected: + this.handlePruneL2Blocks(event); break; default: { - const _: never = event; break; } } - } - - public async start() { - if (this.currentState === SlasherClientState.STOPPED) { - throw new Error('Slasher already stopped'); - } - if (this.currentState !== SlasherClientState.IDLE) { - return this.syncPromise; - } - - // get the current latest block numbers - this.latestBlockNumberAtStart = await this.l2BlockSource.getBlockNumber(); - this.provenBlockNumberAtStart = await this.l2BlockSource.getProvenBlockNumber(); - - const syncedLatestBlock = (await this.getSyncedLatestBlockNum()) + 1; - const syncedProvenBlock = (await this.getSyncedProvenBlockNum()) + 1; - - // if there are blocks to be retrieved, go to a synching state - if (syncedLatestBlock <= this.latestBlockNumberAtStart || syncedProvenBlock <= this.provenBlockNumberAtStart) { - this.setCurrentState(SlasherClientState.SYNCHING); - this.syncPromise = new Promise(resolve => { - this.syncResolve = resolve; - }); - this.log.verbose(`Starting sync from ${syncedLatestBlock} (last proven ${syncedProvenBlock})`); - } else { - // if no blocks to be retrieved, go straight to running - this.setCurrentState(SlasherClientState.RUNNING); - this.syncPromise = Promise.resolve(); - this.log.verbose(`Block ${syncedLatestBlock} (proven ${syncedProvenBlock}) already beyond current block`); - } - - this.blockStream.start(); - this.log.verbose(`Started block downloader from block ${syncedLatestBlock}`); - - return this.syncPromise; + return Promise.resolve(); } /** * Allows consumers to stop the instance of the slasher client. * 'ready' will now return 'false' and the running promise that keeps the client synced is interrupted. */ - public async stop() { + public stop() { this.log.debug('Stopping Slasher client...'); - await this.blockStream.stop(); - this.log.debug('Stopped block downloader'); - await this.store.close(); - this.log.debug('Stopped slasher store'); - this.setCurrentState(SlasherClientState.STOPPED); + this.l2BlockSource.removeListener(L2BlockSourceEvents.L2PruneDetected, this.handlePruneL2Blocks.bind(this)); this.log.info('Slasher client stopped.'); } - /** - * Public function to check if the slasher client is fully synced and ready to receive txs. - * @returns True if the slasher client is ready to receive txs. - */ - public isReady() { - return this.currentState === SlasherClientState.RUNNING; - } - - /** - * Public function to check the latest block number that the slasher client is synced to. - * @returns Block number of latest L2 Block we've synced with. - */ - public async getSyncedLatestBlockNum(): Promise { - return (await this.synchedLatestBlockNumber.getAsync()) ?? INITIAL_L2_BLOCK_NUM - 1; - } - - /** - * Public function to check the latest proven block number that the slasher client is synced to. - * @returns Block number of latest proven L2 Block we've synced with. - */ - public async getSyncedProvenBlockNum(): Promise { - return (await this.synchedProvenBlockNumber.getAsync()) ?? INITIAL_L2_BLOCK_NUM - 1; - } - - /** - * Method to check the status of the slasher client. - * @returns Information about slasher client status: state & syncedToBlockNum. - */ - public async getStatus(): Promise { - const blockNumber = await this.getSyncedLatestBlockNum(); - const blockHash = - blockNumber == 0 - ? '' - : await this.l2BlockSource - .getBlockHeader(blockNumber) - .then(header => header?.hash()) - .then(hash => hash?.toString()); - return Promise.resolve({ - state: this.currentState, - syncedToL2Block: { number: blockNumber, hash: blockHash }, - } as SlasherSyncState); - } - - /** - * Handles new blocks - * @param blocks - A list of blocks that the slasher client needs to store block hashes for - * @returns Empty promise. - */ - private async handleLatestL2Blocks(blocks: L2Block[]): Promise { - if (!blocks.length) { - return; - } - - await this.store.transactionAsync(async () => { - for (const block of blocks) { - await this.synchedBlockHashes.set(block.number, (await block.hash()).toString()); - } - - const lastBlockNum = blocks[blocks.length - 1].number; - await this.synchedLatestBlockNumber.set(lastBlockNum); - }); - - await this.startServiceIfSynched(); - } - - /** - * Handles new proven blocks by updating the proven block number - * @param blocks - A list of proven L2 blocks. - * @returns Empty promise. - */ - private async handleProvenL2Blocks(blocks: L2Block[]): Promise { - if (!blocks.length) { - return Promise.resolve(); - } - const lastBlockNum = blocks[blocks.length - 1].number; - await this.synchedProvenBlockNumber.set(lastBlockNum); - this.log.debug(`Synched to proven block ${lastBlockNum}`); - - await this.startServiceIfSynched(); - } - - private async handlePruneL2Blocks(latestBlock: number): Promise { - const blockHeader = await this.l2BlockSource.getBlockHeader(latestBlock); - const slotNumber = blockHeader ? blockHeader.globalVariables.slotNumber.toBigInt() : BigInt(0); - const epochNumber = slotNumber / BigInt(this.config.aztecEpochDuration); + // I need to get the slot number from the block that was just pruned + private handlePruneL2Blocks(event: L2BlockSourceEvent): void { + const { slotNumber, epochNumber } = event; this.log.info(`Detected chain prune. Punishing the validators at epoch ${epochNumber}`); // Set the lifetime such that we have a full round that we could vote throughout. @@ -377,34 +189,5 @@ export class SlasherClient extends WithTracer { amount: this.slashingAmount, lifetime, }); - - await this.synchedLatestBlockNumber.set(latestBlock); - } - - private async startServiceIfSynched() { - const [latestBlock, provenBlock] = await Promise.all([ - this.getSyncedLatestBlockNum(), - this.getSyncedProvenBlockNum(), - ]); - if ( - this.currentState === SlasherClientState.SYNCHING && - latestBlock >= this.latestBlockNumberAtStart && - provenBlock >= this.provenBlockNumberAtStart - ) { - this.log.debug(`Synched to blocks at start`); - this.setCurrentState(SlasherClientState.RUNNING); - if (this.syncResolve !== undefined) { - this.syncResolve(); - } - } - } - - /** - * Method to set the value of the current state. - * @param newState - New state value. - */ - private setCurrentState(newState: SlasherClientState) { - this.currentState = newState; - this.log.debug(`Moved to state ${SlasherClientState[this.currentState]}`); } } diff --git a/yarn-project/stdlib/src/block/l2_block_source.ts b/yarn-project/stdlib/src/block/l2_block_source.ts index 4d7b3adb66d2..2ed0d160e289 100644 --- a/yarn-project/stdlib/src/block/l2_block_source.ts +++ b/yarn-project/stdlib/src/block/l2_block_source.ts @@ -1,5 +1,6 @@ import type { EthAddress } from '@aztec/foundation/eth-address'; +import type { EventEmitter } from 'events'; import { z } from 'zod'; import type { L1RollupConstants } from '../epoch-helpers/index.js'; @@ -115,6 +116,12 @@ export interface L2BlockSource { getL1Constants(): Promise; } +/** + * L2BlockSource that emits events upon pending / proven chain changes. + * see L2BlockSourceEvents for the events emitted. + */ +export interface L2BlockSourceEventEmitter extends L2BlockSource, EventEmitter {} + /** * Identifier for L2 block tags. * - latest: Latest block pushed to L1. @@ -146,3 +153,14 @@ export const L2TipsSchema = z.object({ proven: L2BlockIdSchema, finalized: L2BlockIdSchema, }) satisfies z.ZodType; + +export enum L2BlockSourceEvents { + L2PruneDetected = 'l2PruneDetected', +} + +export type L2BlockSourceEvent = { + type: 'l2PruneDetected'; + blockNumber: bigint; + slotNumber: bigint; + epochNumber: bigint; +}; diff --git a/yarn-project/stdlib/src/interfaces/archiver.test.ts b/yarn-project/stdlib/src/interfaces/archiver.test.ts index aaa2f97499f4..fbd76b0b30ee 100644 --- a/yarn-project/stdlib/src/interfaces/archiver.test.ts +++ b/yarn-project/stdlib/src/interfaces/archiver.test.ts @@ -311,6 +311,10 @@ class MockArchiver implements ArchiverApi { finalized: { number: 1, hash: `0x01` }, }); } + getL2BlockHash(blockNumber: number): Promise { + expect(blockNumber).toEqual(1); + return Promise.resolve(`0x01`); + } findNullifiersIndexesWithBlock(blockNumber: number, nullifiers: Fr[]): Promise<(InBlock | undefined)[]> { expect(blockNumber).toEqual(1); expect(nullifiers).toHaveLength(2);