diff --git a/yarn-project/archiver/src/archiver/archiver.ts b/yarn-project/archiver/src/archiver/archiver.ts index 31fb408cd5a1..13a7ca3c214c 100644 --- a/yarn-project/archiver/src/archiver/archiver.ts +++ b/yarn-project/archiver/src/archiver/archiver.ts @@ -134,17 +134,15 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable { } /** - * Creates a new instance of the Archiver and blocks until it syncs from chain. + * Creates a new instance of the Archiver. * @param config - The archiver's desired configuration. * @param archiverStore - The backing store for the archiver. - * @param blockUntilSynced - If true, blocks until the archiver has fully synced. * @returns - An instance of the archiver. */ - public static async createAndSync( + public static async create( config: ArchiverConfig, archiverStore: ArchiverDataStore, deps: { telemetry: TelemetryClient; blobSinkClient: BlobSinkClientInterface }, - blockUntilSynced = true, ): Promise { const chain = createEthereumChain(config.l1RpcUrls, config.l1ChainId); const publicClient = createPublicClient({ @@ -178,6 +176,23 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable { await ArchiverInstrumentation.new(deps.telemetry, () => archiverStore.estimateSize()), { l1StartBlock, l1GenesisTime, epochDuration, slotDuration, ethereumSlotDuration }, ); + return archiver; + } + + /** + * Creates a new instance of the Archiver and blocks until it syncs from chain. + * @param config - The archiver's desired configuration. + * @param archiverStore - The backing store for the archiver. + * @param blockUntilSynced - If true, blocks until the archiver has fully synced. + * @returns - An instance of the archiver. + */ + public static async createAndSync( + config: ArchiverConfig, + archiverStore: ArchiverDataStore, + deps: { telemetry: TelemetryClient; blobSinkClient: BlobSinkClientInterface }, + blockUntilSynced: boolean, + ): Promise { + const archiver = await this.create(config, archiverStore, deps); await archiver.start(blockUntilSynced); return archiver; } @@ -315,12 +330,17 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable { const localPendingEpochNumber = getEpochAtSlot(localPendingSlotNumber, this.l1constants); // Emit an event for listening services to react to the chain prune - this.emit(L2BlockSourceEvents.L2PruneDetected, { - type: L2BlockSourceEvents.L2PruneDetected, + this.log.debug('Emitting chain pruned event', { blockNumber: localPendingBlockNumber, slotNumber: localPendingSlotNumber, epochNumber: localPendingEpochNumber, }); + this.emit(L2BlockSourceEvents.ChainPruned, { + type: L2BlockSourceEvents.ChainPruned, + blockNumber: provenBlockNumber, + slotNumber: localPendingSlotNumber, + epochNumber: localPendingEpochNumber, + }); const blocksToUnwind = localPendingBlockNumber - provenBlockNumber; this.log.debug( @@ -413,9 +433,20 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable { this.log.info(`Updated proven chain to block ${provenBlockNumber}`, { provenBlockNumber, }); + + // Emit an event for listening services to react to the chain proven + this.log.debug('Emitting chain proven event', { + previousProvenBlockNumber: localProvenBlockNumber, + provenBlockNumber: provenBlockNumber, + }); + this.emit(L2BlockSourceEvents.ChainProven, { + type: L2BlockSourceEvents.ChainProven, + previousProvenBlockNumber: localProvenBlockNumber, + provenBlockNumber: provenBlockNumber, + }); + this.instrumentation.updateLastProvenBlock(Number(provenBlockNumber)); } } - this.instrumentation.updateLastProvenBlock(Number(provenBlockNumber)); }; // This is an edge case that we only hit if there are no proposed blocks. @@ -519,6 +550,15 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable { }); } + // Emit an event for listening services to react to the new blocks + this.log.debug('Emitting blocks added event', { + numberOfBlocks: retrievedBlocks.length, + }); + this.emit(L2BlockSourceEvents.BlocksAdded, { + type: L2BlockSourceEvents.BlocksAdded, + blocks: retrievedBlocks.map(b => b.data), + }); + const [processDuration] = await elapsed(() => this.store.addBlocks(retrievedBlocks)); this.instrumentation.processNewBlocks( processDuration / retrievedBlocks.length, diff --git a/yarn-project/archiver/src/factory.ts b/yarn-project/archiver/src/factory.ts index f292e0ba15f4..9a63d7d18659 100644 --- a/yarn-project/archiver/src/factory.ts +++ b/yarn-project/archiver/src/factory.ts @@ -34,14 +34,31 @@ import { createArchiverClient } from './rpc/index.js'; export async function createArchiver( config: ArchiverConfig & DataStoreConfig, blobSinkClient: BlobSinkClientInterface, - opts: { blockUntilSync: boolean } = { blockUntilSync: true }, telemetry: TelemetryClient = getTelemetryClient(), ): Promise { const store = await createStore('archiver', config, createLogger('archiver:lmdb')); const archiverStore = new KVArchiverDataStore(store, config.maxLogs); await registerProtocolContracts(archiverStore); await registerCommonContracts(archiverStore); - return Archiver.createAndSync(config, archiverStore, { telemetry, blobSinkClient }, opts.blockUntilSync); + return Archiver.create(config, archiverStore, { telemetry, blobSinkClient }); +} + +/** + * Creates a local archiver and blocks until it syncs from chain. + * @param config - The archiver configuration. + * @param blobSinkClient - The blob sink client. + * @param telemetry - The telemetry client. + * @returns The local archiver. + */ +export async function createArchiverAndSync( + config: ArchiverConfig & DataStoreConfig, + blobSinkClient: BlobSinkClientInterface, + blockUntilSynced: boolean, + telemetry: TelemetryClient = getTelemetryClient(), +): Promise { + const archiver = await createArchiver(config, blobSinkClient, telemetry); + await archiver.start(blockUntilSynced); + return archiver; } /** diff --git a/yarn-project/archiver/src/test/mock_l2_block_source.ts b/yarn-project/archiver/src/test/mock_l2_block_source.ts index 285feb78f79e..c96893bbe749 100644 --- a/yarn-project/archiver/src/test/mock_l2_block_source.ts +++ b/yarn-project/archiver/src/test/mock_l2_block_source.ts @@ -1,14 +1,22 @@ import { DefaultL1ContractsConfig } from '@aztec/ethereum'; import { EthAddress } from '@aztec/foundation/eth-address'; import { createLogger } from '@aztec/foundation/log'; -import { L2Block, L2BlockHash, type L2BlockSource, type L2Tips } from '@aztec/stdlib/block'; +import { + L2Block, + L2BlockHash, + type L2BlockSourceEventEmitter, + L2BlockSourceEvents, + type L2Tips, +} from '@aztec/stdlib/block'; import { type L1RollupConstants, getSlotRangeForEpoch } from '@aztec/stdlib/epoch-helpers'; import { type BlockHeader, TxHash, TxReceipt, TxStatus } from '@aztec/stdlib/tx'; +import { EventEmitter } from 'events'; + /** * A mocked implementation of L2BlockSource to be used in tests. */ -export class MockL2BlockSource implements L2BlockSource { +export class MockL2BlockSource extends EventEmitter implements L2BlockSourceEventEmitter { protected l2Blocks: L2Block[] = []; private provenBlockNumber: number = 0; @@ -16,27 +24,44 @@ export class MockL2BlockSource implements L2BlockSource { private log = createLogger('archiver:mock_l2_block_source'); public async createBlocks(numBlocks: number) { + const newBlocks = []; for (let i = 0; i < numBlocks; i++) { const blockNum = this.l2Blocks.length + 1; const block = await L2Block.random(blockNum); this.l2Blocks.push(block); + newBlocks.push(block); } this.log.verbose(`Created ${numBlocks} blocks in the mock L2 block source`); + this.emit(L2BlockSourceEvents.BlocksAdded, { blocks: newBlocks }); } public addBlocks(blocks: L2Block[]) { this.l2Blocks.push(...blocks); this.log.verbose(`Added ${blocks.length} blocks to the mock L2 block source`); + + this.emit(L2BlockSourceEvents.BlocksAdded, blocks); } public removeBlocks(numBlocks: number) { this.l2Blocks = this.l2Blocks.slice(0, -numBlocks); - this.log.verbose(`Removed ${numBlocks} blocks from the mock L2 block source`); + + this.emit(L2BlockSourceEvents.ChainPruned, { + blockNumber: this.l2Blocks.length, + }); + this.log.verbose( + `Removed ${numBlocks} blocks from the mock L2 block source, ${this.l2Blocks.length} blocks remaining, ChainPrune emitted`, + ); } public setProvenBlockNumber(provenBlockNumber: number) { + const previousProvenBlockNumber = this.provenBlockNumber; this.provenBlockNumber = provenBlockNumber; + + this.emit(L2BlockSourceEvents.ChainProven, { + previousProvenBlockNumber, + provenBlockNumber, + }); } /** diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index 4b5bc2519b68..ff6aa0e9401e 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -155,7 +155,7 @@ export class AztecNodeService implements AztecNode, Traceable { ); } - const archiver = await createArchiver(config, blobSinkClient, { blockUntilSync: true }, telemetry); + const archiver = await createArchiver(config, blobSinkClient, telemetry); // now create the merkle trees and the world state synchronizer const worldStateSynchronizer = await createWorldStateSynchronizer( @@ -184,8 +184,14 @@ export class AztecNodeService implements AztecNode, Traceable { const slasherClient = createSlasherClient(config, archiver, telemetry); + await p2pClient.start(); // start both and wait for them to sync from the block source - await Promise.all([p2pClient.start(), worldStateSynchronizer.start(), slasherClient.start()]); + + // Start the archiver after the p2p client, as it relies on the archiver's events + await archiver.start(/* blockUntilSync=*/ true); + await worldStateSynchronizer.start(); + + slasherClient.start(); log.verbose(`All Aztec Node subsystems synced`); const validatorClient = createValidatorClient(config, { p2pClient, telemetry, dateProvider, epochCache }); diff --git a/yarn-project/end-to-end/src/e2e_prover/e2e_prover_test.ts b/yarn-project/end-to-end/src/e2e_prover/e2e_prover_test.ts index a7811c7a9e2f..d608f6ef756d 100644 --- a/yarn-project/end-to-end/src/e2e_prover/e2e_prover_test.ts +++ b/yarn-project/end-to-end/src/e2e_prover/e2e_prover_test.ts @@ -266,7 +266,6 @@ export class FullProverTest { const archiver = await createArchiver( { ...this.context.aztecNodeConfig, dataDirectory: undefined }, blobSinkClient, - { blockUntilSync: true }, ); // The simulated prover node (now shutdown) used private key index 2 diff --git a/yarn-project/end-to-end/src/e2e_synching.test.ts b/yarn-project/end-to-end/src/e2e_synching.test.ts index 9a39d7e8a142..8b13b8485376 100644 --- a/yarn-project/end-to-end/src/e2e_synching.test.ts +++ b/yarn-project/end-to-end/src/e2e_synching.test.ts @@ -33,7 +33,7 @@ */ import { getSchnorrAccount } from '@aztec/accounts/schnorr'; import { type InitialAccountData, deployFundedSchnorrAccounts } from '@aztec/accounts/testing'; -import { createArchiver } from '@aztec/archiver'; +import { createArchiverAndSync } from '@aztec/archiver'; import { AztecNodeService } from '@aztec/aztec-node'; import { type AccountWalletWithSecretKey, @@ -550,9 +550,7 @@ describe('e2e_synching', () => { const blobSinkClient = createBlobSinkClient({ blobSinkUrl: `http://localhost:${opts.blobSink?.port ?? DEFAULT_BLOB_SINK_PORT}`, }); - const archiver = await createArchiver(opts.config!, blobSinkClient, { - blockUntilSync: true, - }); + const archiver = await createArchiverAndSync(opts.config!, blobSinkClient, /*blockUntilSync*/ true); const pendingBlockNumber = await rollup.read.getPendingBlockNumber(); const worldState = await createWorldStateSynchronizer(opts.config!, archiver); diff --git a/yarn-project/end-to-end/src/fixtures/utils.ts b/yarn-project/end-to-end/src/fixtures/utils.ts index d52264bc8c0e..51482dfa1fc4 100644 --- a/yarn-project/end-to-end/src/fixtures/utils.ts +++ b/yarn-project/end-to-end/src/fixtures/utils.ts @@ -761,9 +761,7 @@ export async function createAndSyncProverNode( const blobSinkClient = createBlobSinkClient(aztecNodeConfig); // Creating temp store and archiver for simulated prover node const archiverConfig = { ...aztecNodeConfig, dataDirectory }; - const archiver = await createArchiver(archiverConfig, blobSinkClient, { - blockUntilSync: true, - }); + const archiver = await createArchiver(archiverConfig, blobSinkClient); // Prover node config is for simulated proofs const proverConfig: ProverNodeConfig = { diff --git a/yarn-project/p2p/src/client/factory.ts b/yarn-project/p2p/src/client/factory.ts index 307e4c68401e..10d86eeb8ace 100644 --- a/yarn-project/p2p/src/client/factory.ts +++ b/yarn-project/p2p/src/client/factory.ts @@ -3,7 +3,7 @@ import { type Logger, createLogger } from '@aztec/foundation/log'; import type { AztecAsyncKVStore } from '@aztec/kv-store'; import type { DataStoreConfig } from '@aztec/kv-store/config'; import { createStore } from '@aztec/kv-store/lmdb-v2'; -import type { L2BlockSource } from '@aztec/stdlib/block'; +import type { L2BlockSourceEventEmitter } from '@aztec/stdlib/block'; import type { ClientProtocolCircuitVerifier, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; import { P2PClientType } from '@aztec/stdlib/p2p'; import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; @@ -29,7 +29,7 @@ type P2PClientDeps = { export const createP2PClient = async ( clientType: T, _config: P2PConfig & DataStoreConfig, - l2BlockSource: L2BlockSource, + l2BlockSource: L2BlockSourceEventEmitter, proofVerifier: ClientProtocolCircuitVerifier, worldStateSynchronizer: WorldStateSynchronizer, epochCache: EpochCacheInterface, @@ -85,5 +85,5 @@ export const createP2PClient = async ( logger.verbose('P2P is disabled. Using dummy P2P service'); p2pService = new DummyP2PService(); } - return new P2PClient(clientType, store, l2BlockSource, mempools, p2pService, config, telemetry); + return new P2PClient(clientType, l2BlockSource, mempools, p2pService, config, telemetry); }; diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index b8dc6fec32df..42abab04653c 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -2,9 +2,6 @@ import { MockL2BlockSource } from '@aztec/archiver/test'; import { Fr } from '@aztec/foundation/fields'; import { retryUntil } from '@aztec/foundation/retry'; import { sleep } from '@aztec/foundation/sleep'; -import type { AztecAsyncKVStore } from '@aztec/kv-store'; -import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; -import { L2Block } from '@aztec/stdlib/block'; import { P2PClientType } from '@aztec/stdlib/p2p'; import { mockTx } from '@aztec/stdlib/testing'; @@ -23,7 +20,6 @@ describe('In-Memory P2P Client', () => { let mempools: MemPools; let blockSource: MockL2BlockSource; let p2pService: MockProxy; - let kvStore: AztecAsyncKVStore; let client: P2PClient; beforeEach(async () => { @@ -45,17 +41,17 @@ describe('In-Memory P2P Client', () => { attestationPool, }; - kvStore = await openTmpStore('test'); - client = new P2PClient(P2PClientType.Full, kvStore, blockSource, mempools, p2pService); - }); - - afterEach(async () => { - await kvStore.close(); + client = new P2PClient(P2PClientType.Full, blockSource, mempools, p2pService); }); const advanceToProvenBlock = async (getProvenBlockNumber: number) => { blockSource.setProvenBlockNumber(getProvenBlockNumber); - await retryUntil(async () => (await client.getSyncedProvenBlockNum()) >= getProvenBlockNumber, 'synced', 10, 0.1); + await retryUntil( + () => Promise.resolve(client.getSyncedProvenBlockNum() >= getProvenBlockNumber), + 'synced', + 10, + 0.1, + ); }; afterEach(async () => { @@ -99,27 +95,19 @@ describe('In-Memory P2P Client', () => { expect(txPool.addTxs).toHaveBeenCalledTimes(2); }); - it('restores the previous block number it was at', async () => { - await client.start(); - const synchedBlock = await client.getSyncedLatestBlockNum(); - await client.stop(); - - const client2 = new P2PClient(P2PClientType.Full, kvStore, blockSource, mempools, p2pService); - await expect(client2.getSyncedLatestBlockNum()).resolves.toEqual(synchedBlock); - }); - it('deletes txs once block is proven', async () => { blockSource.setProvenBlockNumber(0); await client.start(); expect(txPool.deleteTxs).not.toHaveBeenCalled(); await advanceToProvenBlock(5); + expect(txPool.deleteTxs).toHaveBeenCalledTimes(5); await client.stop(); }); it('deletes txs after waiting the set number of blocks', async () => { - client = new P2PClient(P2PClientType.Full, kvStore, blockSource, mempools, p2pService, { + client = new P2PClient(P2PClientType.Full, blockSource, mempools, p2pService, { keepProvenTxsInPoolFor: 10, }); blockSource.setProvenBlockNumber(0); @@ -138,43 +126,8 @@ describe('In-Memory P2P Client', () => { }); describe('Chain prunes', () => { - it('moves the tips on a chain reorg', async () => { - blockSource.setProvenBlockNumber(0); - await client.start(); - - await advanceToProvenBlock(90); - - await expect(client.getL2Tips()).resolves.toEqual({ - latest: { number: 100, hash: expect.any(String) }, - proven: { number: 90, hash: expect.any(String) }, - finalized: { number: 90, hash: expect.any(String) }, - }); - - blockSource.removeBlocks(10); - - // give the client a chance to react to the reorg - await sleep(1000); - - await expect(client.getL2Tips()).resolves.toEqual({ - latest: { number: 90, hash: expect.any(String) }, - proven: { number: 90, hash: expect.any(String) }, - finalized: { number: 90, hash: expect.any(String) }, - }); - - blockSource.addBlocks([await L2Block.random(91), await L2Block.random(92)]); - - // give the client a chance to react to the new blocks - await sleep(1000); - - await expect(client.getL2Tips()).resolves.toEqual({ - latest: { number: 92, hash: expect.any(String) }, - proven: { number: 90, hash: expect.any(String) }, - finalized: { number: 90, hash: expect.any(String) }, - }); - }); - it('deletes txs created from a pruned block', async () => { - client = new P2PClient(P2PClientType.Full, kvStore, blockSource, mempools, p2pService, { + client = new P2PClient(P2PClientType.Full, blockSource, mempools, p2pService, { keepProvenTxsInPoolFor: 10, }); blockSource.setProvenBlockNumber(0); @@ -198,7 +151,7 @@ describe('In-Memory P2P Client', () => { }); it('moves mined and valid txs back to the pending set', async () => { - client = new P2PClient(P2PClientType.Full, kvStore, blockSource, mempools, p2pService, { + client = new P2PClient(P2PClientType.Full, blockSource, mempools, p2pService, { keepProvenTxsInPoolFor: 10, }); blockSource.setProvenBlockNumber(0); @@ -229,6 +182,32 @@ describe('In-Memory P2P Client', () => { expect(txPool.markMinedAsPending).toHaveBeenCalledWith([await goodTx.getTxHash()]); await client.stop(); }); + + // P2P client must be running before the block source starts syncing + it('when the block source is syncing, the p2p client stays in sync', async () => { + blockSource = new MockL2BlockSource(); + client = new P2PClient(P2PClientType.Full, blockSource, mempools, p2pService, { + keepProvenTxsInPoolFor: 10, + }); + + await client.start(); + + // On startup, the tips in the p2p client should be the same as the tips in the block source + expect(await blockSource.getProvenBlockNumber()).toEqual(client.getSyncedProvenBlockNum()); + expect(await blockSource.getBlockNumber()).toEqual(client.getSyncedLatestBlockNum()); + + await blockSource.createBlocks(10); + + // Wait for the p2p client to sync with the block source + await retryUntil( + async () => client.getSyncedLatestBlockNum() === (await blockSource.getBlockNumber()), + 'synced', + 10, + 0.1, + ); + + expect(await blockSource.getProvenBlockNumber()).toEqual(client.getSyncedProvenBlockNum()); + }); }); describe('Attestation pool pruning', () => { @@ -242,7 +221,6 @@ describe('In-Memory P2P Client', () => { expect(attestationPool.deleteAttestationsOlderThan).not.toHaveBeenCalled(); await advanceToProvenBlock(advanceToProvenBlockNumber); - expect(attestationPool.deleteAttestationsOlderThan).toHaveBeenCalledTimes(1); expect(attestationPool.deleteAttestationsOlderThan).toHaveBeenCalledWith( BigInt(advanceToProvenBlockNumber - keepAttestationsInPoolFor), diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 7ecbb8eae567..991cb8b93b18 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -1,18 +1,19 @@ import { INITIAL_L2_BLOCK_NUM } from '@aztec/constants'; +import { jsonStringify } from '@aztec/foundation/json-rpc'; import { createLogger } from '@aztec/foundation/log'; -import type { AztecAsyncKVStore, AztecAsyncMap, AztecAsyncSingleton } from '@aztec/kv-store'; -import type { L2Block, L2BlockId, L2BlockSource, L2BlockStreamEvent, L2Tips } from '@aztec/stdlib/block'; +import type { + L2Block, + L2BlockId, + L2BlockSourceBlocksAddedEvent, + L2BlockSourceChainProvenEvent, + L2BlockSourceChainPrunedEvent, + L2BlockSourceEventEmitter, +} from '@aztec/stdlib/block'; +import { L2BlockSourceEvents } from '@aztec/stdlib/block'; import type { P2PApi, PeerInfo, ProverCoordination } from '@aztec/stdlib/interfaces/server'; import type { BlockAttestation, BlockProposal, P2PClientType } from '@aztec/stdlib/p2p'; import type { Tx, TxHash } from '@aztec/stdlib/tx'; -import { - Attributes, - type TelemetryClient, - TraceableL2BlockStream, - WithTracer, - getTelemetryClient, - trackSpan, -} from '@aztec/telemetry-client'; +import { Attributes, type TelemetryClient, WithTracer, getTelemetryClient, trackSpan } from '@aztec/telemetry-client'; import type { ENR } from '@chainsafe/enr'; @@ -172,14 +173,13 @@ export class P2PClient private runningPromise!: Promise; private currentState = P2PClientState.IDLE; - private syncPromise = Promise.resolve(); - private syncResolve?: () => void = undefined; - private latestBlockNumberAtStart = -1; - private provenBlockNumberAtStart = -1; - private synchedBlockHashes: AztecAsyncMap; - private synchedLatestBlockNumber: AztecAsyncSingleton; - private synchedProvenBlockNumber: AztecAsyncSingleton; + // This value get's updated in response to events from the L2BlockSourceEventEmitter + // It's value is set in construction by requesting it from the L2BlockSource + // It is important, as the sequencer must wait until this value has progressed to the same as other services + // before it requests transactions to build blocks, else we will not have the correct transactions available + private synchedLatestBlockNumber: number = -1; + private synchedProvenBlockNumber: number = -1; private txPool: TxPool; private attestationPool: T extends P2PClientType.Full ? AttestationPool : undefined; @@ -189,7 +189,12 @@ export class P2PClient /** How many slots to keep proven txs for. */ private keepProvenTxsFor: number; - private blockStream; + // Store event handler references + private eventHandlers: { + handleChainPruned: (event: L2BlockSourceChainPrunedEvent) => void; + handleBlocksAdded: (event: L2BlockSourceBlocksAddedEvent) => void; + handleChainProven: (event: L2BlockSourceChainProvenEvent) => void; + } | null = null; /** * In-memory P2P client constructor. @@ -202,8 +207,7 @@ export class P2PClient */ constructor( _clientType: T, - store: AztecAsyncKVStore, - private l2BlockSource: L2BlockSource, + private l2BlockSource: L2BlockSourceEventEmitter, mempools: MemPools, private p2pService: P2PService, config: Partial = {}, @@ -212,24 +216,13 @@ export class P2PClient ) { super(telemetry, 'P2PClient'); - const { keepProvenTxsInPoolFor, blockCheckIntervalMS, blockRequestBatchSize, keepAttestationsInPoolFor } = { + const { keepProvenTxsInPoolFor, keepAttestationsInPoolFor } = { ...getP2PDefaultConfig(), ...config, }; this.keepProvenTxsFor = keepProvenTxsInPoolFor; this.keepAttestationsInPoolFor = keepAttestationsInPoolFor; - const tracer = telemetry.getTracer('P2PL2BlockStream'); - const logger = createLogger('p2p:l2-block-stream'); - this.blockStream = new TraceableL2BlockStream(l2BlockSource, this, this, tracer, 'P2PL2BlockStream', logger, { - batchSize: blockRequestBatchSize, - pollIntervalMS: blockCheckIntervalMS, - }); - - this.synchedBlockHashes = store.openMap('p2p_pool_block_hashes'); - this.synchedLatestBlockNumber = store.openSingleton('p2p_pool_last_l2_block'); - this.synchedProvenBlockNumber = store.openSingleton('p2p_pool_last_proven_l2_block'); - this.txPool = mempools.txPool; this.attestationPool = mempools.attestationPool!; } @@ -242,69 +235,39 @@ export class P2PClient return Promise.resolve(this.p2pService.getPeers(includePending)); } - public getL2BlockHash(number: number): Promise { - return this.synchedBlockHashes.getAsync(number); - } - - public async getL2Tips(): Promise { - const latestBlockNumber = await this.getSyncedLatestBlockNum(); - let latestBlockHash: string | undefined; - const provenBlockNumber = await this.getSyncedProvenBlockNum(); - let provenBlockHash: string | undefined; - - if (latestBlockNumber > 0) { - latestBlockHash = await this.synchedBlockHashes.getAsync(latestBlockNumber); - if (typeof latestBlockHash === 'undefined') { - this.log.warn(`Block hash for latest block ${latestBlockNumber} not found`); - throw new Error(); - } - } - - if (provenBlockNumber > 0) { - provenBlockHash = await this.synchedBlockHashes.getAsync(provenBlockNumber); - if (typeof provenBlockHash === 'undefined') { - this.log.warn(`Block hash for proven block ${provenBlockNumber} not found`); - throw new Error(); - } + #assertIsReady() { + // this.log.info('Checking if p2p client is ready, current state: ', this.currentState); + if (!this.isReady()) { + throw new Error('P2P client not ready'); } + } - return Promise.resolve({ - latest: { hash: latestBlockHash!, number: latestBlockNumber }, - proven: { hash: provenBlockHash!, number: provenBlockNumber }, - finalized: { hash: provenBlockHash!, number: provenBlockNumber }, - }); + public getSyncedLatestBlockNum(): number { + return this.synchedLatestBlockNumber; } - public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { - this.log.debug(`Handling block stream event ${event.type}`); - switch (event.type) { - case 'blocks-added': - await this.handleLatestL2Blocks(event.blocks); - break; - case 'chain-finalized': - // TODO (alexg): I think we can prune the block hashes map here - break; - case 'chain-proven': { - const from = (await this.getSyncedProvenBlockNum()) + 1; - const limit = event.blockNumber - from + 1; - await this.handleProvenL2Blocks(await this.l2BlockSource.getBlocks(from, limit)); - break; - } - case 'chain-pruned': - await this.handlePruneL2Blocks(event.blockNumber); - break; - default: { - const _: never = event; - break; - } - } + public getSyncedProvenBlockNum(): number { + return this.synchedProvenBlockNumber; } - #assertIsReady() { - // this.log.info('Checking if p2p client is ready, current state: ', this.currentState); - if (!this.isReady()) { - throw new Error('P2P client not ready'); - } + /** + * Method to check the status the p2p client. + * @returns Information about p2p client status: state & syncedToBlockNum. + */ + public async getStatus(): Promise { + const blockNumber = this.getSyncedLatestBlockNum(); + const blockHash = + blockNumber === 0 + ? '' + : await this.l2BlockSource + .getBlockHeader(blockNumber) + .then(header => header?.hash()) + .then(hash => hash?.toString()); + + return { + state: this.currentState, + syncedToL2Block: { number: blockNumber, hash: blockHash }, + } as P2PSyncState; } /** @@ -315,36 +278,52 @@ export class P2PClient if (this.currentState === P2PClientState.STOPPED) { throw new Error('P2P client already stopped'); } - if (this.currentState !== P2PClientState.IDLE) { - return this.syncPromise; - } - // get the current latest block numbers - this.latestBlockNumberAtStart = await this.l2BlockSource.getBlockNumber(); - this.provenBlockNumberAtStart = await this.l2BlockSource.getProvenBlockNumber(); + // Store current tips in the L2BlockSource + const [latestBlockNumber, provenBlockNumber] = await Promise.all([ + this.l2BlockSource.getBlockNumber(), + this.l2BlockSource.getProvenBlockNumber(), + ]); + this.synchedLatestBlockNumber = latestBlockNumber; + this.synchedProvenBlockNumber = provenBlockNumber; - const syncedLatestBlock = (await this.getSyncedLatestBlockNum()) + 1; - const syncedProvenBlock = (await this.getSyncedProvenBlockNum()) + 1; + await this.p2pService.start(); - // if there are blocks to be retrieved, go to a synching state - if (syncedLatestBlock <= this.latestBlockNumberAtStart || syncedProvenBlock <= this.provenBlockNumberAtStart) { + // Create wrapper functions that handle the promises + const handleChainPruned = (event: L2BlockSourceChainPrunedEvent) => { this.setCurrentState(P2PClientState.SYNCHING); - this.syncPromise = new Promise(resolve => { - this.syncResolve = resolve; - }); - this.log.verbose(`Starting sync from ${syncedLatestBlock} (last proven ${syncedProvenBlock})`); - } else { - // if no blocks to be retrieved, go straight to running - this.setCurrentState(P2PClientState.RUNNING); - this.syncPromise = Promise.resolve(); - await this.p2pService.start(); - this.log.debug(`Block ${syncedLatestBlock} (proven ${syncedProvenBlock}) already beyond current block`); - } + this.handlePruneL2Blocks(event) + .catch(err => this.log.error(`Error handling chain pruned event: ${err}`)) + .finally(() => this.setCurrentState(P2PClientState.RUNNING)); + }; - this.blockStream.start(); - this.log.verbose(`Started block downloader from block ${syncedLatestBlock}`); + const handleBlocksAdded = (event: L2BlockSourceBlocksAddedEvent) => { + this.setCurrentState(P2PClientState.SYNCHING); + this.handleLatestL2Blocks(event) + .catch(err => this.log.error(`Error handling blocks added event: ${err}`)) + .finally(() => this.setCurrentState(P2PClientState.RUNNING)); + }; - return this.syncPromise; + const handleChainProven = (event: L2BlockSourceChainProvenEvent) => { + this.setCurrentState(P2PClientState.SYNCHING); + this.handleProvenL2Blocks(event) + .catch(err => this.log.error(`Error handling chain proven event: ${err}`)) + .finally(() => this.setCurrentState(P2PClientState.RUNNING)); + }; + + // Store references to the wrapper functions for later removal + this.eventHandlers = { + handleChainPruned, + handleBlocksAdded, + handleChainProven, + }; + + this.l2BlockSource.on(L2BlockSourceEvents.ChainPruned, handleChainPruned); + this.l2BlockSource.on(L2BlockSourceEvents.BlocksAdded, handleBlocksAdded); + this.l2BlockSource.on(L2BlockSourceEvents.ChainProven, handleChainProven); + + this.setCurrentState(P2PClientState.RUNNING); + this.log.verbose(`Started p2p service`); } /** @@ -354,9 +333,16 @@ export class P2PClient public async stop() { this.log.debug('Stopping p2p client...'); await this.p2pService.stop(); + + // Remove event listeners using the stored references + if (this.eventHandlers) { + this.l2BlockSource.removeListener(L2BlockSourceEvents.ChainPruned, this.eventHandlers.handleChainPruned); + this.l2BlockSource.removeListener(L2BlockSourceEvents.BlocksAdded, this.eventHandlers.handleBlocksAdded); + this.l2BlockSource.removeListener(L2BlockSourceEvents.ChainProven, this.eventHandlers.handleChainProven); + this.eventHandlers = null; + } + this.log.debug('Stopped p2p service'); - await this.blockStream.stop(); - this.log.debug('Stopped block downloader'); await this.runningPromise; this.setCurrentState(P2PClientState.STOPPED); this.log.info('P2P client stopped.'); @@ -569,42 +555,6 @@ export class P2PClient return this.currentState === P2PClientState.RUNNING; } - /** - * Public function to check the latest block number that the P2P client is synced to. - * @returns Block number of latest L2 Block we've synced with. - */ - public async getSyncedLatestBlockNum(): Promise { - return (await this.synchedLatestBlockNumber.getAsync()) ?? INITIAL_L2_BLOCK_NUM - 1; - } - - /** - * Public function to check the latest proven block number that the P2P client is synced to. - * @returns Block number of latest proven L2 Block we've synced with. - */ - public async getSyncedProvenBlockNum(): Promise { - return (await this.synchedProvenBlockNumber.getAsync()) ?? INITIAL_L2_BLOCK_NUM - 1; - } - - /** - * Method to check the status the p2p client. - * @returns Information about p2p client status: state & syncedToBlockNum. - */ - public async getStatus(): Promise { - const blockNumber = await this.getSyncedLatestBlockNum(); - const blockHash = - blockNumber === 0 - ? '' - : await this.l2BlockSource - .getBlockHeader(blockNumber) - .then(header => header?.hash()) - .then(hash => hash?.toString()); - - return { - state: this.currentState, - syncedToL2Block: { number: blockNumber, hash: blockHash }, - } as P2PSyncState; - } - /** * Mark all txs from these blocks as mined. * @param blocks - A list of existing blocks with txs that the P2P client needs to ensure the tx pool is reconciled with. @@ -635,19 +585,15 @@ export class P2PClient * @param blocks - A list of existing blocks with txs that the P2P client needs to ensure the tx pool is reconciled with. * @returns Empty promise. */ - private async handleLatestL2Blocks(blocks: L2Block[]): Promise { - if (!blocks.length) { + private async handleLatestL2Blocks(event: L2BlockSourceBlocksAddedEvent): Promise { + this.log.info(`Handling LATEST BLOCKS: ${event.blocks.length}`); + if (!event.blocks.length) { return Promise.resolve(); } - await this.markTxsAsMinedFromBlocks(blocks); - const lastBlockNum = blocks[blocks.length - 1].number; - await Promise.all( - blocks.map(async block => this.synchedBlockHashes.set(block.number, (await block.hash()).toString())), - ); - await this.synchedLatestBlockNumber.set(lastBlockNum); - this.log.verbose(`Synched to latest block ${lastBlockNum}`); - await this.startServiceIfSynched(); + await this.markTxsAsMinedFromBlocks(event.blocks); + this.synchedLatestBlockNumber = event.blocks[event.blocks.length - 1].number; + this.log.verbose(`Synched to latest block ${this.synchedLatestBlockNumber}`); } /** @@ -655,7 +601,12 @@ export class P2PClient * @param blocks - A list of proven L2 blocks. * @returns Empty promise. */ - private async handleProvenL2Blocks(blocks: L2Block[]): Promise { + private async handleProvenL2Blocks(event: L2BlockSourceChainProvenEvent): Promise { + this.log.info(`Handling chain proven event: ${jsonStringify(event)}`); + const from = Number(event.previousProvenBlockNumber) + 1; + const limit = Number(event.provenBlockNumber) - from + 1; + const blocks = await this.l2BlockSource.getBlocks(from, limit, true); + if (!blocks.length) { return Promise.resolve(); } @@ -681,21 +632,24 @@ export class P2PClient await this.attestationPool?.deleteAttestationsOlderThan(lastBlockSlotMinusKeepAttestationsInPoolFor); } - await this.synchedProvenBlockNumber.set(lastBlockNum); - this.log.debug(`Synched to proven block ${lastBlockNum}`); - - await this.startServiceIfSynched(); + this.synchedProvenBlockNumber = lastBlockNum; + this.log.verbose(`Synched to proven block ${this.synchedProvenBlockNumber}`, { + latestBlockNumber: this.synchedLatestBlockNumber, + provenBlockNumber: this.synchedProvenBlockNumber, + }); } /** * Updates the tx pool after a chain prune. * @param latestBlock - The block number the chain was pruned to. */ - private async handlePruneL2Blocks(latestBlock: number): Promise { + private async handlePruneL2Blocks(event: L2BlockSourceChainPrunedEvent): Promise { + const latestBlock = event.blockNumber; + const txsToDelete: TxHash[] = []; for (const tx of await this.txPool.getAllTxs()) { // every tx that's been generated against a block that has now been pruned is no longer valid - if (tx.data.constants.historicalHeader.globalVariables.blockNumber.toNumber() > latestBlock) { + if (tx.data.constants.historicalHeader.globalVariables.blockNumber.toBigInt() > latestBlock) { txsToDelete.push(await tx.getTxHash()); } } @@ -703,7 +657,7 @@ export class P2PClient this.log.info( `Detected chain prune. Removing invalid txs count=${ txsToDelete.length - } newLatestBlock=${latestBlock} previousLatestBlock=${this.getSyncedLatestBlockNum()}`, + } newLatestBlock=${latestBlock} previousLatestBlock=${this.l2BlockSource.getBlockNumber()}`, ); // delete invalid txs (both pending and mined) @@ -723,23 +677,15 @@ export class P2PClient this.log.info(`Moving ${txsToMoveToPending.length} mined txs back to pending`); await this.txPool.markMinedAsPending(txsToMoveToPending); - await this.synchedLatestBlockNumber.set(latestBlock); - // no need to update block hashes, as they will be updated as new blocks are added - } - - private async startServiceIfSynched() { - if ( - this.currentState === P2PClientState.SYNCHING && - (await this.getSyncedLatestBlockNum()) >= this.latestBlockNumberAtStart && - (await this.getSyncedProvenBlockNum()) >= this.provenBlockNumberAtStart - ) { - this.log.debug(`Synched to blocks at start`); - this.setCurrentState(P2PClientState.RUNNING); - if (this.syncResolve !== undefined) { - this.syncResolve(); - await this.p2pService.start(); - } - } + this.synchedLatestBlockNumber = Number(latestBlock); + this.synchedProvenBlockNumber = Number(latestBlock); + this.log.verbose( + `Handled chain prune. Latest block ${this.synchedLatestBlockNumber} and proven block ${this.synchedProvenBlockNumber}`, + { + latestBlockNumber: this.synchedLatestBlockNumber, + provenBlockNumber: this.synchedProvenBlockNumber, + }, + ); } /** diff --git a/yarn-project/prover-node/src/factory.ts b/yarn-project/prover-node/src/factory.ts index 992dc2091a13..fc93560f5673 100644 --- a/yarn-project/prover-node/src/factory.ts +++ b/yarn-project/prover-node/src/factory.ts @@ -37,7 +37,7 @@ export async function createProverNode( const telemetry = deps.telemetry ?? getTelemetryClient(); const blobSinkClient = deps.blobSinkClient ?? createBlobSinkClient(config); const log = deps.log ?? createLogger('prover-node'); - const archiver = deps.archiver ?? (await createArchiver(config, blobSinkClient, { blockUntilSync: true }, telemetry)); + const archiver = deps.archiver ?? (await createArchiver(config, blobSinkClient, telemetry)); log.verbose(`Created archiver and synced to block ${await archiver.getBlockNumber()}`); const worldStateConfig = { ...config, worldStateProvenBlocksOnly: false }; @@ -73,6 +73,8 @@ export async function createProverNode( telemetry, }); + await archiver.start(/* blockUntilSynced= */ true); + const proverNodeConfig: ProverNodeOptions = { maxPendingJobs: config.proverNodeMaxPendingJobs, pollingIntervalMs: config.proverNodePollingIntervalMs, diff --git a/yarn-project/prover-node/src/prover-coordination/factory.ts b/yarn-project/prover-node/src/prover-coordination/factory.ts index 7f0c22084ecd..d360f608efcd 100644 --- a/yarn-project/prover-node/src/prover-coordination/factory.ts +++ b/yarn-project/prover-node/src/prover-coordination/factory.ts @@ -6,6 +6,7 @@ import type { DataStoreConfig } from '@aztec/kv-store/config'; import { getVKTreeRoot } from '@aztec/noir-protocol-circuits-types/vk-tree'; import { createP2PClient } from '@aztec/p2p'; import { protocolContractTreeRoot } from '@aztec/protocol-contracts'; +import type { L2BlockSourceEventEmitter } from '@aztec/stdlib/block'; import { createAztecNodeClient } from '@aztec/stdlib/interfaces/client'; import type { ProverCoordination, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; import { P2PClientType } from '@aztec/stdlib/p2p'; @@ -51,7 +52,7 @@ export async function createProverCoordination( const p2pClient = await createP2PClient( P2PClientType.Prover, config, - deps.archiver, + deps.archiver as L2BlockSourceEventEmitter, proofVerifier, deps.worldStateSynchronizer, deps.epochCache, diff --git a/yarn-project/sequencer-client/src/slasher/slasher_client.ts b/yarn-project/sequencer-client/src/slasher/slasher_client.ts index bbc5e592b8ff..1608975b0a92 100644 --- a/yarn-project/sequencer-client/src/slasher/slasher_client.ts +++ b/yarn-project/sequencer-client/src/slasher/slasher_client.ts @@ -9,7 +9,7 @@ import { createLogger } from '@aztec/foundation/log'; import { SlashFactoryAbi } from '@aztec/l1-artifacts'; import { type L2BlockId, - type L2BlockSourceEvent, + type L2BlockSourceChainPrunedEvent, type L2BlockSourceEventEmitter, L2BlockSourceEvents, } from '@aztec/stdlib/block'; @@ -86,6 +86,8 @@ export class SlasherClient extends WithTracer { // showing that the slashing mechanism is working. private slashingAmount: bigint = 0n; + private pruneHandler: ((event: L2BlockSourceChainPrunedEvent) => void) | null = null; + constructor( private config: SlasherConfig & L1ContractsConfig & L1ReaderConfig, private l2BlockSource: L2BlockSourceEventEmitter, @@ -116,7 +118,10 @@ export class SlasherClient extends WithTracer { public start() { this.log.info('Starting Slasher client...'); - this.l2BlockSource.on(L2BlockSourceEvents.L2PruneDetected, this.handlePruneL2Blocks.bind(this)); + + this.pruneHandler = this.handlePruneL2Blocks.bind(this); + + this.l2BlockSource.on(L2BlockSourceEvents.ChainPruned, this.pruneHandler); } // This is where we should put a bunch of the improvements mentioned earlier. @@ -150,31 +155,20 @@ export class SlasherClient extends WithTracer { return EthAddress.fromString(payloadAddress); } - public handleBlockStreamEvent(event: L2BlockSourceEvent): Promise { - this.log.debug(`Handling block stream event ${event.type}`); - switch (event.type) { - case L2BlockSourceEvents.L2PruneDetected: - this.handlePruneL2Blocks(event); - break; - default: { - break; - } - } - return Promise.resolve(); - } - /** * Allows consumers to stop the instance of the slasher client. * 'ready' will now return 'false' and the running promise that keeps the client synced is interrupted. */ public stop() { this.log.debug('Stopping Slasher client...'); - this.l2BlockSource.removeListener(L2BlockSourceEvents.L2PruneDetected, this.handlePruneL2Blocks.bind(this)); + if (this.pruneHandler) { + this.l2BlockSource.removeListener(L2BlockSourceEvents.ChainPruned, this.pruneHandler); + } this.log.info('Slasher client stopped.'); } // I need to get the slot number from the block that was just pruned - private handlePruneL2Blocks(event: L2BlockSourceEvent): void { + private handlePruneL2Blocks(event: L2BlockSourceChainPrunedEvent): void { const { slotNumber, epochNumber } = event; this.log.info(`Detected chain prune. Punishing the validators at epoch ${epochNumber}`); diff --git a/yarn-project/stdlib/src/block/l2_block_source.ts b/yarn-project/stdlib/src/block/l2_block_source.ts index fde89993d062..416275a00f96 100644 --- a/yarn-project/stdlib/src/block/l2_block_source.ts +++ b/yarn-project/stdlib/src/block/l2_block_source.ts @@ -149,11 +149,24 @@ export const L2TipsSchema = z.object({ }) satisfies z.ZodType; export enum L2BlockSourceEvents { - L2PruneDetected = 'l2PruneDetected', + BlocksAdded = 'blocks-added', + ChainProven = 'chain-proven', + ChainPruned = 'chain-pruned', } -export type L2BlockSourceEvent = { - type: 'l2PruneDetected'; +export type L2BlockSourceEvent = + | L2BlockSourceChainPrunedEvent + | L2BlockSourceBlocksAddedEvent + | L2BlockSourceChainProvenEvent; + +export type L2BlockSourceBlocksAddedEvent = { + blocks: L2Block[]; +}; +export type L2BlockSourceChainProvenEvent = { + previousProvenBlockNumber: bigint; + provenBlockNumber: bigint; +}; +export type L2BlockSourceChainPrunedEvent = { blockNumber: bigint; slotNumber: bigint; epochNumber: bigint;