Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions yarn-project/archiver/src/archiver-sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { Archiver, type ArchiverEmitter } from './archiver.js';
import type { ArchiverInstrumentation } from './modules/instrumentation.js';
import { ArchiverL1Synchronizer } from './modules/l1_synchronizer.js';
import { KVArchiverDataStore } from './store/kv_archiver_store.js';
import { L2TipsCache } from './store/l2_tips_cache.js';
import { FakeL1State } from './test/fake_l1_state.js';

describe('Archiver Sync', () => {
Expand Down Expand Up @@ -116,6 +117,9 @@ describe('Archiver Sync', () => {
// Create event emitter shared by archiver and synchronizer
const events = new EventEmitter() as ArchiverEmitter;

// Create L2 tips cache shared by archiver and synchronizer
const l2TipsCache = new L2TipsCache(archiverStore.blockStore);

// Create the L1 synchronizer
synchronizer = new ArchiverL1Synchronizer(
publicClient,
Expand All @@ -132,6 +136,7 @@ describe('Archiver Sync', () => {
l1Constants,
events,
instrumentation.tracer,
l2TipsCache,
syncLogger,
);

Expand All @@ -147,6 +152,7 @@ describe('Archiver Sync', () => {
l1Constants,
synchronizer,
events,
l2TipsCache,
);
});

Expand Down
120 changes: 10 additions & 110 deletions yarn-project/archiver/src/archiver.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { BlobClientInterface } from '@aztec/blob-client/client';
import { GENESIS_BLOCK_HEADER_HASH, INITIAL_L2_BLOCK_NUM } from '@aztec/constants';
import { EpochCache } from '@aztec/epoch-cache';
import { BlockTagTooOldError, RollupContract } from '@aztec/ethereum/contracts';
import type { L1ContractAddresses } from '@aztec/ethereum/l1-contract-addresses';
Expand All @@ -15,8 +14,6 @@ import { RunningPromise, makeLoggingErrorHandler } from '@aztec/foundation/runni
import { DateProvider } from '@aztec/foundation/timer';
import {
type ArchiverEmitter,
type CheckpointId,
GENESIS_CHECKPOINT_HEADER_HASH,
L2Block,
type L2BlockSink,
type L2Tips,
Expand All @@ -41,6 +38,7 @@ import { ArchiverDataStoreUpdater } from './modules/data_store_updater.js';
import type { ArchiverInstrumentation } from './modules/instrumentation.js';
import type { ArchiverL1Synchronizer } from './modules/l1_synchronizer.js';
import type { KVArchiverDataStore } from './store/kv_archiver_store.js';
import { L2TipsCache } from './store/l2_tips_cache.js';

/** Export ArchiverEmitter for use in factory and tests. */
export type { ArchiverEmitter };
Expand Down Expand Up @@ -83,6 +81,9 @@ export class Archiver extends ArchiverDataSourceBase implements L2BlockSink, Tra
/** Helper to handle updates to the store */
private readonly updater: ArchiverDataStoreUpdater;

/** In-memory cache for L2 chain tips. */
private readonly l2TipsCache: L2TipsCache;

public readonly tracer: Tracer;

/**
Expand Down Expand Up @@ -122,6 +123,7 @@ export class Archiver extends ArchiverDataSourceBase implements L2BlockSink, Tra
protected override readonly l1Constants: L1RollupConstants & { l1StartBlockHash: Buffer32; genesisArchiveRoot: Fr },
synchronizer: ArchiverL1Synchronizer,
events: ArchiverEmitter,
l2TipsCache?: L2TipsCache,
private readonly log: Logger = createLogger('archiver'),
) {
super(dataStore, l1Constants);
Expand All @@ -130,7 +132,8 @@ export class Archiver extends ArchiverDataSourceBase implements L2BlockSink, Tra
this.initialSyncPromise = promiseWithResolvers();
this.synchronizer = synchronizer;
this.events = events;
this.updater = new ArchiverDataStoreUpdater(this.dataStore);
this.l2TipsCache = l2TipsCache ?? new L2TipsCache(this.dataStore.blockStore);
this.updater = new ArchiverDataStoreUpdater(this.dataStore, this.l2TipsCache);

// Running promise starts with a small interval inbetween runs, so all iterations needed for the initial sync
// are done as fast as possible. This then gets updated once the initial sync completes.
Expand Down Expand Up @@ -391,111 +394,8 @@ export class Archiver extends ArchiverDataSourceBase implements L2BlockSink, Tra
return true;
}

public async getL2Tips(): Promise<L2Tips> {
const [latestBlockNumber, provenBlockNumber, checkpointedBlockNumber, finalizedBlockNumber] = await Promise.all([
this.getBlockNumber(),
this.getProvenBlockNumber(),
this.getCheckpointedL2BlockNumber(),
this.getFinalizedL2BlockNumber(),
] as const);

const beforeInitialblockNumber = BlockNumber(INITIAL_L2_BLOCK_NUM - 1);

// Get the latest block header and checkpointed blocks for proven, finalised and checkpointed blocks
const [latestBlockHeader, provenCheckpointedBlock, finalizedCheckpointedBlock, checkpointedBlock] =
await Promise.all([
latestBlockNumber > beforeInitialblockNumber ? this.getBlockHeader(latestBlockNumber) : undefined,
provenBlockNumber > beforeInitialblockNumber ? this.getCheckpointedBlock(provenBlockNumber) : undefined,
finalizedBlockNumber > beforeInitialblockNumber ? this.getCheckpointedBlock(finalizedBlockNumber) : undefined,
checkpointedBlockNumber > beforeInitialblockNumber
? this.getCheckpointedBlock(checkpointedBlockNumber)
: undefined,
] as const);

if (latestBlockNumber > beforeInitialblockNumber && !latestBlockHeader) {
throw new Error(`Failed to retrieve latest block header for block ${latestBlockNumber}`);
}

// Checkpointed blocks must exist for proven, finalized and checkpointed tips if they are beyond the initial block number.
if (checkpointedBlockNumber > beforeInitialblockNumber && !checkpointedBlock?.block.header) {
throw new Error(
`Failed to retrieve checkpointed block header for block ${checkpointedBlockNumber} (latest block is ${latestBlockNumber})`,
);
}

if (provenBlockNumber > beforeInitialblockNumber && !provenCheckpointedBlock?.block.header) {
throw new Error(
`Failed to retrieve proven checkpointed for block ${provenBlockNumber} (latest block is ${latestBlockNumber})`,
);
}

if (finalizedBlockNumber > beforeInitialblockNumber && !finalizedCheckpointedBlock?.block.header) {
throw new Error(
`Failed to retrieve finalized block header for block ${finalizedBlockNumber} (latest block is ${latestBlockNumber})`,
);
}

const latestBlockHeaderHash = (await latestBlockHeader?.hash()) ?? GENESIS_BLOCK_HEADER_HASH;
const provenBlockHeaderHash = (await provenCheckpointedBlock?.block.header?.hash()) ?? GENESIS_BLOCK_HEADER_HASH;
const finalizedBlockHeaderHash =
(await finalizedCheckpointedBlock?.block.header?.hash()) ?? GENESIS_BLOCK_HEADER_HASH;
const checkpointedBlockHeaderHash = (await checkpointedBlock?.block.header?.hash()) ?? GENESIS_BLOCK_HEADER_HASH;

// Now attempt to retrieve checkpoints for proven, finalised and checkpointed blocks
const [[provenBlockCheckpoint], [finalizedBlockCheckpoint], [checkpointedBlockCheckpoint]] = await Promise.all([
provenCheckpointedBlock !== undefined
? await this.getCheckpoints(provenCheckpointedBlock?.checkpointNumber, 1)
: [undefined],
finalizedCheckpointedBlock !== undefined
? await this.getCheckpoints(finalizedCheckpointedBlock?.checkpointNumber, 1)
: [undefined],
checkpointedBlock !== undefined ? await this.getCheckpoints(checkpointedBlock?.checkpointNumber, 1) : [undefined],
]);

const initialcheckpointId: CheckpointId = {
number: CheckpointNumber.ZERO,
hash: GENESIS_CHECKPOINT_HEADER_HASH.toString(),
};

const makeCheckpointId = (checkpoint: PublishedCheckpoint | undefined) => {
if (checkpoint === undefined) {
return initialcheckpointId;
}
return {
number: checkpoint.checkpoint.number,
hash: checkpoint.checkpoint.hash().toString(),
};
};

const l2Tips: L2Tips = {
proposed: {
number: latestBlockNumber,
hash: latestBlockHeaderHash.toString(),
},
proven: {
block: {
number: provenBlockNumber,
hash: provenBlockHeaderHash.toString(),
},
checkpoint: makeCheckpointId(provenBlockCheckpoint),
},
finalized: {
block: {
number: finalizedBlockNumber,
hash: finalizedBlockHeaderHash.toString(),
},
checkpoint: makeCheckpointId(finalizedBlockCheckpoint),
},
checkpointed: {
block: {
number: checkpointedBlockNumber,
hash: checkpointedBlockHeaderHash.toString(),
},
checkpoint: makeCheckpointId(checkpointedBlockCheckpoint),
},
};

return l2Tips;
public getL2Tips(): Promise<L2Tips> {
return this.l2TipsCache.getL2Tips();
}

public async rollbackTo(targetL2BlockNumber: BlockNumber): Promise<void> {
Expand Down Expand Up @@ -532,7 +432,7 @@ export class Archiver extends ArchiverDataSourceBase implements L2BlockSink, Tra
await this.store.setMessageSynchedL1Block({ l1BlockNumber: targetL1BlockNumber, l1BlockHash: targetL1BlockHash });
if (targetL2BlockNumber < currentProvenBlock) {
this.log.info(`Clearing proven L2 block number`);
await this.store.setProvenCheckpointNumber(CheckpointNumber.ZERO);
await this.updater.setProvenCheckpointNumber(CheckpointNumber.ZERO);
}
// TODO(palla/reorg): Set the finalized block when we add support for it.
// if (targetL2BlockNumber < currentFinalizedBlock) {
Expand Down
7 changes: 7 additions & 0 deletions yarn-project/archiver/src/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { type ArchiverConfig, mapArchiverConfig } from './config.js';
import { ArchiverInstrumentation } from './modules/instrumentation.js';
import { ArchiverL1Synchronizer } from './modules/l1_synchronizer.js';
import { ARCHIVER_DB_VERSION, KVArchiverDataStore } from './store/kv_archiver_store.js';
import { L2TipsCache } from './store/l2_tips_cache.js';

export const ARCHIVER_STORE_NAME = 'archiver';

Expand Down Expand Up @@ -128,6 +129,9 @@ export async function createArchiver(
// Create the event emitter that will be shared by archiver and synchronizer
const events = new EventEmitter() as ArchiverEmitter;

// Create L2 tips cache shared by archiver and synchronizer
const l2TipsCache = new L2TipsCache(archiverStore.blockStore);

// Create the L1 synchronizer
const synchronizer = new ArchiverL1Synchronizer(
publicClient,
Expand All @@ -144,6 +148,8 @@ export async function createArchiver(
l1Constants,
events,
instrumentation.tracer,
l2TipsCache,
undefined, // log (use default)
);

const archiver = new Archiver(
Expand All @@ -158,6 +164,7 @@ export async function createArchiver(
l1Constants,
synchronizer,
events,
l2TipsCache,
);

await archiver.start(opts.blockUntilSync);
Expand Down
1 change: 1 addition & 0 deletions yarn-project/archiver/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ export * from './config.js';
export { type L1PublishedData } from './structs/published.js';
export { KVArchiverDataStore, ARCHIVER_DB_VERSION } from './store/kv_archiver_store.js';
export { ContractInstanceStore } from './store/contract_instance_store.js';
export { L2TipsCache } from './store/l2_tips_cache.js';

export { retrieveCheckpointsFromRollup, retrieveL2ProofVerifiedEvents } from './l1/data_retrieval.js';
61 changes: 43 additions & 18 deletions yarn-project/archiver/src/modules/data_store_updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import type { UInt64 } from '@aztec/stdlib/types';
import groupBy from 'lodash.groupby';

import type { KVArchiverDataStore } from '../store/kv_archiver_store.js';
import type { L2TipsCache } from '../store/l2_tips_cache.js';

/** Operation type for contract data updates. */
enum Operation {
Expand All @@ -44,7 +45,10 @@ type ReconcileCheckpointsResult = {
export class ArchiverDataStoreUpdater {
private readonly log = createLogger('archiver:store_updater');

constructor(private store: KVArchiverDataStore) {}
constructor(
private store: KVArchiverDataStore,
private l2TipsCache?: L2TipsCache,
) {}

/**
* Adds proposed blocks to the store with contract class/instance extraction from logs.
Expand All @@ -56,11 +60,11 @@ export class ArchiverDataStoreUpdater {
* @param pendingChainValidationStatus - Optional validation status to set.
* @returns True if the operation is successful.
*/
public addProposedBlocks(
public async addProposedBlocks(
blocks: L2Block[],
pendingChainValidationStatus?: ValidateCheckpointResult,
): Promise<boolean> {
return this.store.transactionAsync(async () => {
const result = await this.store.transactionAsync(async () => {
await this.store.addProposedBlocks(blocks);

const opResults = await Promise.all([
Expand All @@ -72,8 +76,10 @@ export class ArchiverDataStoreUpdater {
...blocks.map(block => this.addContractDataToDb(block)),
]);

await this.l2TipsCache?.refresh();
return opResults.every(Boolean);
});
return result;
}

/**
Expand All @@ -87,11 +93,11 @@ export class ArchiverDataStoreUpdater {
* @param pendingChainValidationStatus - Optional validation status to set.
* @returns Result with information about any pruned blocks.
*/
public addCheckpoints(
public async addCheckpoints(
checkpoints: PublishedCheckpoint[],
pendingChainValidationStatus?: ValidateCheckpointResult,
): Promise<ReconcileCheckpointsResult> {
return this.store.transactionAsync(async () => {
const result = await this.store.transactionAsync(async () => {
// Before adding checkpoints, check for conflicts with local blocks if any
const { prunedBlocks, lastAlreadyInsertedBlockNumber } = await this.pruneMismatchingLocalBlocks(checkpoints);

Expand All @@ -111,8 +117,10 @@ export class ArchiverDataStoreUpdater {
...newBlocks.map(block => this.addContractDataToDb(block)),
]);

await this.l2TipsCache?.refresh();
return { prunedBlocks, lastAlreadyInsertedBlockNumber };
});
return result;
}

/**
Expand Down Expand Up @@ -197,8 +205,8 @@ export class ArchiverDataStoreUpdater {
* @returns The removed blocks.
* @throws Error if any block to be removed is checkpointed.
*/
public removeUncheckpointedBlocksAfter(blockNumber: BlockNumber): Promise<L2Block[]> {
return this.store.transactionAsync(async () => {
public async removeUncheckpointedBlocksAfter(blockNumber: BlockNumber): Promise<L2Block[]> {
const result = await this.store.transactionAsync(async () => {
// Verify we're only removing uncheckpointed blocks
const lastCheckpointedBlockNumber = await this.store.getCheckpointedL2BlockNumber();
if (blockNumber < lastCheckpointedBlockNumber) {
Expand All @@ -207,8 +215,11 @@ export class ArchiverDataStoreUpdater {
);
}

return await this.removeBlocksAfter(blockNumber);
const result = await this.removeBlocksAfter(blockNumber);
await this.l2TipsCache?.refresh();
return result;
});
return result;
}

/**
Expand Down Expand Up @@ -238,17 +249,31 @@ export class ArchiverDataStoreUpdater {
* @returns True if the operation is successful.
*/
public async removeCheckpointsAfter(checkpointNumber: CheckpointNumber): Promise<boolean> {
const { blocksRemoved = [] } = await this.store.removeCheckpointsAfter(checkpointNumber);

const opResults = await Promise.all([
// Prune rolls back to the last proven block, which is by definition valid
this.store.setPendingChainValidationStatus({ valid: true }),
// Remove contract data for all blocks being removed
...blocksRemoved.map(block => this.removeContractDataFromDb(block)),
this.store.deleteLogs(blocksRemoved),
]);
return await this.store.transactionAsync(async () => {
const { blocksRemoved = [] } = await this.store.removeCheckpointsAfter(checkpointNumber);

const opResults = await Promise.all([
// Prune rolls back to the last proven block, which is by definition valid
this.store.setPendingChainValidationStatus({ valid: true }),
// Remove contract data for all blocks being removed
...blocksRemoved.map(block => this.removeContractDataFromDb(block)),
this.store.deleteLogs(blocksRemoved),
]);

return opResults.every(Boolean);
await this.l2TipsCache?.refresh();
return opResults.every(Boolean);
});
}

/**
* Updates the proven checkpoint number and refreshes the L2 tips cache.
* @param checkpointNumber - The checkpoint number to set as proven.
*/
public async setProvenCheckpointNumber(checkpointNumber: CheckpointNumber): Promise<void> {
await this.store.transactionAsync(async () => {
await this.store.setProvenCheckpointNumber(checkpointNumber);
await this.l2TipsCache?.refresh();
});
}

/** Extracts and stores contract data from a single block. */
Expand Down
Loading
Loading