Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 125 additions & 25 deletions yarn-project/archiver/src/archiver/archive_source_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ import {
} from '@aztec/stdlib/block';
import { Checkpoint, PublishedCheckpoint } from '@aztec/stdlib/checkpoint';
import type { ContractClassPublic, ContractDataSource, ContractInstanceWithAddress } from '@aztec/stdlib/contract';
import type { L1RollupConstants } from '@aztec/stdlib/epoch-helpers';
import { type L1RollupConstants, getSlotRangeForEpoch } from '@aztec/stdlib/epoch-helpers';
import type { GetContractClassLogsResponse, GetPublicLogsResponse } from '@aztec/stdlib/interfaces/client';
import type { L2LogsSource } from '@aztec/stdlib/interfaces/server';
import type { LogFilter, SiloedTag, Tag, TxScopedL2Log } from '@aztec/stdlib/logs';
import type { L1ToL2MessageSource } from '@aztec/stdlib/messaging';
import type { CheckpointHeader } from '@aztec/stdlib/rollup';
import type { BlockHeader, IndexedTxEffect, TxHash, TxReceipt } from '@aztec/stdlib/tx';
import type { UInt64 } from '@aztec/stdlib/types';

import type { ArchiveSource } from './archiver.js';
import type { CheckpointData } from './kv_archiver_store/block_store.js';
import type { KVArchiverDataStore } from './kv_archiver_store/kv_archiver_store.js';
import type { ValidateCheckpointResult } from './validation.js';

Expand All @@ -34,13 +36,10 @@ import type { ValidateCheckpointResult } from './validation.js';
export abstract class ArchiveSourceBase
implements ArchiveSource, L2LogsSource, ContractDataSource, L1ToL2MessageSource
{
protected readonly store: KVArchiverDataStore;

constructor(store: KVArchiverDataStore) {
this.store = store;
}

// Abstract methods that require L1 dependencies
constructor(
protected readonly store: KVArchiverDataStore,
protected readonly l1Constants?: L1RollupConstants,
) {}

abstract getRollupAddress(): Promise<EthAddress>;

Expand All @@ -58,17 +57,21 @@ export abstract class ArchiveSourceBase

abstract getL2EpochNumber(): Promise<EpochNumber | undefined>;

abstract getCheckpointsForEpoch(epochNumber: EpochNumber): Promise<Checkpoint[]>;

abstract getBlocksForEpoch(epochNumber: EpochNumber): Promise<L2Block[]>;

abstract getBlockHeadersForEpoch(epochNumber: EpochNumber): Promise<BlockHeader[]>;

abstract isEpochComplete(epochNumber: EpochNumber): Promise<boolean>;

abstract syncImmediate(): Promise<void>;

// Store-delegating methods
public getCheckpointNumber(): Promise<CheckpointNumber> {
return this.store.getSynchedCheckpointNumber();
}

public getSynchedCheckpointNumber(): Promise<CheckpointNumber> {
return this.store.getSynchedCheckpointNumber();
}

public getProvenCheckpointNumber(): Promise<CheckpointNumber> {
return this.store.getProvenCheckpointNumber();
}

public getBlockNumber(): Promise<BlockNumber> {
return this.store.getLatestBlockNumber();
Expand All @@ -91,6 +94,32 @@ export abstract class ArchiveSourceBase
return this.store.getCheckpointedBlock(number);
}

public getCheckpointedBlockNumber(): Promise<BlockNumber> {
return this.store.getCheckpointedL2BlockNumber();
}

public async getCheckpointHeader(number: CheckpointNumber | 'latest'): Promise<CheckpointHeader | undefined> {
if (number === 'latest') {
number = await this.store.getSynchedCheckpointNumber();
}
if (number === 0) {
return undefined;
}
const checkpoint = await this.store.getCheckpointData(number);
if (!checkpoint) {
return undefined;
}
return checkpoint.header;
}

public async getLastBlockNumberInCheckpoint(checkpointNumber: CheckpointNumber): Promise<BlockNumber | undefined> {
const checkpointData = await this.store.getCheckpointData(checkpointNumber);
if (!checkpointData) {
return undefined;
}
return BlockNumber(checkpointData.startBlock + checkpointData.numBlocks - 1);
}

public async getCheckpointedBlocks(
from: BlockNumber,
limit: number,
Expand Down Expand Up @@ -151,8 +180,6 @@ export abstract class ArchiveSourceBase
return blocks;
}

// L2LogsSource methods

public getPrivateLogsByTags(tags: SiloedTag[]): Promise<TxScopedL2Log[][]> {
return this.store.getPrivateLogsByTags(tags);
}
Expand All @@ -169,8 +196,6 @@ export abstract class ArchiveSourceBase
return this.store.getContractClassLogs(filter);
}

// ContractDataSource methods

public getContractClass(id: Fr): Promise<ContractClassPublic | undefined> {
return this.store.getContractClass(id);
}
Expand Down Expand Up @@ -207,8 +232,6 @@ export abstract class ArchiveSourceBase
return this.store.registerContractFunctionSignatures(signatures);
}

// L1ToL2MessageSource methods

public getL1ToL2Messages(checkpointNumber: CheckpointNumber): Promise<Fr[]> {
return this.store.getL1ToL2Messages(checkpointNumber);
}
Expand All @@ -217,8 +240,6 @@ export abstract class ArchiveSourceBase
return this.store.getL1ToL2MessageIndex(l1ToL2Message);
}

// Published checkpoint methods

public async getPublishedCheckpoints(
checkpointNumber: CheckpointNumber,
limit: number,
Expand Down Expand Up @@ -248,6 +269,87 @@ export abstract class ArchiveSourceBase
return fullCheckpoints;
}

public async getBlocksForEpoch(epochNumber: EpochNumber): Promise<L2Block[]> {
if (!this.l1Constants) {
throw new Error('L1 constants not set');
}

const [start, end] = getSlotRangeForEpoch(epochNumber, this.l1Constants);
const blocks: L2Block[] = [];

// Walk the list of checkpoints backwards and filter by slots matching the requested epoch.
// We'll typically ask for checkpoints for a very recent epoch, so we shouldn't need an index here.
let checkpoint = await this.store.getCheckpointData(await this.store.getSynchedCheckpointNumber());
const slot = (b: CheckpointData) => b.header.slotNumber;
while (checkpoint && slot(checkpoint) >= start) {
if (slot(checkpoint) <= end) {
// push the blocks on backwards
const endBlock = checkpoint.startBlock + checkpoint.numBlocks - 1;
for (let i = endBlock; i >= checkpoint.startBlock; i--) {
const block = await this.getBlock(BlockNumber(i));
if (block) {
blocks.push(block);
}
}
}
checkpoint = await this.store.getCheckpointData(CheckpointNumber(checkpoint.checkpointNumber - 1));
}

return blocks.reverse();
}

public async getBlockHeadersForEpoch(epochNumber: EpochNumber): Promise<BlockHeader[]> {
if (!this.l1Constants) {
throw new Error('L1 constants not set');
}

const [start, end] = getSlotRangeForEpoch(epochNumber, this.l1Constants);
const blocks: BlockHeader[] = [];

// Walk the list of checkpoints backwards and filter by slots matching the requested epoch.
// We'll typically ask for checkpoints for a very recent epoch, so we shouldn't need an index here.
let checkpoint = await this.store.getCheckpointData(await this.store.getSynchedCheckpointNumber());
const slot = (b: CheckpointData) => b.header.slotNumber;
while (checkpoint && slot(checkpoint) >= start) {
if (slot(checkpoint) <= end) {
// push the blocks on backwards
const endBlock = checkpoint.startBlock + checkpoint.numBlocks - 1;
for (let i = endBlock; i >= checkpoint.startBlock; i--) {
const block = await this.getBlockHeader(BlockNumber(i));
if (block) {
blocks.push(block);
}
}
}
checkpoint = await this.store.getCheckpointData(CheckpointNumber(checkpoint.checkpointNumber - 1));
}
return blocks.reverse();
}

public async getCheckpointsForEpoch(epochNumber: EpochNumber): Promise<Checkpoint[]> {
if (!this.l1Constants) {
throw new Error('L1 constants not set');
}

const [start, end] = getSlotRangeForEpoch(epochNumber, this.l1Constants);
const checkpoints: Checkpoint[] = [];

// Walk the list of checkpoints backwards and filter by slots matching the requested epoch.
// We'll typically ask for checkpoints for a very recent epoch, so we shouldn't need an index here.
let checkpointData = await this.store.getCheckpointData(await this.store.getSynchedCheckpointNumber());
const slot = (b: CheckpointData) => b.header.slotNumber;
while (checkpointData && slot(checkpointData) >= start) {
if (slot(checkpointData) <= end) {
// push the checkpoints on backwards
const [checkpoint] = await this.getPublishedCheckpoints(checkpointData.checkpointNumber, 1);
checkpoints.push(checkpoint.checkpoint);
}
checkpointData = await this.store.getCheckpointData(CheckpointNumber(checkpointData.checkpointNumber - 1));
}

return checkpoints.reverse();
}

public async getPublishedBlocks(from: BlockNumber, limit: number, proven?: boolean): Promise<PublishedL2Block[]> {
const checkpoints = await this.store.getRangeOfCheckpoints(CheckpointNumber(from), limit);
const provenCheckpointNumber = await this.store.getProvenCheckpointNumber();
Expand Down Expand Up @@ -280,8 +382,6 @@ export abstract class ArchiveSourceBase
return olbBlocks;
}

// Legacy APIs

public async getBlock(number: BlockNumber): Promise<L2Block | undefined> {
// If the number provided is -ve, then return the latest block.
if (number < 0) {
Expand Down
52 changes: 40 additions & 12 deletions yarn-project/archiver/src/archiver/archiver.sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import { getTelemetryClient } from '@aztec/telemetry-client';

import { jest } from '@jest/globals';
import assert from 'assert';
import { EventEmitter } from 'events';
import { type MockProxy, mock } from 'jest-mock-extended';
import type { GetBlockReturnType } from 'viem';

import { Archiver } from './archiver.js';
import { Archiver, type ArchiverEmitter } from './archiver.js';
import { ArchiverL1Synchronizer } from './archiver_l1_synchronizer.js';
import type { ArchiverInstrumentation } from './instrumentation.js';
import { KVArchiverDataStore } from './kv_archiver_store/kv_archiver_store.js';
import { FakeL1State, type FakeL1StateConfig } from './test/fake_l1_state.js';
Expand All @@ -49,14 +51,17 @@ describe('Archiver Sync', () => {
let archiverStore: KVArchiverDataStore;
let l1Constants: L1RollupConstants & { l1StartBlockHash: Buffer32; genesisArchiveRoot: Fr };
let archiver: Archiver;
let synchronizer: ArchiverL1Synchronizer;
let logger: Logger;
let syncLogger: Logger;
let now: number;

const GENESIS_ROOT = new Fr(GENESIS_ARCHIVE_ROOT);
const ETHEREUM_SLOT_DURATION = BigInt(DefaultL1ContractsConfig.ethereumSlotDuration);

beforeEach(async () => {
logger = createLogger('archiver:sync:test');
syncLogger = createLogger('archiver:l1-sync:test');
now = Math.floor(Date.now() / 1000);
dateProvider = new TestDateProvider();

Expand Down Expand Up @@ -109,24 +114,47 @@ describe('Archiver Sync', () => {
rollupContract = fake.createMockRollupContract(publicClient);
inboxContract = fake.createMockInboxContract(publicClient);

archiver = new Archiver(
const config = {
pollingIntervalMs: 1000,
batchSize: 1000,
maxAllowedEthClientDriftSeconds: 300,
ethereumAllowNoDebugHosts: true,
};

// Create event emitter shared by archiver and synchronizer
const events = new EventEmitter() as ArchiverEmitter;

// Create the L1 synchronizer
synchronizer = new ArchiverL1Synchronizer(
publicClient,
publicClient,
publicClient, // debugClient same as publicClient for tests
rollupContract,
inboxContract,
contractAddresses,
archiverStore,
{
pollingIntervalMs: 1000,
batchSize: 1000,
maxAllowedEthClientDriftSeconds: 300,
ethereumAllowNoDebugHosts: true,
},
config,
blobClient,
epochCache,
dateProvider,
instrumentation,
l1Constants,
events,
instrumentation.tracer,
syncLogger,
);

archiver = new Archiver(
publicClient,
publicClient,
rollupContract,
contractAddresses,
archiverStore,
config,
blobClient,
instrumentation,
l1Constants,
synchronizer,
events,
);
});

Expand Down Expand Up @@ -217,7 +245,7 @@ describe('Archiver Sync', () => {
}, 30_000);

it('ignores checkpoint 3 because it has been pruned', async () => {
const loggerSpy = jest.spyOn((archiver as any).log, 'warn');
const loggerSpy = jest.spyOn(syncLogger, 'warn');

expect(await archiver.getCheckpointNumber()).toEqual(CheckpointNumber(0));

Expand Down Expand Up @@ -311,7 +339,7 @@ describe('Archiver Sync', () => {
}, 10_000);

it('skip event search if no changes found', async () => {
const loggerSpy = jest.spyOn((archiver as any).log, 'debug');
const loggerSpy = jest.spyOn(syncLogger, 'debug');

expect(await archiver.getCheckpointNumber()).toEqual(CheckpointNumber(0));

Expand Down Expand Up @@ -755,7 +783,7 @@ describe('Archiver Sync', () => {

describe('reorg handling', () => {
it('handles L2 reorg', async () => {
const loggerSpy = jest.spyOn((archiver as any).log, 'debug');
const loggerSpy = jest.spyOn(syncLogger, 'debug');

expect(await archiver.getCheckpointNumber()).toEqual(CheckpointNumber(0));

Expand Down
Loading
Loading