Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ import {
} from '@aztec/protocol-contracts/instance-deployer';
import type { FunctionSelector } from '@aztec/stdlib/abi';
import type { AztecAddress } from '@aztec/stdlib/aztec-address';
import type { InBlock, L2Block, L2BlockId, L2BlockSource, L2Tips, NullifierWithBlockSource } from '@aztec/stdlib/block';
import {
type InBlock,
type L2Block,
type L2BlockId,
type L2BlockSource,
L2BlockSourceEvents,
type L2Tips,
type NullifierWithBlockSource,
} from '@aztec/stdlib/block';
import {
type ContractClassPublic,
type ContractDataSource,
Expand All @@ -32,6 +40,7 @@ import {
} from '@aztec/stdlib/contract';
import {
type L1RollupConstants,
getEpochAtSlot,
getEpochNumberAtTimestamp,
getSlotAtTimestamp,
getSlotRangeForEpoch,
Expand All @@ -44,6 +53,7 @@ import type { InboxLeaf, L1ToL2MessageSource } from '@aztec/stdlib/messaging';
import { type BlockHeader, TxEffect, TxHash, TxReceipt } from '@aztec/stdlib/tx';
import { Attributes, type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client';

import { EventEmitter } from 'events';
import groupBy from 'lodash.groupby';
import { type GetContractReturnType, createPublicClient, fallback, getContract, http } from 'viem';

Expand All @@ -69,7 +79,7 @@ export type ArchiveSource = L2BlockSource &
* Responsible for handling robust L1 polling so that other components do not need to
* concern themselves with it.
*/
export class Archiver implements ArchiveSource, Traceable {
export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
/**
* A promise in which we will be continually fetching new L2 blocks.
*/
Expand Down Expand Up @@ -105,6 +115,8 @@ export class Archiver implements ArchiveSource, Traceable {
private readonly l1constants: L1RollupConstants,
private readonly log: Logger = createLogger('archiver'),
) {
super();

this.tracer = instrumentation.tracer;
this.store = new ArchiverStoreHelper(dataStore);

Expand Down Expand Up @@ -299,6 +311,17 @@ export class Archiver implements ArchiveSource, Traceable {
const canPrune = localPendingBlockNumber > provenBlockNumber && (await this.canPrune(currentL1BlockNumber));

if (canPrune) {
const localPendingSlotNumber = await this.getL2SlotNumber();
const localPendingEpochNumber = getEpochAtSlot(localPendingSlotNumber, this.l1constants);

// Emit an event for listening services to react to the chain prune
this.emit(L2BlockSourceEvents.L2PruneDetected, {
type: L2BlockSourceEvents.L2PruneDetected,
blockNumber: localPendingBlockNumber,
slotNumber: localPendingSlotNumber,
epochNumber: localPendingEpochNumber,
});

const blocksToUnwind = localPendingBlockNumber - provenBlockNumber;
this.log.debug(
`L2 prune from ${provenBlockNumber + 1n} to ${localPendingBlockNumber} will occur on next block submission.`,
Expand Down Expand Up @@ -846,9 +869,18 @@ export class Archiver implements ArchiveSource, Traceable {
const provenBlockHeaderHash = await provenBlockHeader?.hash();
const finalizedBlockHeaderHash = await provenBlockHeader?.hash();
return {
latest: { number: latestBlockNumber, hash: latestBlockHeaderHash?.toString() } as L2BlockId,
proven: { number: provenBlockNumber, hash: provenBlockHeaderHash?.toString() } as L2BlockId,
finalized: { number: provenBlockNumber, hash: finalizedBlockHeaderHash?.toString() } as L2BlockId,
latest: {
number: latestBlockNumber,
hash: latestBlockHeaderHash?.toString(),
} as L2BlockId,
proven: {
number: provenBlockNumber,
hash: provenBlockHeaderHash?.toString(),
} as L2BlockId,
finalized: {
number: provenBlockNumber,
hash: finalizedBlockHeaderHash?.toString(),
} as L2BlockId,
};
}
}
Expand Down
41 changes: 29 additions & 12 deletions yarn-project/archiver/src/factory.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { BlobSinkClientInterface } from '@aztec/blob-sink/client';
import { createLogger } from '@aztec/foundation/log';
import type { Maybe } from '@aztec/foundation/types';
import type { DataStoreConfig } from '@aztec/kv-store/config';
import { createStore } from '@aztec/kv-store/lmdb-v2';
import { TokenContractArtifact } from '@aztec/noir-contracts.js/Token';
Expand All @@ -9,6 +8,7 @@ import { getVKTreeRoot } from '@aztec/noir-protocol-circuits-types/vk-tree';
import { protocolContractNames, protocolContractTreeRoot } from '@aztec/protocol-contracts';
import { BundledProtocolContractsProvider } from '@aztec/protocol-contracts/providers/bundle';
import { FunctionType, decodeFunctionSignature } from '@aztec/stdlib/abi';
import type { L2BlockSourceEventEmitter } from '@aztec/stdlib/block';
import {
type ContractClassPublic,
computePublicBytecodeCommitment,
Expand All @@ -23,24 +23,41 @@ import type { ArchiverConfig } from './archiver/config.js';
import { KVArchiverDataStore } from './archiver/index.js';
import { createArchiverClient } from './rpc/index.js';

/**
* Creates a local archiver.
* @param config - The archiver configuration.
* @param blobSinkClient - The blob sink client.
* @param opts - The options.
* @param telemetry - The telemetry client.
* @returns The local archiver.
*/
export async function createArchiver(
config: ArchiverConfig & DataStoreConfig,
blobSinkClient: BlobSinkClientInterface,
opts: { blockUntilSync: boolean } = { blockUntilSync: true },
telemetry: TelemetryClient = getTelemetryClient(),
): Promise<ArchiverApi & Maybe<Service>> {
): Promise<ArchiverApi & Service & L2BlockSourceEventEmitter> {
const store = await createStore('archiver', config, createLogger('archiver:lmdb'));
const archiverStore = new KVArchiverDataStore(store, config.maxLogs);
await registerProtocolContracts(archiverStore);
await registerCommonContracts(archiverStore);
return Archiver.createAndSync(config, archiverStore, { telemetry, blobSinkClient }, opts.blockUntilSync);
}

/**
* Creates a remote archiver client.
* @param config - The archiver configuration.
* @returns The remote archiver client.
*/
export function createRemoteArchiver(config: ArchiverConfig): ArchiverApi {
if (!config.archiverUrl) {
const store = await createStore('archiver', config, createLogger('archiver:lmdb'));
const archiverStore = new KVArchiverDataStore(store, config.maxLogs);
await registerProtocolContracts(archiverStore);
await registerCommonContracts(archiverStore);
return Archiver.createAndSync(config, archiverStore, { telemetry, blobSinkClient }, opts.blockUntilSync);
} else {
return createArchiverClient(
config.archiverUrl,
getComponentsVersionsFromConfig(config, protocolContractTreeRoot, getVKTreeRoot()),
);
throw new Error('Archiver URL is required');
}

return createArchiverClient(
config.archiverUrl,
getComponentsVersionsFromConfig(config, protocolContractTreeRoot, getVKTreeRoot()),
);
}

async function registerProtocolContracts(store: KVArchiverDataStore) {
Expand Down
15 changes: 12 additions & 3 deletions yarn-project/archiver/src/test/mock_l2_block_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,18 @@ export class MockL2BlockSource implements L2BlockSource {
const finalizedBlock = this.l2Blocks[finalized - 1];

return {
latest: { number: latest, hash: (await latestBlock?.hash())?.toString() },
proven: { number: proven, hash: (await provenBlock?.hash())?.toString() },
finalized: { number: finalized, hash: (await finalizedBlock?.hash())?.toString() },
latest: {
number: latest,
hash: (await latestBlock?.hash())?.toString(),
},
proven: {
number: proven,
hash: (await provenBlock?.hash())?.toString(),
},
finalized: {
number: finalized,
hash: (await finalizedBlock?.hash())?.toString(),
},
};
}

Expand Down
2 changes: 1 addition & 1 deletion yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ export class AztecNodeService implements AztecNode, Traceable {
telemetry,
);

const slasherClient = await createSlasherClient(config, archiver, telemetry);
const slasherClient = createSlasherClient(config, archiver, telemetry);

// start both and wait for them to sync from the block source
await Promise.all([p2pClient.start(), worldStateSynchronizer.start(), slasherClient.start()]);
Expand Down
1 change: 1 addition & 0 deletions yarn-project/end-to-end/scripts/run_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ case "$type" in
--mount type=tmpfs,target=/tmp-jest,tmpfs-size=512m \
-e JEST_CACHE_DIR=/tmp-jest \
-e FAKE_PROOFS \
-e LOG_LEVEL \
--workdir /root/aztec-packages/yarn-project/end-to-end \
aztecprotocol/build:3.0 ./scripts/test_simple.sh $TEST
;;
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/end-to-end/src/e2e_p2p/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ describe('e2e_p2p_reqresp_tx', () => {
contexts.flatMap((context, i) =>
context.txs.map(async (tx, j) => {
t.logger.info(`Waiting for tx ${i}-${j}: ${await tx.getTxHash()} to be mined`);
await tx.wait({ timeout: WAIT_FOR_TX_TIMEOUT });
await tx.wait({ timeout: WAIT_FOR_TX_TIMEOUT * 1.5 }); // more transactions in this test so allow more time
t.logger.info(`Tx ${i}-${j}: ${await tx.getTxHash()} has been mined`);
return await tx.getTxHash();
}),
Expand Down
22 changes: 13 additions & 9 deletions yarn-project/end-to-end/src/e2e_p2p/slashing.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { getAddress, getContract, parseEventLogs } from 'viem';
import { shouldCollectMetrics } from '../fixtures/fixtures.js';
import { createNodes } from '../fixtures/setup_p2p_test.js';
import { P2PNetworkTest } from './p2p_network.js';
import { createPXEServiceAndSubmitTransactions } from './shared.js';

jest.setTimeout(1000000);

Expand All @@ -38,6 +37,8 @@ describe('e2e_p2p_slashing', () => {
metricsPort: shouldCollectMetrics(),
initialConfig: {
aztecEpochDuration: 1,
ethereumSlotDuration: 4,
aztecSlotDuration: 12,
aztecProofSubmissionWindow: 1,
slashingQuorum,
slashingRoundSize,
Expand Down Expand Up @@ -110,6 +111,7 @@ describe('e2e_p2p_slashing', () => {
};

t.ctx.aztecNodeConfig.validatorReexecute = false;
t.ctx.aztecNodeConfig.minTxsPerBlock = 0;

// create our network of nodes and submit txs into each of them
// the number of txs per node and the number of txs per rollup
Expand Down Expand Up @@ -159,42 +161,44 @@ describe('e2e_p2p_slashing', () => {
}
const sequencer = (seqClient as any).sequencer;
const slasher = (sequencer as any).slasherClient;
let slashEvents: any[] = [];

t.logger.info(`Producing blocks until we hit a pruning event`);

// Run for up to the slashing round size, or as long as needed to get a slash event
// Variable because sometimes hit race-condition issues with attestations.
for (let i = 0; i < slashingRoundSize; i++) {
t.logger.info('Submitting transactions');
const bn = await nodes[0].getBlockNumber();
await createPXEServiceAndSubmitTransactions(t.logger, nodes[0], 1, t.fundedAccount);

t.logger.info(`Waiting for block number to change`);
while (bn === (await nodes[0].getBlockNumber())) {
await sleep(1000);
}

if (slasher.slashEvents.length > 0) {
// Create a clone of slasher.slashEvents to prevent race conditions
// The validator client can remove elements from the original array
slashEvents = [...slasher.slashEvents];
t.logger.info(`Slash events: ${slashEvents.length}`);
if (slashEvents.length > 0) {
t.logger.info(`We have a slash event ${i}`);
break;
}
}

expect(slasher.slashEvents.length).toBeGreaterThan(0);

expect(slashEvents.length).toBeGreaterThan(0);
// We should push us to land exactly at the next round
await jumpToNextRound();

// For the next round we will try to cast votes.
// Stop early if we have enough votes.
t.logger.info(`Waiting for votes to be cast`);
for (let i = 0; i < slashingRoundSize; i++) {
t.logger.info('Waiting for slot number to change and votes to be cast');
const slotNumber = await rollup.read.getCurrentSlot();
t.logger.info(`Waiting for block number to change`);
const slotNumber = await rollup.read.getCurrentSlot();
while (slotNumber === (await rollup.read.getCurrentSlot())) {
await sleep(1000);
}

sInfo = await slashingInfo();
t.logger.info(`We have ${sInfo.leaderVotes} votes in round ${sInfo.roundNumber} on ${sInfo.info[1]}`);
if (sInfo.leaderVotes > votesNeeded) {
Expand All @@ -204,7 +208,7 @@ describe('e2e_p2p_slashing', () => {
}

t.logger.info('Deploy the actual payload for slashing!');
const slashEvent = slasher.slashEvents[0];
const slashEvent = slashEvents[0];
await t.ctx.deployL1ContractsValues.publicClient.waitForTransactionReceipt({
hash: await slashFactory.write.createSlashPayload([slashEvent.epoch, slashEvent.amount], {
account: t.ctx.deployL1ContractsValues.walletClient.account,
Expand Down
1 change: 1 addition & 0 deletions yarn-project/kv-store/src/stores/l2_tips_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export class L2TipsStore implements L2BlockStreamEventHandler, L2BlockStreamLoca
if (!blockHash) {
throw new Error(`Block hash not found for block number ${blockNumber}`);
}

return { number: blockNumber, hash: blockHash };
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,13 @@ export class ConnectionSampler {
*/
async close(streamId: string): Promise<void> {
try {
const { stream, peerId } = this.streams.get(streamId)!;
const streamAndPeerId = this.streams.get(streamId);
if (!streamAndPeerId) {
this.logger.warn(`Stream ${streamId} not found`);
return;
}

const { stream, peerId } = streamAndPeerId;

const updatedActiveConnectionsCount = (this.activeConnectionsCount.get(peerId) ?? 1) - 1;
this.activeConnectionsCount.set(peerId, updatedActiveConnectionsCount);
Expand All @@ -184,7 +190,7 @@ export class ConnectionSampler {

await stream?.close();
} catch (error) {
this.logger.warn(`Failed to close connection to peer with stream id ${streamId}`);
this.logger.error(`Failed to close connection to peer with stream id ${streamId}`, error);
} finally {
this.streams.delete(streamId);
}
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/sequencer-client/src/sequencer/sequencer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ export class Sequencer {
this.log.debug(`Stopping sequencer`);
await this.validatorClient?.stop();
await this.runningPromise?.stop();
await this.slasherClient?.stop();
this.slasherClient.stop();
this.publisher.interrupt();
this.setState(SequencerState.STOPPED, 0n, true /** force */);
this.log.info('Stopped sequencer');
Expand Down
16 changes: 5 additions & 11 deletions yarn-project/sequencer-client/src/slasher/factory.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
import type { L1ContractsConfig, L1ReaderConfig } from '@aztec/ethereum';
import { createLogger } from '@aztec/foundation/log';
import type { AztecAsyncKVStore } from '@aztec/kv-store';
import type { DataStoreConfig } from '@aztec/kv-store/config';
import { createStore } from '@aztec/kv-store/lmdb-v2';
import type { L2BlockSource } from '@aztec/stdlib/block';
import type { L2BlockSourceEventEmitter } from '@aztec/stdlib/block';
import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';

import { SlasherClient } from './slasher_client.js';
import type { SlasherConfig } from './slasher_client.js';

export const createSlasherClient = async (
_config: SlasherConfig & DataStoreConfig & L1ContractsConfig & L1ReaderConfig,
l2BlockSource: L2BlockSource,
export const createSlasherClient = (
_config: SlasherConfig & L1ContractsConfig & L1ReaderConfig,
l2BlockSource: L2BlockSourceEventEmitter,
telemetry: TelemetryClient = getTelemetryClient(),
deps: { store?: AztecAsyncKVStore } = {},
) => {
const config = { ..._config };
const store = deps.store ?? (await createStore('slasher', config, createLogger('slasher:lmdb')));
return new SlasherClient(config, store, l2BlockSource, telemetry);
return new SlasherClient(config, l2BlockSource, telemetry);
};
Loading
Loading