Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions yarn-project/archiver/src/archiver-store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { EventEmitter } from 'events';
import { type MockProxy, mock } from 'jest-mock-extended';

import { Archiver, type ArchiverEmitter } from './archiver.js';
import { InitialBlockNumberNotSequentialError } from './errors.js';
import { BlockNumberNotSequentialError } from './errors.js';
import type { ArchiverInstrumentation } from './modules/instrumentation.js';
import { ArchiverL1Synchronizer } from './modules/l1_synchronizer.js';
import { KVArchiverDataStore } from './store/kv_archiver_store.js';
Expand Down Expand Up @@ -265,7 +265,7 @@ describe('Archiver Store', () => {
await archiver.addBlock(block1);

// Block 3 should be rejected because block 2 is missing
await expect(archiver.addBlock(block3)).rejects.toThrow(InitialBlockNumberNotSequentialError);
await expect(archiver.addBlock(block3)).rejects.toThrow(BlockNumberNotSequentialError);
});

it('rejects blocks with duplicate block numbers', async () => {
Expand All @@ -276,7 +276,7 @@ describe('Archiver Store', () => {
await archiver.addBlock(block2);

// Adding block 2 again shoud be rejected
await expect(archiver.addBlock(block2)).rejects.toThrow(InitialBlockNumberNotSequentialError);
await expect(archiver.addBlock(block2)).rejects.toThrow(BlockNumberNotSequentialError);
});

it('rejects first block if not starting from block 1', async () => {
Expand Down
10 changes: 7 additions & 3 deletions yarn-project/archiver/src/archiver-sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,7 @@ describe('Archiver Sync', () => {
expect(await archiver.getSynchedCheckpointNumber()).toEqual(CheckpointNumber(1));
const blockAlreadySyncedFromCheckpoint = cp1.blocks[cp1.blocks.length - 1];

// Now try and add one of the blocks via the addProposedBlocks method. It should throw
// Now try and add one of the blocks via the addProposedBlock method. It should throw
await expect(archiver.addBlock(blockAlreadySyncedFromCheckpoint)).rejects.toThrow();
}, 10_000);

Expand Down Expand Up @@ -1428,8 +1428,12 @@ describe('Archiver Sync', () => {
const { checkpoint: cp3 } = await fake.addCheckpoint(CheckpointNumber(3), { l1BlockNumber: 5010n });

// Add blocks from BOTH checkpoints locally (matching the L1 checkpoints)
await archiverStore.addProposedBlocks(cp2.blocks, { force: true });
await archiverStore.addProposedBlocks(cp3.blocks, { force: true });
for (const block of cp2.blocks) {
await archiverStore.addProposedBlock(block, { force: true });
}
for (const block of cp3.blocks) {
await archiverStore.addProposedBlock(block, { force: true });
}

// Verify all blocks are visible locally
const lastBlockInCheckpoint3 = cp3.blocks[cp3.blocks.length - 1].number;
Expand Down
9 changes: 7 additions & 2 deletions yarn-project/archiver/src/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
import { type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client';

import { type ArchiverConfig, mapArchiverConfig } from './config.js';
import { NoBlobBodiesFoundError } from './errors.js';
import { BlockAlreadyCheckpointedError, NoBlobBodiesFoundError } from './errors.js';
import { validateAndLogTraceAvailability } from './l1/validate_trace.js';
import { ArchiverDataSourceBase } from './modules/data_source_base.js';
import { ArchiverDataStoreUpdater } from './modules/data_store_updater.js';
Expand Down Expand Up @@ -242,10 +242,15 @@ export class Archiver extends ArchiverDataSourceBase implements L2BlockSink, Tra
}

try {
await this.updater.addProposedBlocks([block]);
await this.updater.addProposedBlock(block);
this.log.debug(`Added block ${block.number} to store`);
resolve();
} catch (err: any) {
if (err instanceof BlockAlreadyCheckpointedError) {
this.log.debug(`Proposed block ${block.number} matches already checkpointed block, ignoring late proposal`);
resolve();
continue;
}
this.log.error(`Failed to add block ${block.number} to store: ${err.message}`);
reject(err);
}
Expand Down
34 changes: 10 additions & 24 deletions yarn-project/archiver/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,9 @@ export class NoBlobBodiesFoundError extends Error {
}
}

export class InitialBlockNumberNotSequentialError extends Error {
constructor(
public readonly newBlockNumber: number,
public readonly previousBlockNumber: number | undefined,
) {
super(
`Cannot insert new block ${newBlockNumber} given previous block number in store is ${
previousBlockNumber ?? 'undefined'
}`,
);
}
}

export class BlockNumberNotSequentialError extends Error {
constructor(newBlockNumber: number, previous: number | undefined) {
super(
`Cannot insert new block ${newBlockNumber} given previous block number in batch is ${previous ?? 'undefined'}`,
);
super(`Cannot insert new block ${newBlockNumber} given previous block number is ${previous ?? 'undefined'}`);
}
}

Expand All @@ -48,14 +33,6 @@ export class CheckpointNumberNotSequentialError extends Error {
}
}

export class CheckpointNumberNotConsistentError extends Error {
constructor(newCheckpointNumber: number, previous: number | undefined) {
super(
`Cannot insert block for new checkpoint ${newCheckpointNumber} given previous block was checkpoint ${previous ?? 'undefined'}`,
);
}
}

export class BlockIndexNotSequentialError extends Error {
constructor(newBlockIndex: number, previousBlockIndex: number | undefined) {
super(
Expand Down Expand Up @@ -89,6 +66,15 @@ export class BlockNotFoundError extends Error {
}
}

/** Thrown when a proposed block matches a block that was already checkpointed. This is expected for late proposals. */
export class BlockAlreadyCheckpointedError extends Error {
constructor(public readonly blockNumber: number) {
super(`Block ${blockNumber} has already been checkpointed with the same content`);
this.name = 'BlockAlreadyCheckpointedError';
}
}

/** Thrown when a proposed block conflicts with an already checkpointed block (different content). */
export class CannotOverwriteCheckpointedBlockError extends Error {
constructor(
public readonly blockNumber: number,
Expand Down
12 changes: 6 additions & 6 deletions yarn-project/archiver/src/modules/data_store_updater.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ describe('ArchiverDataStoreUpdater', () => {
});

describe('contract data', () => {
it('stores contract class and instance data when blocks are added via addProposedBlocks', async () => {
it('stores contract class and instance data when blocks are added via addProposedBlock', async () => {
// Create block with contract class and instance logs
const block = await L2Block.random(BlockNumber(1), {
checkpointNumber: CheckpointNumber(1),
Expand All @@ -66,7 +66,7 @@ describe('ArchiverDataStoreUpdater', () => {
block.body.txEffects[0].contractClassLogs = [contractClassLog];
block.body.txEffects[0].privateLogs = [PrivateLog.fromBuffer(getSampleContractInstancePublishedEventPayload())];

await updater.addProposedBlocks([block]);
await updater.addProposedBlock(block);

// Verify contract class was stored
const retrievedClass = await store.getContractClass(contractClassId);
Expand All @@ -92,7 +92,7 @@ describe('ArchiverDataStoreUpdater', () => {
PrivateLog.fromBuffer(getSampleContractInstancePublishedEventPayload()),
];

await updater.addProposedBlocks([localBlock]);
await updater.addProposedBlock(localBlock);

// Verify contract data was stored
const timestamp = localBlock.header.globalVariables.timestamp + 1n;
Expand Down Expand Up @@ -155,7 +155,7 @@ describe('ArchiverDataStoreUpdater', () => {
slotNumber: SlotNumber(100),
});

await updater.addProposedBlocks([block]);
await updater.addProposedBlock(block);

// Create checkpoint with the SAME block (same archive root)
const publishedCheckpoint = makePublishedCheckpoint(makeCheckpoint([block]), 10);
Expand All @@ -175,7 +175,7 @@ describe('ArchiverDataStoreUpdater', () => {
indexWithinCheckpoint: IndexWithinCheckpoint(0),
slotNumber: SlotNumber(100),
});
await updater.addProposedBlocks([localBlock]);
await updater.addProposedBlock(localBlock);
const publicLogsBefore = await store.getPublicLogs({});
expect(publicLogsBefore.logs.map(l => l.log)).toEqual(localBlock.body.txEffects.flatMap(tx => tx.publicLogs));

Expand Down Expand Up @@ -203,7 +203,7 @@ describe('ArchiverDataStoreUpdater', () => {
indexWithinCheckpoint: IndexWithinCheckpoint(0),
slotNumber: SlotNumber(100),
});
await updater.addProposedBlocks([localBlock]);
await updater.addProposedBlock(localBlock);
const publicLogsBefore = await store.getPublicLogs({});
expect(publicLogsBefore.logs.map(l => l.log)).toEqual(localBlock.body.txEffects.flatMap(tx => tx.publicLogs));

Expand Down
22 changes: 11 additions & 11 deletions yarn-project/archiver/src/modules/data_store_updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,29 @@ export class ArchiverDataStoreUpdater {
) {}

/**
* Adds proposed blocks to the store with contract class/instance extraction from logs.
* These are uncheckpointed blocks that have been proposed by the sequencer but not yet included in a checkpoint on L1.
* Adds a proposed block to the store with contract class/instance extraction from logs.
* This is an uncheckpointed block that has been proposed by the sequencer but not yet included in a checkpoint on L1.
* Extracts ContractClassPublished, ContractInstancePublished, ContractInstanceUpdated events,
* and individually broadcasted functions from the block logs.
*
* @param blocks - The proposed L2 blocks to add.
* @param block - The proposed L2 block to add.
* @param pendingChainValidationStatus - Optional validation status to set.
* @returns True if the operation is successful.
*/
public async addProposedBlocks(
blocks: L2Block[],
public async addProposedBlock(
block: L2Block,
pendingChainValidationStatus?: ValidateCheckpointResult,
): Promise<boolean> {
const result = await this.store.transactionAsync(async () => {
await this.store.addProposedBlocks(blocks);
await this.store.addProposedBlock(block);

const opResults = await Promise.all([
// Update the pending chain validation status if provided
pendingChainValidationStatus && this.store.setPendingChainValidationStatus(pendingChainValidationStatus),
// Add any logs emitted during the retrieved blocks
this.store.addLogs(blocks),
// Unroll all logs emitted during the retrieved blocks and extract any contract classes and instances from them
...blocks.map(block => this.addContractDataToDb(block)),
// Add any logs emitted during the retrieved block
this.store.addLogs([block]),
// Unroll all logs emitted during the retrieved block and extract any contract classes and instances from it
this.addContractDataToDb(block),
]);

await this.l2TipsCache?.refresh();
Expand Down Expand Up @@ -108,7 +108,7 @@ export class ArchiverDataStoreUpdater {

await this.store.addCheckpoints(checkpoints);

// Filter out blocks that were already inserted via addProposedBlocks() to avoid duplicating logs/contract data
// Filter out blocks that were already inserted via addProposedBlock() to avoid duplicating logs/contract data
const newBlocks = checkpoints
.flatMap((ch: PublishedCheckpoint) => ch.checkpoint.blocks)
.filter(b => lastAlreadyInsertedBlockNumber === undefined || b.number > lastAlreadyInsertedBlockNumber);
Expand Down
87 changes: 31 additions & 56 deletions yarn-project/archiver/src/store/block_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ import {
} from '@aztec/stdlib/tx';

import {
BlockAlreadyCheckpointedError,
BlockArchiveNotConsistentError,
BlockIndexNotSequentialError,
BlockNotFoundError,
BlockNumberNotSequentialError,
CannotOverwriteCheckpointedBlockError,
CheckpointNotFoundError,
CheckpointNumberNotConsistentError,
CheckpointNumberNotSequentialError,
InitialBlockNumberNotSequentialError,
InitialCheckpointNumberNotSequentialError,
} from '../errors.js';

Expand Down Expand Up @@ -141,95 +140,71 @@ export class BlockStore {
}

/**
* Append new proposed blocks to the store's list. All blocks must be for the 'current' checkpoint.
* These are uncheckpointed blocks that have been proposed by the sequencer but not yet included in a checkpoint on L1.
* Append a new proposed block to the store.
* This is an uncheckpointed block that has been proposed by the sequencer but not yet included in a checkpoint on L1.
* For checkpointed blocks (already published to L1), use addCheckpoints() instead.
* @param blocks - The proposed L2 blocks to be added to the store.
* @param block - The proposed L2 block to be added to the store.
* @returns True if the operation is successful.
*/
async addProposedBlocks(blocks: L2Block[], opts: { force?: boolean } = {}): Promise<boolean> {
if (blocks.length === 0) {
return true;
}

async addProposedBlock(block: L2Block, opts: { force?: boolean } = {}): Promise<boolean> {
return await this.db.transactionAsync(async () => {
// Check that the block immediately before the first block to be added is present in the store.
const firstBlockNumber = blocks[0].number;
const firstBlockCheckpointNumber = blocks[0].checkpointNumber;
const firstBlockIndex = blocks[0].indexWithinCheckpoint;
const firstBlockLastArchive = blocks[0].header.lastArchive.root;
const blockNumber = block.number;
const blockCheckpointNumber = block.checkpointNumber;
const blockIndex = block.indexWithinCheckpoint;
const blockLastArchive = block.header.lastArchive.root;

// Extract the latest block and checkpoint numbers
const previousBlockNumber = await this.getLatestBlockNumber();
const previousCheckpointNumber = await this.getLatestCheckpointNumber();

// Verify we're not overwriting checkpointed blocks
const lastCheckpointedBlockNumber = await this.getCheckpointedL2BlockNumber();
if (!opts.force && firstBlockNumber <= lastCheckpointedBlockNumber) {
throw new CannotOverwriteCheckpointedBlockError(firstBlockNumber, lastCheckpointedBlockNumber);
if (!opts.force && blockNumber <= lastCheckpointedBlockNumber) {
// Check if the proposed block matches the already-checkpointed one
const existingBlock = await this.getBlock(BlockNumber(blockNumber));
if (existingBlock && existingBlock.archive.root.equals(block.archive.root)) {
throw new BlockAlreadyCheckpointedError(blockNumber);
}
throw new CannotOverwriteCheckpointedBlockError(blockNumber, lastCheckpointedBlockNumber);
}

// Check that the first block number is the expected one
if (!opts.force && previousBlockNumber !== firstBlockNumber - 1) {
throw new InitialBlockNumberNotSequentialError(firstBlockNumber, previousBlockNumber);
// Check that the block number is the expected one
if (!opts.force && previousBlockNumber !== blockNumber - 1) {
throw new BlockNumberNotSequentialError(blockNumber, previousBlockNumber);
}

// The same check as above but for checkpoints
if (!opts.force && previousCheckpointNumber !== firstBlockCheckpointNumber - 1) {
throw new InitialCheckpointNumberNotSequentialError(firstBlockCheckpointNumber, previousCheckpointNumber);
if (!opts.force && previousCheckpointNumber !== blockCheckpointNumber - 1) {
throw new CheckpointNumberNotSequentialError(blockCheckpointNumber, previousCheckpointNumber);
}

// Extract the previous block if there is one and see if it is for the same checkpoint or not
const previousBlockResult = await this.getBlock(previousBlockNumber);

let expectedFirstblockIndex = 0;
let expectedBlockIndex = 0;
let previousBlockIndex: number | undefined = undefined;
if (previousBlockResult !== undefined) {
if (previousBlockResult.checkpointNumber === firstBlockCheckpointNumber) {
if (previousBlockResult.checkpointNumber === blockCheckpointNumber) {
// The previous block is for the same checkpoint, therefore our index should follow it
previousBlockIndex = previousBlockResult.indexWithinCheckpoint;
expectedFirstblockIndex = previousBlockIndex + 1;
expectedBlockIndex = previousBlockIndex + 1;
}
if (!previousBlockResult.archive.root.equals(firstBlockLastArchive)) {
if (!previousBlockResult.archive.root.equals(blockLastArchive)) {
throw new BlockArchiveNotConsistentError(
firstBlockNumber,
blockNumber,
previousBlockResult.number,
firstBlockLastArchive,
blockLastArchive,
previousBlockResult.archive.root,
);
}
}

// Now check that the first block has the expected index value
if (!opts.force && expectedFirstblockIndex !== firstBlockIndex) {
throw new BlockIndexNotSequentialError(firstBlockIndex, previousBlockIndex);
// Now check that the block has the expected index value
if (!opts.force && expectedBlockIndex !== blockIndex) {
throw new BlockIndexNotSequentialError(blockIndex, previousBlockIndex);
}

// Iterate over blocks array and insert them, checking that the block numbers and indexes are sequential. Also check they are for the correct checkpoint.
let previousBlock: L2Block | undefined = undefined;
for (const block of blocks) {
if (!opts.force && previousBlock) {
if (previousBlock.number + 1 !== block.number) {
throw new BlockNumberNotSequentialError(block.number, previousBlock.number);
}
if (previousBlock.indexWithinCheckpoint + 1 !== block.indexWithinCheckpoint) {
throw new BlockIndexNotSequentialError(block.indexWithinCheckpoint, previousBlock.indexWithinCheckpoint);
}
if (!previousBlock.archive.root.equals(block.header.lastArchive.root)) {
throw new BlockArchiveNotConsistentError(
block.number,
previousBlock.number,
block.header.lastArchive.root,
previousBlock.archive.root,
);
}
}
if (!opts.force && firstBlockCheckpointNumber !== block.checkpointNumber) {
throw new CheckpointNumberNotConsistentError(block.checkpointNumber, firstBlockCheckpointNumber);
}
previousBlock = block;
await this.addBlockToDatabase(block, block.checkpointNumber, block.indexWithinCheckpoint);
}
await this.addBlockToDatabase(block, block.checkpointNumber, block.indexWithinCheckpoint);

return true;
});
Expand Down
Loading
Loading