Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,15 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
}

/**
* Creates a new instance of the Archiver and blocks until it syncs from chain.
* Creates a new instance of the Archiver.
* @param config - The archiver's desired configuration.
* @param archiverStore - The backing store for the archiver.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @returns - An instance of the archiver.
*/
public static async createAndSync(
public static async create(
config: ArchiverConfig,
archiverStore: ArchiverDataStore,
deps: { telemetry: TelemetryClient; blobSinkClient: BlobSinkClientInterface },
blockUntilSynced = true,
): Promise<Archiver> {
const chain = createEthereumChain(config.l1RpcUrls, config.l1ChainId);
const publicClient = createPublicClient({
Expand Down Expand Up @@ -178,6 +176,23 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
await ArchiverInstrumentation.new(deps.telemetry, () => archiverStore.estimateSize()),
{ l1StartBlock, l1GenesisTime, epochDuration, slotDuration, ethereumSlotDuration },
);
return archiver;
}

/**
* Creates a new instance of the Archiver and blocks until it syncs from chain.
* @param config - The archiver's desired configuration.
* @param archiverStore - The backing store for the archiver.
* @param blockUntilSynced - If true, blocks until the archiver has fully synced.
* @returns - An instance of the archiver.
*/
public static async createAndSync(
config: ArchiverConfig,
archiverStore: ArchiverDataStore,
deps: { telemetry: TelemetryClient; blobSinkClient: BlobSinkClientInterface },
blockUntilSynced: boolean,
): Promise<Archiver> {
const archiver = await this.create(config, archiverStore, deps);
await archiver.start(blockUntilSynced);
return archiver;
}
Expand Down Expand Up @@ -315,12 +330,17 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
const localPendingEpochNumber = getEpochAtSlot(localPendingSlotNumber, this.l1constants);

// Emit an event for listening services to react to the chain prune
this.emit(L2BlockSourceEvents.L2PruneDetected, {
type: L2BlockSourceEvents.L2PruneDetected,
this.log.debug('Emitting chain pruned event', {
blockNumber: localPendingBlockNumber,
slotNumber: localPendingSlotNumber,
epochNumber: localPendingEpochNumber,
});
this.emit(L2BlockSourceEvents.ChainPruned, {
type: L2BlockSourceEvents.ChainPruned,
blockNumber: provenBlockNumber,
slotNumber: localPendingSlotNumber,
epochNumber: localPendingEpochNumber,
});

const blocksToUnwind = localPendingBlockNumber - provenBlockNumber;
this.log.debug(
Expand Down Expand Up @@ -413,9 +433,20 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
this.log.info(`Updated proven chain to block ${provenBlockNumber}`, {
provenBlockNumber,
});

// Emit an event for listening services to react to the chain proven
this.log.debug('Emitting chain proven event', {
previousProvenBlockNumber: localProvenBlockNumber,
provenBlockNumber: provenBlockNumber,
});
this.emit(L2BlockSourceEvents.ChainProven, {
type: L2BlockSourceEvents.ChainProven,
previousProvenBlockNumber: localProvenBlockNumber,
provenBlockNumber: provenBlockNumber,
});
this.instrumentation.updateLastProvenBlock(Number(provenBlockNumber));
}
}
this.instrumentation.updateLastProvenBlock(Number(provenBlockNumber));
};

// This is an edge case that we only hit if there are no proposed blocks.
Expand Down Expand Up @@ -519,6 +550,15 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
});
}

// Emit an event for listening services to react to the new blocks
this.log.debug('Emitting blocks added event', {
numberOfBlocks: retrievedBlocks.length,
});
this.emit(L2BlockSourceEvents.BlocksAdded, {
type: L2BlockSourceEvents.BlocksAdded,
blocks: retrievedBlocks.map(b => b.data),
});

const [processDuration] = await elapsed(() => this.store.addBlocks(retrievedBlocks));
this.instrumentation.processNewBlocks(
processDuration / retrievedBlocks.length,
Expand Down
21 changes: 19 additions & 2 deletions yarn-project/archiver/src/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,31 @@ import { createArchiverClient } from './rpc/index.js';
export async function createArchiver(
config: ArchiverConfig & DataStoreConfig,
blobSinkClient: BlobSinkClientInterface,
opts: { blockUntilSync: boolean } = { blockUntilSync: true },
telemetry: TelemetryClient = getTelemetryClient(),
): Promise<ArchiverApi & Service & L2BlockSourceEventEmitter> {
const store = await createStore('archiver', config, createLogger('archiver:lmdb'));
const archiverStore = new KVArchiverDataStore(store, config.maxLogs);
await registerProtocolContracts(archiverStore);
await registerCommonContracts(archiverStore);
return Archiver.createAndSync(config, archiverStore, { telemetry, blobSinkClient }, opts.blockUntilSync);
return Archiver.create(config, archiverStore, { telemetry, blobSinkClient });
}

/**
* Creates a local archiver and blocks until it syncs from chain.
* @param config - The archiver configuration.
* @param blobSinkClient - The blob sink client.
* @param telemetry - The telemetry client.
* @returns The local archiver.
*/
export async function createArchiverAndSync(
config: ArchiverConfig & DataStoreConfig,
blobSinkClient: BlobSinkClientInterface,
blockUntilSynced: boolean,
telemetry: TelemetryClient = getTelemetryClient(),
): Promise<ArchiverApi & Service & L2BlockSourceEventEmitter> {
const archiver = await createArchiver(config, blobSinkClient, telemetry);
await archiver.start(blockUntilSynced);
return archiver;
}

/**
Expand Down
31 changes: 28 additions & 3 deletions yarn-project/archiver/src/test/mock_l2_block_source.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,67 @@
import { DefaultL1ContractsConfig } from '@aztec/ethereum';
import { EthAddress } from '@aztec/foundation/eth-address';
import { createLogger } from '@aztec/foundation/log';
import { L2Block, L2BlockHash, type L2BlockSource, type L2Tips } from '@aztec/stdlib/block';
import {
L2Block,
L2BlockHash,
type L2BlockSourceEventEmitter,
L2BlockSourceEvents,
type L2Tips,
} from '@aztec/stdlib/block';
import { type L1RollupConstants, getSlotRangeForEpoch } from '@aztec/stdlib/epoch-helpers';
import { type BlockHeader, TxHash, TxReceipt, TxStatus } from '@aztec/stdlib/tx';

import { EventEmitter } from 'events';

/**
* A mocked implementation of L2BlockSource to be used in tests.
*/
export class MockL2BlockSource implements L2BlockSource {
export class MockL2BlockSource extends EventEmitter implements L2BlockSourceEventEmitter {
protected l2Blocks: L2Block[] = [];

private provenBlockNumber: number = 0;

private log = createLogger('archiver:mock_l2_block_source');

public async createBlocks(numBlocks: number) {
const newBlocks = [];
for (let i = 0; i < numBlocks; i++) {
const blockNum = this.l2Blocks.length + 1;
const block = await L2Block.random(blockNum);
this.l2Blocks.push(block);
newBlocks.push(block);
}

this.log.verbose(`Created ${numBlocks} blocks in the mock L2 block source`);
this.emit(L2BlockSourceEvents.BlocksAdded, { blocks: newBlocks });
}

public addBlocks(blocks: L2Block[]) {
this.l2Blocks.push(...blocks);
this.log.verbose(`Added ${blocks.length} blocks to the mock L2 block source`);

this.emit(L2BlockSourceEvents.BlocksAdded, blocks);
}

public removeBlocks(numBlocks: number) {
this.l2Blocks = this.l2Blocks.slice(0, -numBlocks);
this.log.verbose(`Removed ${numBlocks} blocks from the mock L2 block source`);

this.emit(L2BlockSourceEvents.ChainPruned, {
blockNumber: this.l2Blocks.length,
});
this.log.verbose(
`Removed ${numBlocks} blocks from the mock L2 block source, ${this.l2Blocks.length} blocks remaining, ChainPrune emitted`,
);
}

public setProvenBlockNumber(provenBlockNumber: number) {
const previousProvenBlockNumber = this.provenBlockNumber;
this.provenBlockNumber = provenBlockNumber;

this.emit(L2BlockSourceEvents.ChainProven, {
previousProvenBlockNumber,
provenBlockNumber,
});
}

/**
Expand Down
10 changes: 8 additions & 2 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ export class AztecNodeService implements AztecNode, Traceable {
);
}

const archiver = await createArchiver(config, blobSinkClient, { blockUntilSync: true }, telemetry);
const archiver = await createArchiver(config, blobSinkClient, telemetry);

// now create the merkle trees and the world state synchronizer
const worldStateSynchronizer = await createWorldStateSynchronizer(
Expand Down Expand Up @@ -184,8 +184,14 @@ export class AztecNodeService implements AztecNode, Traceable {

const slasherClient = createSlasherClient(config, archiver, telemetry);

await p2pClient.start();
// start both and wait for them to sync from the block source
await Promise.all([p2pClient.start(), worldStateSynchronizer.start(), slasherClient.start()]);

// Start the archiver after the p2p client, as it relies on the archiver's events
await archiver.start(/* blockUntilSync=*/ true);
await worldStateSynchronizer.start();

slasherClient.start();
log.verbose(`All Aztec Node subsystems synced`);

const validatorClient = createValidatorClient(config, { p2pClient, telemetry, dateProvider, epochCache });
Expand Down
1 change: 0 additions & 1 deletion yarn-project/end-to-end/src/e2e_prover/e2e_prover_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ export class FullProverTest {
const archiver = await createArchiver(
{ ...this.context.aztecNodeConfig, dataDirectory: undefined },
blobSinkClient,
{ blockUntilSync: true },
);

// The simulated prover node (now shutdown) used private key index 2
Expand Down
6 changes: 2 additions & 4 deletions yarn-project/end-to-end/src/e2e_synching.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*/
import { getSchnorrAccount } from '@aztec/accounts/schnorr';
import { type InitialAccountData, deployFundedSchnorrAccounts } from '@aztec/accounts/testing';
import { createArchiver } from '@aztec/archiver';
import { createArchiverAndSync } from '@aztec/archiver';
import { AztecNodeService } from '@aztec/aztec-node';
import {
type AccountWalletWithSecretKey,
Expand Down Expand Up @@ -550,9 +550,7 @@ describe('e2e_synching', () => {
const blobSinkClient = createBlobSinkClient({
blobSinkUrl: `http://localhost:${opts.blobSink?.port ?? DEFAULT_BLOB_SINK_PORT}`,
});
const archiver = await createArchiver(opts.config!, blobSinkClient, {
blockUntilSync: true,
});
const archiver = await createArchiverAndSync(opts.config!, blobSinkClient, /*blockUntilSync*/ true);
const pendingBlockNumber = await rollup.read.getPendingBlockNumber();

const worldState = await createWorldStateSynchronizer(opts.config!, archiver);
Expand Down
4 changes: 1 addition & 3 deletions yarn-project/end-to-end/src/fixtures/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -761,9 +761,7 @@ export async function createAndSyncProverNode(
const blobSinkClient = createBlobSinkClient(aztecNodeConfig);
// Creating temp store and archiver for simulated prover node
const archiverConfig = { ...aztecNodeConfig, dataDirectory };
const archiver = await createArchiver(archiverConfig, blobSinkClient, {
blockUntilSync: true,
});
const archiver = await createArchiver(archiverConfig, blobSinkClient);

// Prover node config is for simulated proofs
const proverConfig: ProverNodeConfig = {
Expand Down
6 changes: 3 additions & 3 deletions yarn-project/p2p/src/client/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { type Logger, createLogger } from '@aztec/foundation/log';
import type { AztecAsyncKVStore } from '@aztec/kv-store';
import type { DataStoreConfig } from '@aztec/kv-store/config';
import { createStore } from '@aztec/kv-store/lmdb-v2';
import type { L2BlockSource } from '@aztec/stdlib/block';
import type { L2BlockSourceEventEmitter } from '@aztec/stdlib/block';
import type { ClientProtocolCircuitVerifier, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server';
import { P2PClientType } from '@aztec/stdlib/p2p';
import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';
Expand All @@ -29,7 +29,7 @@ type P2PClientDeps<T extends P2PClientType> = {
export const createP2PClient = async <T extends P2PClientType>(
clientType: T,
_config: P2PConfig & DataStoreConfig,
l2BlockSource: L2BlockSource,
l2BlockSource: L2BlockSourceEventEmitter,
proofVerifier: ClientProtocolCircuitVerifier,
worldStateSynchronizer: WorldStateSynchronizer,
epochCache: EpochCacheInterface,
Expand Down Expand Up @@ -85,5 +85,5 @@ export const createP2PClient = async <T extends P2PClientType>(
logger.verbose('P2P is disabled. Using dummy P2P service');
p2pService = new DummyP2PService();
}
return new P2PClient(clientType, store, l2BlockSource, mempools, p2pService, config, telemetry);
return new P2PClient(clientType, l2BlockSource, mempools, p2pService, config, telemetry);
};
Loading
Loading