diff --git a/yarn-project/aztec-node/package.json b/yarn-project/aztec-node/package.json index 383c73277786..c06afd94adae 100644 --- a/yarn-project/aztec-node/package.json +++ b/yarn-project/aztec-node/package.json @@ -5,7 +5,8 @@ "type": "module", "exports": { ".": "./dest/index.js", - "./config": "./dest/aztec-node/config.js" + "./config": "./dest/aztec-node/config.js", + "./sentinel": "./dest/aztec-node/sentinel.js" }, "bin": "./dest/bin/index.js", "typedocOptions": { diff --git a/yarn-project/aztec-node/src/aztec-node/config.ts b/yarn-project/aztec-node/src/aztec-node/config.ts index f630b941e599..13b47686e6be 100644 --- a/yarn-project/aztec-node/src/aztec-node/config.ts +++ b/yarn-project/aztec-node/src/aztec-node/config.ts @@ -12,6 +12,8 @@ import { readFileSync } from 'fs'; import { dirname, resolve } from 'path'; import { fileURLToPath } from 'url'; +import { type SentinelConfig, sentinelConfigMappings } from '../sentinel/config.js'; + export { sequencerClientConfigMappings, type SequencerClientConfig }; /** @@ -24,7 +26,8 @@ export type AztecNodeConfig = ArchiverConfig & WorldStateConfig & Pick & P2PConfig & - DataStoreConfig & { + DataStoreConfig & + SentinelConfig & { /** Whether the validator is disabled for this node */ disableValidator: boolean; /** Whether to populate the genesis state with initial fee juice for the test accounts */ @@ -41,6 +44,7 @@ export const aztecNodeConfigMappings: ConfigMappingsType = { ...proverClientConfigMappings, ...worldStateConfigMappings, ...p2pConfigMappings, + ...sentinelConfigMappings, l1Contracts: { description: 'The deployed L1 contract addresses', nested: l1ContractAddressesMapping, diff --git a/yarn-project/aztec-node/src/aztec-node/server.test.ts b/yarn-project/aztec-node/src/aztec-node/server.test.ts index f93a4c08f03a..03fc6cd310c9 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.test.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.test.ts @@ -115,6 +115,7 @@ describe('aztec node', () => { l1ToL2MessageSource, worldState, undefined, + undefined, 12345, 1, globalVariablesBuilder, diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index 2a466675ef22..fc4a3a463e5e 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -74,6 +74,7 @@ import { TxStatus, type TxValidationResult, } from '@aztec/stdlib/tx'; +import type { ValidatorsStats } from '@aztec/stdlib/validators'; import { Attributes, type TelemetryClient, @@ -85,6 +86,8 @@ import { import { createValidatorClient } from '@aztec/validator-client'; import { createWorldStateSynchronizer } from '@aztec/world-state'; +import { createSentinel } from '../sentinel/factory.js'; +import { Sentinel } from '../sentinel/sentinel.js'; import { type AztecNodeConfig, getPackageVersion } from './config.js'; import { NodeMetrics } from './node_metrics.js'; @@ -109,6 +112,7 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { protected readonly l1ToL2MessageSource: L1ToL2MessageSource, protected readonly worldStateSynchronizer: WorldStateSynchronizer, protected readonly sequencer: SequencerClient | undefined, + protected readonly validatorsSentinel: Sentinel | undefined, protected readonly l1ChainId: number, protected readonly version: number, protected readonly globalVariableBuilder: GlobalVariableBuilder, @@ -199,6 +203,9 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { const validatorClient = createValidatorClient(config, { p2pClient, telemetry, dateProvider, epochCache }); + const validatorsSentinel = await createSentinel(epochCache, archiver, p2pClient, config); + await validatorsSentinel?.start(); + // now create the sequencer const sequencer = config.disableValidator ? undefined @@ -225,6 +232,7 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { archiver, worldStateSynchronizer, sequencer, + validatorsSentinel, ethereumChain.chainInfo.id, config.version, new GlobalVariableBuilder(config), @@ -471,6 +479,7 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { public async stop() { this.log.info(`Stopping`); await this.txQueue.end(); + await this.validatorsSentinel?.stop(); await this.sequencer?.stop(); await this.p2pClient.stop(); await this.worldStateSynchronizer.stop(); @@ -978,6 +987,10 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { return Promise.resolve(); } + public getValidatorsStats(): Promise { + return this.validatorsSentinel?.computeStats() ?? Promise.resolve({ stats: {}, slotWindow: 0 }); + } + /** * Returns an instance of MerkleTreeOperations having first ensured the world state is fully synched * @param blockNumber - The block number at which to get the data. diff --git a/yarn-project/aztec-node/src/sentinel/config.ts b/yarn-project/aztec-node/src/sentinel/config.ts new file mode 100644 index 000000000000..418cb4c2c90f --- /dev/null +++ b/yarn-project/aztec-node/src/sentinel/config.ts @@ -0,0 +1,19 @@ +import { type ConfigMappingsType, booleanConfigHelper, numberConfigHelper } from '@aztec/foundation/config'; + +export type SentinelConfig = { + sentinelHistoryLengthInEpochs: number; + sentinelEnabled: boolean; +}; + +export const sentinelConfigMappings: ConfigMappingsType = { + sentinelHistoryLengthInEpochs: { + description: 'The number of L2 epochs kept of history for each validator for computing their stats.', + env: 'SENTINEL_HISTORY_LENGTH_IN_EPOCHS', + ...numberConfigHelper(24), + }, + sentinelEnabled: { + description: 'Whether the sentinel is enabled or not.', + env: 'SENTINEL_ENABLED', + ...booleanConfigHelper(true), + }, +}; diff --git a/yarn-project/aztec-node/src/sentinel/factory.ts b/yarn-project/aztec-node/src/sentinel/factory.ts new file mode 100644 index 000000000000..55dfa2071ac7 --- /dev/null +++ b/yarn-project/aztec-node/src/sentinel/factory.ts @@ -0,0 +1,31 @@ +import type { EpochCache } from '@aztec/epoch-cache'; +import { createLogger } from '@aztec/foundation/log'; +import type { DataStoreConfig } from '@aztec/kv-store/config'; +import { createStore } from '@aztec/kv-store/lmdb-v2'; +import type { P2PClient } from '@aztec/p2p'; +import type { L2BlockSource } from '@aztec/stdlib/block'; + +import type { SentinelConfig } from './config.js'; +import { Sentinel } from './sentinel.js'; +import { SentinelStore } from './store.js'; + +export async function createSentinel( + epochCache: EpochCache, + archiver: L2BlockSource, + p2p: P2PClient, + config: SentinelConfig & DataStoreConfig, + logger = createLogger('node:sentinel'), +): Promise { + if (!config.sentinelEnabled) { + return undefined; + } + const kvStore = await createStore( + 'sentinel', + SentinelStore.SCHEMA_VERSION, + config, + createLogger('node:sentinel:lmdb'), + ); + const storeHistoryLength = config.sentinelHistoryLengthInEpochs * epochCache.getL1Constants().epochDuration; + const sentinelStore = new SentinelStore(kvStore, { historyLength: storeHistoryLength }); + return new Sentinel(epochCache, archiver, p2p, sentinelStore, logger); +} diff --git a/yarn-project/aztec-node/src/sentinel/index.ts b/yarn-project/aztec-node/src/sentinel/index.ts new file mode 100644 index 000000000000..6c97632216c6 --- /dev/null +++ b/yarn-project/aztec-node/src/sentinel/index.ts @@ -0,0 +1,8 @@ +export { Sentinel } from './sentinel.js'; + +export type { + ValidatorsStats, + ValidatorStats, + ValidatorStatusHistory, + ValidatorStatusInSlot, +} from '@aztec/stdlib/validators'; diff --git a/yarn-project/aztec-node/src/sentinel/sentinel.test.ts b/yarn-project/aztec-node/src/sentinel/sentinel.test.ts new file mode 100644 index 000000000000..ce08e71d197b --- /dev/null +++ b/yarn-project/aztec-node/src/sentinel/sentinel.test.ts @@ -0,0 +1,228 @@ +import type { EpochCache } from '@aztec/epoch-cache'; +import { times } from '@aztec/foundation/collection'; +import { Secp256k1Signer } from '@aztec/foundation/crypto'; +import { EthAddress } from '@aztec/foundation/eth-address'; +import { AztecLMDBStoreV2, openTmpStore } from '@aztec/kv-store/lmdb-v2'; +import type { P2PClient } from '@aztec/p2p'; +import { + type L2BlockSource, + type L2BlockStream, + type PublishedL2Block, + randomPublishedL2Block, +} from '@aztec/stdlib/block'; +import type { L1RollupConstants } from '@aztec/stdlib/epoch-helpers'; +import type { BlockAttestation } from '@aztec/stdlib/p2p'; +import { makeBlockAttestation } from '@aztec/stdlib/testing'; +import type { ValidatorStats, ValidatorStatusHistory } from '@aztec/stdlib/validators'; + +import { type MockProxy, mock } from 'jest-mock-extended'; + +import { Sentinel } from './sentinel.js'; +import { SentinelStore } from './store.js'; + +describe('sentinel', () => { + let epochCache: MockProxy; + let archiver: MockProxy; + let p2p: MockProxy; + let blockStream: MockProxy; + + let kvStore: AztecLMDBStoreV2; + let store: SentinelStore; + + let sentinel: TestSentinel; + + let slot: bigint; + let epoch: bigint; + let ts: bigint; + let l1Constants: L1RollupConstants; + + beforeEach(async () => { + epochCache = mock(); + archiver = mock(); + p2p = mock(); + blockStream = mock(); + + kvStore = await openTmpStore('sentinel-test'); + store = new SentinelStore(kvStore, { historyLength: 10 }); + + slot = 10n; + epoch = 0n; + ts = BigInt(Math.ceil(Date.now() / 1000)); + l1Constants = { + l1StartBlock: 1n, + l1GenesisTime: ts, + slotDuration: 24, + epochDuration: 32, + ethereumSlotDuration: 12, + }; + + epochCache.getEpochAndSlotNow.mockReturnValue({ epoch, slot, ts }); + epochCache.getL1Constants.mockReturnValue(l1Constants); + + sentinel = new TestSentinel(epochCache, archiver, p2p, store, blockStream); + }); + + afterEach(async () => { + await kvStore.close(); + }); + + describe('getSlotActivity', () => { + let signers: Secp256k1Signer[]; + let validators: EthAddress[]; + let block: PublishedL2Block; + let attestations: BlockAttestation[]; + let proposer: EthAddress; + let committee: EthAddress[]; + + beforeEach(async () => { + signers = times(4, Secp256k1Signer.random); + validators = signers.map(signer => signer.address); + block = await randomPublishedL2Block(Number(slot)); + attestations = await Promise.all( + signers.map(signer => makeBlockAttestation({ signer, archive: block.block.archive.root })), + ); + proposer = validators[0]; + committee = [...validators]; + + p2p.getAttestationsForSlot.mockResolvedValue(attestations); + }); + + it('flags block as mined', async () => { + await sentinel.handleBlockStreamEvent({ type: 'blocks-added', blocks: [block] }); + + const activity = await sentinel.getSlotActivity(slot, epoch, proposer, committee); + expect(activity[proposer.toString()]).toEqual('block-mined'); + }); + + it('flags block as proposed when it is not mined but there are attestations', async () => { + p2p.getAttestationsForSlot.mockResolvedValue(attestations); + const activity = await sentinel.getSlotActivity(slot, epoch, proposer, committee); + expect(activity[proposer.toString()]).toEqual('block-proposed'); + }); + + it('flags block as missed when there are no attestations', async () => { + p2p.getAttestationsForSlot.mockResolvedValue([]); + const activity = await sentinel.getSlotActivity(slot, epoch, proposer, committee); + expect(activity[proposer.toString()]).toEqual('block-missed'); + }); + + it('identifies missed attestors if block is mined', async () => { + await sentinel.handleBlockStreamEvent({ type: 'blocks-added', blocks: [block] }); + p2p.getAttestationsForSlot.mockResolvedValue(attestations.slice(0, -1)); + + const activity = await sentinel.getSlotActivity(slot, epoch, proposer, committee); + expect(activity[committee[1].toString()]).toEqual('attestation-sent'); + expect(activity[committee[2].toString()]).toEqual('attestation-sent'); + expect(activity[committee[3].toString()]).toEqual('attestation-missed'); + }); + + it('identifies missed attestors if block is proposed', async () => { + p2p.getAttestationsForSlot.mockResolvedValue(attestations.slice(0, -1)); + + const activity = await sentinel.getSlotActivity(slot, epoch, proposer, committee); + expect(activity[committee[1].toString()]).toEqual('attestation-sent'); + expect(activity[committee[2].toString()]).toEqual('attestation-sent'); + expect(activity[committee[3].toString()]).toEqual('attestation-missed'); + }); + + it('does not tag attestors as missed if there was no block and no attestations', async () => { + p2p.getAttestationsForSlot.mockResolvedValue([]); + + const activity = await sentinel.getSlotActivity(slot, epoch, proposer, committee); + expect(activity[proposer.toString()]).toEqual('block-missed'); + expect(activity[committee[1].toString()]).not.toBeDefined(); + expect(activity[committee[2].toString()]).not.toBeDefined(); + expect(activity[committee[3].toString()]).not.toBeDefined(); + }); + }); + + describe('computeStatsForValidator', () => { + let validator: `0x${string}`; + + beforeEach(() => { + validator = EthAddress.random().toString(); + }); + + it('computes stats correctly', () => { + const stats = sentinel.computeStatsForValidator(validator, [ + { slot: 1n, status: 'block-mined' }, + { slot: 2n, status: 'block-proposed' }, + { slot: 3n, status: 'block-missed' }, + { slot: 4n, status: 'block-missed' }, + { slot: 5n, status: 'attestation-sent' }, + { slot: 6n, status: 'attestation-missed' }, + ]); + + expect(stats.address.toString()).toEqual(validator); + expect(stats.totalSlots).toEqual(6); + expect(stats.missedProposals.count).toEqual(2); + expect(stats.missedProposals.currentStreak).toEqual(2); + expect(stats.missedProposals.rate).toEqual(0.5); + expect(stats.lastProposal?.slot).toEqual(2n); + expect(stats.missedAttestations.count).toEqual(1); + expect(stats.missedAttestations.currentStreak).toEqual(1); + expect(stats.missedAttestations.rate).toEqual(0.5); + expect(stats.lastAttestation?.slot).toEqual(5n); + }); + + it('resets streaks correctly', () => { + const stats = sentinel.computeStatsForValidator(validator, [ + { slot: 1n, status: 'block-mined' }, + { slot: 2n, status: 'block-missed' }, + { slot: 3n, status: 'block-mined' }, + { slot: 4n, status: 'block-missed' }, + { slot: 5n, status: 'attestation-sent' }, + { slot: 6n, status: 'attestation-missed' }, + { slot: 7n, status: 'attestation-sent' }, + { slot: 8n, status: 'attestation-missed' }, + ]); + + expect(stats.address.toString()).toEqual(validator); + expect(stats.totalSlots).toEqual(8); + expect(stats.missedProposals.count).toEqual(2); + expect(stats.missedProposals.currentStreak).toEqual(1); + expect(stats.missedProposals.rate).toEqual(0.5); + expect(stats.missedAttestations.count).toEqual(2); + expect(stats.missedAttestations.currentStreak).toEqual(1); + expect(stats.missedAttestations.rate).toEqual(0.5); + }); + + it('considers only latest slots', () => { + const history = times(20, i => ({ slot: BigInt(i), status: 'block-missed' } as const)); + const stats = sentinel.computeStatsForValidator(validator, history, 15n); + + expect(stats.address.toString()).toEqual(validator); + expect(stats.totalSlots).toEqual(5); + expect(stats.missedProposals.count).toEqual(5); + }); + }); +}); + +class TestSentinel extends Sentinel { + constructor( + epochCache: EpochCache, + archiver: L2BlockSource, + p2p: P2PClient, + store: SentinelStore, + protected override blockStream: L2BlockStream, + ) { + super(epochCache, archiver, p2p, store); + } + + public override init() { + this.initialSlot = this.epochCache.getEpochAndSlotNow().slot; + return Promise.resolve(); + } + + public override getSlotActivity(slot: bigint, epoch: bigint, proposer: EthAddress, committee: EthAddress[]) { + return super.getSlotActivity(slot, epoch, proposer, committee); + } + + public override computeStatsForValidator( + address: `0x${string}`, + history: ValidatorStatusHistory, + fromSlot?: bigint, + ): ValidatorStats { + return super.computeStatsForValidator(address, history, fromSlot); + } +} diff --git a/yarn-project/aztec-node/src/sentinel/sentinel.ts b/yarn-project/aztec-node/src/sentinel/sentinel.ts new file mode 100644 index 000000000000..f6bd67dc9896 --- /dev/null +++ b/yarn-project/aztec-node/src/sentinel/sentinel.ts @@ -0,0 +1,280 @@ +import type { EpochCache } from '@aztec/epoch-cache'; +import { countWhile } from '@aztec/foundation/collection'; +import { EthAddress } from '@aztec/foundation/eth-address'; +import { createLogger } from '@aztec/foundation/log'; +import { RunningPromise } from '@aztec/foundation/running-promise'; +import { L2TipsMemoryStore, type L2TipsStore } from '@aztec/kv-store/stores'; +import type { P2PClient } from '@aztec/p2p'; +import { + type L2BlockSource, + L2BlockStream, + type L2BlockStreamEvent, + type L2BlockStreamEventHandler, +} from '@aztec/stdlib/block'; +import { getTimestampForSlot } from '@aztec/stdlib/epoch-helpers'; +import type { + ValidatorStats, + ValidatorStatusHistory, + ValidatorStatusInSlot, + ValidatorStatusType, + ValidatorsStats, +} from '@aztec/stdlib/validators'; + +import { SentinelStore } from './store.js'; + +export class Sentinel implements L2BlockStreamEventHandler { + protected runningPromise: RunningPromise; + protected blockStream!: L2BlockStream; + protected l2TipsStore: L2TipsStore; + + protected initialSlot: bigint | undefined; + protected lastProcessedSlot: bigint | undefined; + protected slotNumberToArchive: Map = new Map(); + + constructor( + protected epochCache: EpochCache, + protected archiver: L2BlockSource, + protected p2p: P2PClient, + protected store: SentinelStore, + protected logger = createLogger('node:sentinel'), + ) { + this.l2TipsStore = new L2TipsMemoryStore(); + const interval = (epochCache.getL1Constants().ethereumSlotDuration * 1000) / 4; + this.runningPromise = new RunningPromise(this.work.bind(this), logger, interval); + } + + public async start() { + await this.init(); + this.runningPromise.start(); + } + + /** Loads initial slot and initializes blockstream. We will not process anything at or before the initial slot. */ + protected async init() { + this.initialSlot = this.epochCache.getEpochAndSlotNow().slot; + const startingBlock = await this.archiver.getBlockNumber(); + this.blockStream = new L2BlockStream(this.archiver, this.l2TipsStore, this, this.logger, { startingBlock }); + } + + public stop() { + return this.runningPromise.stop(); + } + + public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { + await this.l2TipsStore.handleBlockStreamEvent(event); + if (event.type === 'blocks-added') { + // Store mapping from slot to archive + for (const block of event.blocks) { + this.slotNumberToArchive.set(block.block.header.getSlot(), block.block.archive.root.toString()); + } + + // Prune the archive map to only keep at most N entries + const historyLength = this.store.getHistoryLength(); + if (this.slotNumberToArchive.size > historyLength) { + const toDelete = Array.from(this.slotNumberToArchive.keys()) + .sort((a, b) => Number(a - b)) + .slice(0, this.slotNumberToArchive.size - historyLength); + for (const key of toDelete) { + this.slotNumberToArchive.delete(key); + } + } + } + } + + /** + * Process data for two L2 slots ago. + * Note that we do not process historical data, since we rely on p2p data for processing, + * and we don't have that data if we were offline during the period. + */ + public async work() { + const { slot: currentSlot } = this.epochCache.getEpochAndSlotNow(); + try { + // Manually sync the block stream to ensure we have the latest data. + // Note we never `start` the blockstream, so it loops at the same pace as we do. + await this.blockStream.sync(); + + // Check if we are ready to process data for two L2 slots ago. + const targetSlot = await this.isReadyToProcess(currentSlot); + + // And process it if we are. + if (targetSlot !== false) { + await this.processSlot(targetSlot); + } + } catch (err) { + this.logger.error(`Failed to process slot ${currentSlot}`, err); + } + } + + /** + * Check if we are ready to process data for two L2 slots ago, so we allow plenty of time for p2p to process all in-flight attestations. + * We also don't move past the archiver last synced L2 slot, as we don't want to process data that is not yet available. + * Last, we check the p2p is synced with the archiver, so it has pulled all attestations from it. + */ + protected async isReadyToProcess(currentSlot: bigint) { + const targetSlot = currentSlot - 2n; + if (this.lastProcessedSlot && this.lastProcessedSlot >= targetSlot) { + this.logger.trace(`Already processed slot ${targetSlot}`, { lastProcessedSlot: this.lastProcessedSlot }); + return false; + } + + if (this.initialSlot === undefined) { + this.logger.error(`Initial slot not loaded.`); + return false; + } + + if (targetSlot <= this.initialSlot) { + this.logger.debug(`Refusing to process slot ${targetSlot} given initial slot ${this.initialSlot}`); + return false; + } + + const archiverSlot = await this.archiver.getL2SlotNumber(); + if (archiverSlot < targetSlot) { + this.logger.debug(`Waiting for archiver to sync with L2 slot ${targetSlot}`, { archiverSlot, targetSlot }); + return false; + } + + const archiverLastBlockHash = await this.l2TipsStore.getL2Tips().then(tip => tip.latest.hash); + const p2pLastBlockHash = await this.p2p.getL2Tips().then(tips => tips.latest.hash); + const isP2pSynced = archiverLastBlockHash === p2pLastBlockHash; + if (!isP2pSynced) { + this.logger.debug(`Waiting for P2P client to sync with archiver`, { archiverLastBlockHash, p2pLastBlockHash }); + return false; + } + + return targetSlot; + } + + /** + * Gathers committee and proposer data for a given slot, computes slot stats, + * and updates overall stats. + */ + protected async processSlot(slot: bigint) { + const { epoch, seed, committee } = await this.epochCache.getCommittee(slot); + if (committee.length === 0) { + this.logger.warn(`No committee found for slot ${slot} at epoch ${epoch}`); + this.lastProcessedSlot = slot; + return; + } + const proposerIndex = this.epochCache.computeProposerIndex(slot, epoch, seed, BigInt(committee.length)); + const proposer = committee[Number(proposerIndex)]; + const stats = await this.getSlotActivity(slot, epoch, proposer, committee); + this.logger.verbose(`Updating L2 slot ${slot} observed activity`, stats); + await this.updateValidators(slot, stats); + this.lastProcessedSlot = slot; + } + + /** Computes activity for a given slot. */ + protected async getSlotActivity(slot: bigint, epoch: bigint, proposer: EthAddress, committee: EthAddress[]) { + this.logger.debug(`Computing stats for slot ${slot} at epoch ${epoch}`, { slot, epoch, proposer, committee }); + + // Check if there is an L2 block in L1 for this L2 slot + + // Here we get all attestations for the block mined at the given slot, + // or all attestations for all proposals in the slot if no block was mined. + const archive = this.slotNumberToArchive.get(slot); + const attested = await this.p2p.getAttestationsForSlot(slot, archive); + const attestors = new Set(await Promise.all(attested.map(a => a.getSender().then(a => a.toString())))); + + // We assume that there was a block proposal if at least one of the validators attested to it. + // It could be the case that every single validator failed, and we could differentiate it by having + // this node re-execute every block proposal it sees and storing it in the attestation pool. + // But we'll leave that corner case out to reduce pressure on the node. + const blockStatus = archive ? 'mined' : attestors.size > 0 ? 'proposed' : 'missed'; + this.logger.debug(`Block for slot ${slot} was ${blockStatus}`, { archive, slot }); + + // Get attestors that failed their duties for this block, but only if there was a block proposed + const missedAttestors = new Set( + blockStatus === 'missed' + ? [] + : committee.filter(v => !attestors.has(v.toString()) && !proposer.equals(v)).map(v => v.toString()), + ); + + this.logger.debug(`Retrieved ${attestors.size} attestors out of ${committee.length} for slot ${slot}`, { + blockStatus, + proposer: proposer.toString(), + archive, + slot, + attestors: [...attestors], + missedAttestors: [...missedAttestors], + committee: committee.map(c => c.toString()), + }); + + // Compute the status for each validator in the committee + const statusFor = (who: `0x${string}`): ValidatorStatusInSlot | undefined => { + if (who === proposer.toString()) { + return `block-${blockStatus}`; + } else if (attestors.has(who)) { + return 'attestation-sent'; + } else if (missedAttestors.has(who)) { + return 'attestation-missed'; + } else { + return undefined; + } + }; + + return Object.fromEntries(committee.map(v => v.toString()).map(who => [who, statusFor(who)])); + } + + /** Push the status for each slot for each validator. */ + protected updateValidators(slot: bigint, stats: Record<`0x${string}`, ValidatorStatusInSlot | undefined>) { + return this.store.updateValidators(slot, stats); + } + + /** Computes stats to be returned based on stored data. */ + public async computeStats(): Promise { + const histories = await this.store.getHistories(); + const slotNow = this.epochCache.getEpochAndSlotNow().slot; + const fromSlot = (this.lastProcessedSlot ?? slotNow) - BigInt(this.store.getHistoryLength()); + const result: Record<`0x${string}`, ValidatorStats> = {}; + for (const [address, history] of Object.entries(histories)) { + const validatorAddress = address as `0x${string}`; + result[validatorAddress] = this.computeStatsForValidator(validatorAddress, history, fromSlot); + } + return { + stats: result, + lastProcessedSlot: this.lastProcessedSlot, + initialSlot: this.initialSlot, + slotWindow: this.store.getHistoryLength(), + }; + } + + protected computeStatsForValidator( + address: `0x${string}`, + allHistory: ValidatorStatusHistory, + fromSlot?: bigint, + ): ValidatorStats { + const history = fromSlot ? allHistory.filter(h => h.slot >= fromSlot) : allHistory; + return { + address: EthAddress.fromString(address), + lastProposal: this.computeFromSlot( + history.filter(h => h.status === 'block-proposed' || h.status === 'block-mined').at(-1)?.slot, + ), + lastAttestation: this.computeFromSlot(history.filter(h => h.status === 'attestation-sent').at(-1)?.slot), + totalSlots: history.length, + missedProposals: this.computeMissed(history, 'block', 'block-missed'), + missedAttestations: this.computeMissed(history, 'attestation', 'attestation-missed'), + history, + }; + } + + protected computeMissed( + history: ValidatorStatusHistory, + computeOverPrefix: ValidatorStatusType, + filter: ValidatorStatusInSlot, + ) { + const relevantHistory = history.filter(h => h.status.startsWith(computeOverPrefix)); + const filteredHistory = relevantHistory.filter(h => h.status === filter); + return { + currentStreak: countWhile([...relevantHistory].reverse(), h => h.status === filter), + rate: filteredHistory.length / relevantHistory.length, + count: filteredHistory.length, + }; + } + + protected computeFromSlot(slot: bigint | undefined) { + if (slot === undefined) { + return undefined; + } + const timestamp = getTimestampForSlot(slot, this.epochCache.getL1Constants()); + return { timestamp, slot, date: new Date(Number(timestamp) * 1000).toISOString() }; + } +} diff --git a/yarn-project/aztec-node/src/sentinel/store.test.ts b/yarn-project/aztec-node/src/sentinel/store.test.ts new file mode 100644 index 000000000000..314055555cde --- /dev/null +++ b/yarn-project/aztec-node/src/sentinel/store.test.ts @@ -0,0 +1,95 @@ +import { times } from '@aztec/foundation/collection'; +import { AztecLMDBStoreV2, openTmpStore } from '@aztec/kv-store/lmdb-v2'; +import type { ValidatorStatusInSlot } from '@aztec/stdlib/validators'; + +import { SentinelStore } from './store.js'; + +describe('sentinel-store', () => { + let kvStore: AztecLMDBStoreV2; + let store: SentinelStore; + + beforeEach(async () => { + kvStore = await openTmpStore('sentinel-store-test'); + store = new SentinelStore(kvStore, { historyLength: 4 }); + }); + + afterEach(async () => { + await kvStore.close(); + }); + + it('inserts new validators with all statuses', async () => { + const slot = 1n; + const validators: `0x${string}`[] = times(5, i => `0x${i}` as `0x${string}`); + const statuses: ValidatorStatusInSlot[] = [ + 'block-mined', + 'block-proposed', + 'block-missed', + 'attestation-sent', + 'attestation-missed', + ]; + + await store.updateValidators(slot, Object.fromEntries(validators.map((v, i) => [v, statuses[i]] as const))); + + const histories = await store.getHistories(); + expect(Object.keys(histories)).toHaveLength(validators.length); + + for (const index in validators) { + const validator = validators[index]; + const history = histories[validator]; + expect(history).toHaveLength(1); + expect(history[0].slot).toEqual(slot); + expect(history[0].status).toEqual(statuses[index]); + } + }); + + it('updates existing validators with new slots and inserts new ones', async () => { + const existingValidators: `0x${string}`[] = times(2, i => `0x${i}` as `0x${string}`); + const newValidators: `0x${string}`[] = times(2, i => `0x${i + 2}` as `0x${string}`); + + // Insert existing validators with initial statuses + await store.updateValidators(1n, Object.fromEntries(existingValidators.map(v => [v, 'block-mined'] as const))); + + // Insert new validators with their statuses, and append history to existing ones + await store.updateValidators( + 2n, + Object.fromEntries([ + ...newValidators.map(v => [v, 'block-proposed'] as const), + ...existingValidators.map(v => [v, 'block-missed'] as const), + ]), + ); + + const histories = await store.getHistories(); + expect(Object.keys(histories)).toHaveLength(4); + + expect(histories[existingValidators[0]]).toEqual([ + { slot: 1n, status: 'block-mined' }, + { slot: 2n, status: 'block-missed' }, + ]); + + expect(histories[existingValidators[1]]).toEqual([ + { slot: 1n, status: 'block-mined' }, + { slot: 2n, status: 'block-missed' }, + ]); + + expect(histories[newValidators[0]]).toEqual([{ slot: 2n, status: 'block-proposed' }]); + expect(histories[newValidators[1]]).toEqual([{ slot: 2n, status: 'block-proposed' }]); + }); + + it('trims history to the specified length', async () => { + const slot = 1n; + const validator: `0x${string}` = '0x1' as `0x${string}`; + + for (let i = 0; i < 10; i++) { + await store.updateValidators(slot + BigInt(i), { [validator]: 'block-mined' }); + } + + const histories = await store.getHistories(); + expect(histories[validator]).toHaveLength(4); + expect(histories[validator]).toEqual([ + { slot: 7n, status: 'block-mined' }, + { slot: 8n, status: 'block-mined' }, + { slot: 9n, status: 'block-mined' }, + { slot: 10n, status: 'block-mined' }, + ]); + }); +}); diff --git a/yarn-project/aztec-node/src/sentinel/store.ts b/yarn-project/aztec-node/src/sentinel/store.ts new file mode 100644 index 000000000000..0740be15a2e3 --- /dev/null +++ b/yarn-project/aztec-node/src/sentinel/store.ts @@ -0,0 +1,103 @@ +import { BufferReader, numToUInt8, numToUInt32BE, serializeToBuffer } from '@aztec/foundation/serialize'; +import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store'; +import type { ValidatorStatusHistory, ValidatorStatusInSlot } from '@aztec/stdlib/validators'; + +export class SentinelStore { + public static readonly SCHEMA_VERSION = 1; + + private readonly map: AztecAsyncMap<`0x${string}`, Buffer>; + + constructor(private store: AztecAsyncKVStore, private config: { historyLength: number }) { + this.map = store.openMap('sentinel-validator-status'); + } + + public getHistoryLength() { + return this.config.historyLength; + } + + public async updateValidators(slot: bigint, statuses: Record<`0x${string}`, ValidatorStatusInSlot | undefined>) { + await this.store.transactionAsync(async () => { + for (const [who, status] of Object.entries(statuses)) { + if (status) { + await this.pushValidatorStatusForSlot(who as `0x${string}`, slot, status); + } + } + }); + } + + private async pushValidatorStatusForSlot( + who: `0x${string}`, + slot: bigint, + status: 'block-mined' | 'block-proposed' | 'block-missed' | 'attestation-sent' | 'attestation-missed', + ) { + const currentHistory = (await this.getHistory(who)) ?? []; + const newHistory = [...currentHistory, { slot, status }].slice(-this.config.historyLength); + await this.map.set(who, this.serializeHistory(newHistory)); + } + + public async getHistories(): Promise> { + const histories: Record<`0x${string}`, ValidatorStatusHistory> = {}; + for await (const [address, history] of this.map.entriesAsync()) { + histories[address] = this.deserializeHistory(history); + } + return histories; + } + + private async getHistory(address: `0x${string}`): Promise { + const data = await this.map.getAsync(address); + return data && this.deserializeHistory(data); + } + + private serializeHistory(history: ValidatorStatusHistory): Buffer { + return serializeToBuffer( + history.map(h => [numToUInt32BE(Number(h.slot)), numToUInt8(this.statusToNumber(h.status))]), + ); + } + + private deserializeHistory(buffer: Buffer): ValidatorStatusHistory { + const reader = new BufferReader(buffer); + const history: ValidatorStatusHistory = []; + while (!reader.isEmpty()) { + const slot = BigInt(reader.readNumber()); + const status = this.statusFromNumber(reader.readUInt8()); + history.push({ slot, status }); + } + return history; + } + + private statusToNumber(status: ValidatorStatusInSlot): number { + switch (status) { + case 'block-mined': + return 1; + case 'block-proposed': + return 2; + case 'block-missed': + return 3; + case 'attestation-sent': + return 4; + case 'attestation-missed': + return 5; + default: { + const _exhaustive: never = status; + throw new Error(`Unknown status: ${status}`); + } + } + } + + private statusFromNumber(status: number): ValidatorStatusInSlot { + switch (status) { + case 1: + return 'block-mined'; + case 2: + return 'block-proposed'; + case 3: + return 'block-missed'; + case 4: + return 'attestation-sent'; + case 5: + return 'attestation-missed'; + default: + throw new Error(`Unknown status: ${status}`); + } + } +} diff --git a/yarn-project/end-to-end/bootstrap.sh b/yarn-project/end-to-end/bootstrap.sh index 1996eddd802d..3fb64bf7b602 100755 --- a/yarn-project/end-to-end/bootstrap.sh +++ b/yarn-project/end-to-end/bootstrap.sh @@ -92,6 +92,7 @@ function test_cmds { echo "$prefix simple e2e_p2p/reex" echo "$prefix simple e2e_p2p/slashing" echo "$prefix simple e2e_p2p/upgrade_governance_proposer" + echo "$prefix simple e2e_p2p/validators_sentinel" echo "$prefix simple e2e_pending_note_hashes_contract" echo "$prefix simple e2e_private_voting_contract" diff --git a/yarn-project/end-to-end/src/e2e_p2p/p2p_network.ts b/yarn-project/end-to-end/src/e2e_p2p/p2p_network.ts index ca2a03ee339a..126c3ef588ba 100644 --- a/yarn-project/end-to-end/src/e2e_p2p/p2p_network.ts +++ b/yarn-project/end-to-end/src/e2e_p2p/p2p_network.ts @@ -54,6 +54,7 @@ export class P2PNetworkTest { public attesterPublicKeys: string[] = []; public proposerPrivateKeys: `0x${string}`[] = []; public peerIdPrivateKeys: string[] = []; + public validators: { attester: `0x${string}`; proposer: `0x${string}`; withdrawer: `0x${string}` }[] = []; public deployedAccounts: InitialAccountData[] = []; public prefilledPublicData: PublicDataTreeLeaf[] = []; @@ -230,9 +231,10 @@ export class P2PNetworkTest { amount: l1ContractsConfig.minimumStake, } as const); - this.logger.verbose(`Adding (attester, proposer) pair: (${attester.address}, ${forwarder}) as validator`); + this.logger.info(`Adding attester ${attester.address} proposer ${forwarder} as validator`); } + this.validators = validators; await deployL1ContractsValues.publicClient.waitForTransactionReceipt({ hash: await rollup.write.cheat__InitialiseValidatorSet([validators]), }); diff --git a/yarn-project/end-to-end/src/e2e_p2p/validators_sentinel.test.ts b/yarn-project/end-to-end/src/e2e_p2p/validators_sentinel.test.ts new file mode 100644 index 000000000000..0822b14f5fb9 --- /dev/null +++ b/yarn-project/end-to-end/src/e2e_p2p/validators_sentinel.test.ts @@ -0,0 +1,98 @@ +import type { AztecNodeService } from '@aztec/aztec-node'; +import { retryUntil } from '@aztec/aztec.js'; + +import { jest } from '@jest/globals'; +import fs from 'fs'; +import 'jest-extended'; +import os from 'os'; +import path from 'path'; + +import { createNodes } from '../fixtures/setup_p2p_test.js'; +import { P2PNetworkTest, SHORTENED_BLOCK_TIME_CONFIG } from './p2p_network.js'; + +const NUM_NODES = 4; +const NUM_VALIDATORS = NUM_NODES + 1; // We create an extra validator, who will not have a running node +const BOOT_NODE_UDP_PORT = 40900; + +const DATA_DIR = fs.mkdtempSync(path.join(os.tmpdir(), 'validators-sentinel-')); + +jest.setTimeout(1000 * 60 * 10); + +describe('e2e_p2p_validators_sentinel', () => { + let t: P2PNetworkTest; + let nodes: AztecNodeService[]; + + beforeEach(async () => { + t = await P2PNetworkTest.create({ + testName: 'e2e_p2p_network', + numberOfNodes: NUM_VALIDATORS, + basePort: BOOT_NODE_UDP_PORT, + initialConfig: { + ...SHORTENED_BLOCK_TIME_CONFIG, + minTxsPerBlock: 0, + aztecEpochDuration: 48, + validatorReexecute: false, + }, + }); + + await t.setupAccount(); + await t.applyBaseSnapshots(); + await t.setup(); + await t.removeInitialNode(); + }); + + afterEach(async () => { + await t.stopNodes(nodes); + await t.teardown(); + for (let i = 0; i < NUM_NODES; i++) { + fs.rmSync(`${DATA_DIR}-${i}`, { recursive: true, force: true, maxRetries: 3 }); + } + }); + + it('collects stats for offline validator', async () => { + if (!t.bootstrapNodeEnr) { + throw new Error('Bootstrap node ENR is not available'); + } + + t.logger.info('Creating nodes'); + nodes = await createNodes( + t.ctx.aztecNodeConfig, + t.ctx.dateProvider, + t.bootstrapNodeEnr, + NUM_NODES, + BOOT_NODE_UDP_PORT, + t.prefilledPublicData, + DATA_DIR, + ); + + // Wait for a few blocks to be mined + const currentBlock = t.monitor.l2BlockNumber; + const blockCount = NUM_VALIDATORS * 2; + const timeout = SHORTENED_BLOCK_TIME_CONFIG.aztecSlotDuration * blockCount * 2 + 20; + t.logger.info(`Waiting until L2 block ${currentBlock + blockCount}`, { currentBlock, blockCount, timeout }); + await retryUntil(() => t.monitor.l2BlockNumber >= currentBlock + blockCount, 'blocks mined', timeout); + + const stats = await nodes[0].getValidatorsStats(); + t.logger.info(`Collected validator stats at block ${t.monitor.l2BlockNumber}`, { stats, validators: t.validators }); + + // Check stats for the offline validator + const offlineValidator = t.validators.at(-1)!.attester.toLowerCase(); + t.logger.info(`Asserting stats for offline validator ${offlineValidator}`); + const offlineStats = stats.stats[offlineValidator]; + const historyLength = offlineStats.history.length; + expect(offlineStats.history.length).toBeGreaterThanOrEqual(blockCount - 1); + expect(offlineStats.history.every(h => h.status.endsWith('-missed'))).toBeTrue(); + expect(offlineStats.missedAttestations.count + offlineStats.missedProposals.count).toEqual(historyLength); + expect(offlineStats.missedAttestations.rate).toEqual(1); + expect(offlineStats.missedProposals.rate).toBeOneOf([1, NaN]); + + // Check stats for a working validator + const okValidator = t.validators[0].attester.toLowerCase(); + const okStats = stats.stats[okValidator]; + t.logger.info(`Asserting stats for ok validator ${okValidator}`); + expect(okStats.history.length).toBeGreaterThanOrEqual(blockCount - 1); + expect(okStats.history.some(h => h.status === 'attestation-sent')).toBeTrue(); + expect(okStats.history.some(h => h.status === 'block-mined' || 'block-proposed')).toBeTrue(); + expect(okStats.missedAttestations.rate).toBeLessThan(1); + }); +}); diff --git a/yarn-project/epoch-cache/src/epoch_cache.test.ts b/yarn-project/epoch-cache/src/epoch_cache.test.ts index b6afb5108a41..bfca5cf04d39 100644 --- a/yarn-project/epoch-cache/src/epoch_cache.test.ts +++ b/yarn-project/epoch-cache/src/epoch_cache.test.ts @@ -1,4 +1,5 @@ import type { RollupContract } from '@aztec/ethereum'; +import { times } from '@aztec/foundation/collection'; import { EthAddress } from '@aztec/foundation/eth-address'; import { afterEach, beforeEach, describe, expect, it, jest } from '@jest/globals'; @@ -45,7 +46,7 @@ describe('EpochCache', () => { epochDuration: EPOCH_DURATION, }; - epochCache = new EpochCache(rollupContract, testCommittee, 0n, testConstants); + epochCache = new EpochCache(rollupContract, 0n, testCommittee, 0n, testConstants); }); afterEach(() => { @@ -54,7 +55,7 @@ describe('EpochCache', () => { it('should cache the validator set for the length of an epoch', async () => { // Initial call to get validators - const initialCommittee = await epochCache.getCommittee(); + const { committee: initialCommittee } = await epochCache.getCommittee(); expect(initialCommittee).toEqual(testCommittee); // Not called as we should cache with the initial validator set expect(rollupContract.getCommitteeAt).toHaveBeenCalledTimes(0); @@ -66,7 +67,7 @@ describe('EpochCache', () => { rollupContract.getCommitteeAt.mockResolvedValue([...testCommittee, extraTestValidator].map(v => v.toString())); // Should use cached validators - const midEpochCommittee = await epochCache.getCommittee(); + const { committee: midEpochCommittee } = await epochCache.getCommittee(); expect(midEpochCommittee).toEqual(testCommittee); expect(rollupContract.getCommitteeAt).toHaveBeenCalledTimes(0); // Still cached @@ -74,7 +75,7 @@ describe('EpochCache', () => { jest.setSystemTime(Date.now() + Number(EPOCH_DURATION * SLOT_DURATION) * 1000); // Should fetch new validator - const nextEpochCommittee = await epochCache.getCommittee(); + const { committee: nextEpochCommittee } = await epochCache.getCommittee(); expect(nextEpochCommittee).toEqual([...testCommittee, extraTestValidator]); expect(rollupContract.getCommitteeAt).toHaveBeenCalledTimes(1); // Called again for new epoch }); @@ -114,4 +115,64 @@ describe('EpochCache', () => { await epochCache.getProposerInCurrentOrNextSlot(); expect(rollupContract.getCommitteeAt).toHaveBeenCalledTimes(1); }); + + it('should cache multiple epochs', async () => { + // Initial call to get validators + const { committee: initialCommittee } = await epochCache.getCommittee(); + expect(initialCommittee).toEqual(testCommittee); + + // Move time forward to next epoch (x 1000 for milliseconds) + jest.setSystemTime(Date.now() + Number(EPOCH_DURATION * SLOT_DURATION) * 1000); + + // Add another validator to the set + rollupContract.getCommitteeAt.mockResolvedValue([...testCommittee, extraTestValidator].map(v => v.toString())); + + // Should fetch new validator + const { committee: nextEpochCommittee } = await epochCache.getCommittee(); + expect(nextEpochCommittee).toEqual([...testCommittee, extraTestValidator]); + expect(rollupContract.getCommitteeAt).toHaveBeenCalledTimes(1); // Called again for new epoch + rollupContract.getCommitteeAt.mockClear(); + + // Should return the previous epoch still cached + const { committee: initialCommitteeRerequested } = await epochCache.getCommittee(1n); + expect(initialCommitteeRerequested).toEqual(testCommittee); + expect(rollupContract.getCommitteeAt).toHaveBeenCalledTimes(0); // Cached + }); + + it('should purge old epochs', async () => { + // Set the cache size to 3 epochs + (epochCache as any).config.cacheSize = 3; + + const extraValidators = [4, 5, 6, 7].map(EthAddress.fromNumber); + const committees = times(4, i => [...testCommittee, ...extraValidators.slice(0, i)]); + + // Seed the cache with 3 epochs worth of data + for (let i = 0; i < 3; i++) { + rollupContract.getCommitteeAt.mockResolvedValue(committees[i].map(v => v.toString())); + const { committee: actual } = await epochCache.getCommittee(BigInt(i * EPOCH_DURATION)); + expect(actual).toEqual(committees[i]); + expect(rollupContract.getCommitteeAt).toHaveBeenCalledTimes(i); // Epoch 0 is already initialized + } + + // Requesting any of them should not call the contract again + rollupContract.getCommitteeAt.mockClear(); + for (let i = 0; i < 3; i++) { + const { committee: actual } = await epochCache.getCommittee(BigInt(i * EPOCH_DURATION)); + expect(actual).toEqual(committees[i]); + expect(rollupContract.getCommitteeAt).toHaveBeenCalledTimes(0); + } + + // Requesting another epoch should cause the oldest to be purged + rollupContract.getCommitteeAt.mockResolvedValue(committees[3].map(v => v.toString())); + const { committee: fourth } = await epochCache.getCommittee(BigInt(3 * EPOCH_DURATION)); + expect(fourth).toEqual(committees[3]); + expect(rollupContract.getCommitteeAt).toHaveBeenCalledTimes(1); + rollupContract.getCommitteeAt.mockClear(); + + // So when going back to the first epoch, it should be re-requested from the contract + rollupContract.getCommitteeAt.mockResolvedValue(committees[0].map(v => v.toString())); + const { committee: first } = await epochCache.getCommittee(BigInt(0 * EPOCH_DURATION)); + expect(first).toEqual(committees[0]); + expect(rollupContract.getCommitteeAt).toHaveBeenCalledTimes(1); + }); }); diff --git a/yarn-project/epoch-cache/src/epoch_cache.ts b/yarn-project/epoch-cache/src/epoch_cache.ts index 0db6112deea0..e8a09ae14c1a 100644 --- a/yarn-project/epoch-cache/src/epoch_cache.ts +++ b/yarn-project/epoch-cache/src/epoch_cache.ts @@ -5,11 +5,12 @@ import { DateProvider } from '@aztec/foundation/timer'; import { EmptyL1RollupConstants, type L1RollupConstants, + getEpochAtSlot, getEpochNumberAtTimestamp, getSlotAtTimestamp, + getTimestampRangeForEpoch, } from '@aztec/stdlib/epoch-helpers'; -import { EventEmitter } from 'node:events'; import { createPublicClient, encodeAbiParameters, fallback, http, keccak256 } from 'viem'; import { type EpochCacheConfig, getEpochCacheConfigEnvVars } from './config.js'; @@ -20,8 +21,14 @@ type EpochAndSlot = { ts: bigint; }; +export type EpochCommitteeInfo = { + committee: EthAddress[]; + seed: bigint; + epoch: bigint; +}; + export interface EpochCacheInterface { - getCommittee(nextSlot: boolean): Promise; + getCommittee(slot: 'now' | 'next' | bigint | undefined): Promise; getEpochAndSlotNow(): EpochAndSlot; getProposerIndexEncoding(epoch: bigint, slot: bigint, seed: bigint): `0x${string}`; computeProposerIndex(slot: bigint, epoch: bigint, seed: bigint, size: bigint): bigint; @@ -38,35 +45,31 @@ export interface EpochCacheInterface { * Epoch cache * * This class is responsible for managing traffic to the l1 node, by caching the validator set. + * Keeps the last N epochs in cache. * It also provides a method to get the current or next proposer, and to check who is in the current slot. * - * If the epoch changes, then we update the stored validator set. - * * Note: This class is very dependent on the system clock being in sync. */ -export class EpochCache - extends EventEmitter<{ committeeChanged: [EthAddress[], bigint] }> - implements EpochCacheInterface -{ - private committee: EthAddress[]; - private cachedEpoch: bigint; - private cachedSampleSeed: bigint; +export class EpochCache implements EpochCacheInterface { + private cache: Map = new Map(); private readonly log: Logger = createLogger('epoch-cache'); constructor( private rollup: RollupContract, + initialEpoch: bigint = 0n, initialValidators: EthAddress[] = [], initialSampleSeed: bigint = 0n, private readonly l1constants: L1RollupConstants = EmptyL1RollupConstants, private readonly dateProvider: DateProvider = new DateProvider(), + private readonly config = { cacheSize: 12 }, ) { - super(); - this.committee = initialValidators; - this.cachedSampleSeed = initialSampleSeed; - - this.log.debug(`Initialized EpochCache with constants and validators`, { l1constants, initialValidators }); - - this.cachedEpoch = getEpochNumberAtTimestamp(this.nowInSeconds(), this.l1constants); + this.cache.set(initialEpoch, { epoch: initialEpoch, committee: initialValidators, seed: initialSampleSeed }); + this.log.debug(`Initialized EpochCache with ${initialValidators.length} validators`, { + l1constants, + initialValidators, + initialSampleSeed, + initialEpoch, + }); } static async create( @@ -84,11 +87,12 @@ export class EpochCache }); const rollup = new RollupContract(publicClient, rollupAddress.toString()); - const [l1StartBlock, l1GenesisTime, initialValidators, sampleSeed] = await Promise.all([ + const [l1StartBlock, l1GenesisTime, initialValidators, sampleSeed, epochNumber] = await Promise.all([ rollup.getL1StartBlock(), rollup.getL1GenesisTime(), rollup.getCurrentEpochCommittee(), rollup.getCurrentSampleSeed(), + rollup.getEpochNumber(), ] as const); const l1RollupConstants: L1RollupConstants = { @@ -101,6 +105,7 @@ export class EpochCache return new EpochCache( rollup, + epochNumber, initialValidators.map(v => EthAddress.fromString(v)), sampleSeed, l1RollupConstants, @@ -108,12 +113,22 @@ export class EpochCache ); } + public getL1Constants(): L1RollupConstants { + return this.l1constants; + } + + public getEpochAndSlotNow(): EpochAndSlot { + return this.getEpochAndSlotAtTimestamp(this.nowInSeconds()); + } + private nowInSeconds(): bigint { return BigInt(Math.floor(this.dateProvider.now() / 1000)); } - getEpochAndSlotNow(): EpochAndSlot { - return this.getEpochAndSlotAtTimestamp(this.nowInSeconds()); + private getEpochAndSlotAtSlot(slot: bigint): EpochAndSlot { + const epoch = getEpochAtSlot(slot, this.l1constants); + const ts = getTimestampRangeForEpoch(slot, this.l1constants)[0]; + return { epoch, ts, slot }; } private getEpochAndSlotInNextSlot(): EpochAndSlot { @@ -131,31 +146,42 @@ export class EpochCache /** * Get the current validator set - * * @param nextSlot - If true, get the validator set for the next slot. * @returns The current validator set. */ - async getCommittee(nextSlot: boolean = false): Promise { - // If the current epoch has changed, then we need to make a request to update the validator set - const { epoch: calculatedEpoch, ts } = nextSlot ? this.getEpochAndSlotInNextSlot() : this.getEpochAndSlotNow(); - - if (calculatedEpoch !== this.cachedEpoch) { - this.log.debug(`Updating validator set for new epoch ${calculatedEpoch}`, { - epoch: calculatedEpoch, - previousEpoch: this.cachedEpoch, - }); - const [committeeAtTs, sampleSeedAtTs] = await Promise.all([ - this.rollup.getCommitteeAt(ts), - this.rollup.getSampleSeedAt(ts), - ]); - this.committee = committeeAtTs.map((v: `0x${string}`) => EthAddress.fromString(v)); - this.cachedEpoch = calculatedEpoch; - this.cachedSampleSeed = sampleSeedAtTs; - this.log.debug(`Updated validator set for epoch ${calculatedEpoch}`, { commitee: this.committee }); - this.emit('committeeChanged', this.committee, calculatedEpoch); + public async getCommittee(slot: 'now' | 'next' | bigint = 'now'): Promise { + const { epoch, ts } = this.getEpochAndTimestamp(slot); + + if (this.cache.has(epoch)) { + return this.cache.get(epoch)!; } - return this.committee; + const epochData = await this.computeCommittee({ epoch, ts }); + this.cache.set(epoch, epochData); + + const toPurge = Array.from(this.cache.keys()) + .sort((a, b) => Number(b - a)) + .slice(this.config.cacheSize); + toPurge.forEach(key => this.cache.delete(key)); + + return epochData; + } + + private getEpochAndTimestamp(slot: 'now' | 'next' | bigint = 'now') { + if (slot === 'now') { + return this.getEpochAndSlotNow(); + } else if (slot === 'next') { + return this.getEpochAndSlotInNextSlot(); + } else { + return this.getEpochAndSlotAtSlot(slot); + } + } + + private async computeCommittee(when: { epoch: bigint; ts: bigint }): Promise { + const { ts, epoch } = when; + const [committeeHex, seed] = await Promise.all([this.rollup.getCommitteeAt(ts), this.rollup.getSampleSeedAt(ts)]); + const committee = committeeHex.map((v: `0x${string}`) => EthAddress.fromString(v)); + return { committee, seed, epoch }; } /** @@ -181,9 +207,6 @@ export class EpochCache * * We return the next proposer as the node will check if it is the proposer at the next ethereum block, which * can be the next slot. If this is the case, then it will send proposals early. - * - * If we are at an epoch boundary, then we can update the cache for the next epoch, this is the last check - * we do in the validator client, so we can update the cache here. */ async getProposerInCurrentOrNextSlot(): Promise<{ currentProposer: EthAddress; @@ -191,41 +214,29 @@ export class EpochCache currentSlot: bigint; nextSlot: bigint; }> { - // Validators are sorted by their index in the committee, and getValidatorSet will cache - const committee = await this.getCommittee(); - const { slot: currentSlot, epoch: currentEpoch } = this.getEpochAndSlotNow(); - const { slot: nextSlot, epoch: nextEpoch } = this.getEpochAndSlotInNextSlot(); - - // Compute the proposer in this and the next slot - const proposerIndex = this.computeProposerIndex( - currentSlot, - this.cachedEpoch, - this.cachedSampleSeed, - BigInt(committee.length), - ); - - // Check if the next proposer is in the next epoch - if (nextEpoch !== currentEpoch) { - await this.getCommittee(/*next slot*/ true); - } - const nextProposerIndex = this.computeProposerIndex( - nextSlot, - this.cachedEpoch, - this.cachedSampleSeed, - BigInt(committee.length), - ); + const current = this.getEpochAndSlotNow(); + const next = this.getEpochAndSlotInNextSlot(); - const currentProposer = committee[Number(proposerIndex)]; - const nextProposer = committee[Number(nextProposerIndex)]; + return { + currentProposer: await this.getProposerAt(current), + nextProposer: await this.getProposerAt(next), + currentSlot: current.slot, + nextSlot: next.slot, + }; + } - return { currentProposer, nextProposer, currentSlot, nextSlot }; + private async getProposerAt(when: EpochAndSlot) { + const { epoch, slot } = when; + const { seed, committee } = await this.getCommittee(slot); + const proposerIndex = this.computeProposerIndex(slot, epoch, seed, BigInt(committee.length)); + return committee[Number(proposerIndex)]; } /** * Check if a validator is in the current epoch's committee */ async isInCommittee(validator: EthAddress): Promise { - const committee = await this.getCommittee(); + const { committee } = await this.getCommittee(); return committee.some(v => v.equals(validator)); } } diff --git a/yarn-project/epoch-cache/src/timestamp_provider.ts b/yarn-project/epoch-cache/src/timestamp_provider.ts deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/yarn-project/foundation/package.json b/yarn-project/foundation/package.json index 58a2cf3f1d5e..c053b9392f7f 100644 --- a/yarn-project/foundation/package.json +++ b/yarn-project/foundation/package.json @@ -11,6 +11,7 @@ "./abi": "./dest/abi/index.js", "./async-map": "./dest/async-map/index.js", "./async-pool": "./dest/async-pool/index.js", + "./bigint": "./dest/bigint/index.js", "./collection": "./dest/collection/index.js", "./config": "./dest/config/index.js", "./crypto": "./dest/crypto/index.js", diff --git a/yarn-project/foundation/src/bigint/index.ts b/yarn-project/foundation/src/bigint/index.ts new file mode 100644 index 000000000000..3dea7f5cbc72 --- /dev/null +++ b/yarn-project/foundation/src/bigint/index.ts @@ -0,0 +1,7 @@ +/** Returns minimum value across a list of bigints. */ +export function minBigint(...values: bigint[]): bigint { + if (values.length === 0) { + throw new Error('Cannot get min of empty array'); + } + return values.reduce((min, value) => (value < min ? value : min), values[0]); +} diff --git a/yarn-project/foundation/src/collection/array.ts b/yarn-project/foundation/src/collection/array.ts index 0e30ee657c02..251d531da8ad 100644 --- a/yarn-project/foundation/src/collection/array.ts +++ b/yarn-project/foundation/src/collection/array.ts @@ -201,3 +201,16 @@ export function stdDev(values: number[]) { } return Math.sqrt(variance(values)!); } + +/** Counts how many items from the beginning of the array match the given predicate. */ +export function countWhile(collection: T[], predicate: (x: T) => boolean): number { + let count = 0; + for (const item of collection) { + if (predicate(item)) { + count++; + } else { + break; + } + } + return count; +} diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 32ec18b1497e..6111a7766961 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -149,6 +149,8 @@ export type EnvVar = | 'PXE_PROVER_ENABLED' | 'REGISTRY_CONTRACT_ADDRESS' | 'ROLLUP_CONTRACT_ADDRESS' + | 'SENTINEL_ENABLED' + | 'SENTINEL_HISTORY_LENGTH_IN_EPOCHS' | 'SEQ_ALLOWED_SETUP_FN' | 'SEQ_MAX_BLOCK_SIZE_IN_BYTES' | 'SEQ_MAX_TX_PER_BLOCK' diff --git a/yarn-project/foundation/src/eth-address/index.ts b/yarn-project/foundation/src/eth-address/index.ts index ed9acffbb5b3..0e406b315e63 100644 --- a/yarn-project/foundation/src/eth-address/index.ts +++ b/yarn-project/foundation/src/eth-address/index.ts @@ -228,6 +228,13 @@ export class EthAddress { return new EthAddress(reader.readBytes(EthAddress.SIZE_IN_BYTES)); } + /** Converts a number into an address. Useful for testing. */ + static fromNumber(num: bigint | number): EthAddress { + const buffer = Buffer.alloc(EthAddress.SIZE_IN_BYTES); + buffer.writeBigUInt64BE(BigInt(num), 0); + return new EthAddress(buffer); + } + toJSON() { return this.toString(); } diff --git a/yarn-project/foundation/src/promise/running-promise.ts b/yarn-project/foundation/src/promise/running-promise.ts index e3e82e0a4708..84e0b94fef3f 100644 --- a/yarn-project/foundation/src/promise/running-promise.ts +++ b/yarn-project/foundation/src/promise/running-promise.ts @@ -43,7 +43,7 @@ export class RunningPromise { public start() { if (this.running) { this.logger.warn(`Attempted to start running promise that was already started`); - return; + return this; } this.running = true; diff --git a/yarn-project/kv-store/src/stores/index.ts b/yarn-project/kv-store/src/stores/index.ts index c279b4ba6289..77a1e47481a1 100644 --- a/yarn-project/kv-store/src/stores/index.ts +++ b/yarn-project/kv-store/src/stores/index.ts @@ -1 +1,6 @@ +import type { L2BlockStreamEventHandler, L2BlockStreamLocalDataProvider } from '@aztec/stdlib/block'; + export * from './l2_tips_store.js'; +export * from './l2_tips_memory_store.js'; + +export type L2TipsStore = L2BlockStreamEventHandler & L2BlockStreamLocalDataProvider; diff --git a/yarn-project/kv-store/src/stores/interface.ts b/yarn-project/kv-store/src/stores/interface.ts new file mode 100644 index 000000000000..b47e46c7b280 --- /dev/null +++ b/yarn-project/kv-store/src/stores/interface.ts @@ -0,0 +1,3 @@ +import type { L2BlockStreamEventHandler, L2BlockStreamLocalDataProvider } from '@aztec/stdlib/block'; + +export type L2TipsStore = L2BlockStreamEventHandler & L2BlockStreamLocalDataProvider; diff --git a/yarn-project/kv-store/src/stores/l2_tips_memory_store.test.ts b/yarn-project/kv-store/src/stores/l2_tips_memory_store.test.ts new file mode 100644 index 000000000000..1686fbd72813 --- /dev/null +++ b/yarn-project/kv-store/src/stores/l2_tips_memory_store.test.ts @@ -0,0 +1,6 @@ +import { L2TipsMemoryStore } from './l2_tips_memory_store.js'; +import { testL2TipsStore } from './l2_tips_store_suite.test.js'; + +describe('L2TipsMemoryStore', () => { + testL2TipsStore(() => Promise.resolve(new L2TipsMemoryStore())); +}); diff --git a/yarn-project/kv-store/src/stores/l2_tips_memory_store.ts b/yarn-project/kv-store/src/stores/l2_tips_memory_store.ts new file mode 100644 index 000000000000..40640fa4a362 --- /dev/null +++ b/yarn-project/kv-store/src/stores/l2_tips_memory_store.ts @@ -0,0 +1,66 @@ +import type { + L2BlockId, + L2BlockStreamEvent, + L2BlockStreamEventHandler, + L2BlockStreamLocalDataProvider, + L2BlockTag, + L2Tips, +} from '@aztec/stdlib/block'; + +/** Stores currently synced L2 tips and unfinalized block hashes. */ +export class L2TipsMemoryStore implements L2BlockStreamEventHandler, L2BlockStreamLocalDataProvider { + private readonly l2TipsStore: Map = new Map(); + private readonly l2BlockHashesStore: Map = new Map(); + + public getL2BlockHash(number: number): Promise { + return Promise.resolve(this.l2BlockHashesStore.get(number)); + } + + public getL2Tips(): Promise { + return Promise.resolve({ + latest: this.getL2Tip('latest'), + finalized: this.getL2Tip('finalized'), + proven: this.getL2Tip('proven'), + }); + } + + private getL2Tip(tag: L2BlockTag): L2BlockId { + const blockNumber = this.l2TipsStore.get(tag); + if (blockNumber === undefined || blockNumber === 0) { + return { number: 0, hash: undefined }; + } + const blockHash = this.l2BlockHashesStore.get(blockNumber); + if (!blockHash) { + throw new Error(`Block hash not found for block number ${blockNumber}`); + } + + return { number: blockNumber, hash: blockHash }; + } + + public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { + switch (event.type) { + case 'blocks-added': { + const blocks = event.blocks.map(b => b.block); + for (const block of blocks) { + this.l2BlockHashesStore.set(block.number, (await block.header.hash()).toString()); + } + this.l2TipsStore.set('latest', blocks.at(-1)!.number); + break; + } + case 'chain-pruned': + this.l2TipsStore.set('latest', event.blockNumber); + break; + case 'chain-proven': + this.l2TipsStore.set('proven', event.blockNumber); + break; + case 'chain-finalized': + this.l2TipsStore.set('finalized', event.blockNumber); + for (const key of this.l2BlockHashesStore.keys()) { + if (key < event.blockNumber) { + this.l2BlockHashesStore.delete(key); + } + } + break; + } + } +} diff --git a/yarn-project/kv-store/src/stores/l2_tips_store.test.ts b/yarn-project/kv-store/src/stores/l2_tips_store.test.ts index 4ef330bfe6bf..31db77885591 100644 --- a/yarn-project/kv-store/src/stores/l2_tips_store.test.ts +++ b/yarn-project/kv-store/src/stores/l2_tips_store.test.ts @@ -1,81 +1,22 @@ -import { times } from '@aztec/foundation/collection'; -import { Fr } from '@aztec/foundation/fields'; import type { AztecAsyncKVStore } from '@aztec/kv-store'; import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; -import type { L2Block, PublishedL2Block } from '@aztec/stdlib/block'; -import type { BlockHeader } from '@aztec/stdlib/tx'; -import { expect } from 'chai'; - -import { L2TipsStore } from './l2_tips_store.js'; +import { L2TipsKVStore } from './l2_tips_store.js'; +import { testL2TipsStore } from './l2_tips_store_suite.test.js'; describe('L2TipsStore', () => { let kvStore: AztecAsyncKVStore; - let tipsStore: L2TipsStore; beforeEach(async () => { kvStore = await openTmpStore('test', true); - tipsStore = new L2TipsStore(kvStore, 'test'); }); afterEach(async () => { await kvStore.delete(); }); - const makeBlock = (number: number): PublishedL2Block => ({ - block: { number, header: { hash: () => Promise.resolve(new Fr(number)) } as BlockHeader } as L2Block, - l1: { blockNumber: BigInt(number), blockHash: `0x${number}`, timestamp: BigInt(number) }, - signatures: [], - }); - - const makeTip = (number: number) => ({ number, hash: number === 0 ? undefined : new Fr(number).toString() }); - - const makeTips = (latest: number, proven: number, finalized: number) => ({ - latest: makeTip(latest), - proven: makeTip(proven), - finalized: makeTip(finalized), - }); - - it('returns zero if no tips are stored', async () => { - const tips = await tipsStore.getL2Tips(); - expect(tips).to.deep.equal(makeTips(0, 0, 0)); - }); - - it('stores chain tips', async () => { - await tipsStore.handleBlockStreamEvent({ type: 'blocks-added', blocks: times(20, i => makeBlock(i + 1)) }); - - await tipsStore.handleBlockStreamEvent({ type: 'chain-finalized', blockNumber: 5 }); - await tipsStore.handleBlockStreamEvent({ type: 'chain-proven', blockNumber: 8 }); - await tipsStore.handleBlockStreamEvent({ type: 'chain-pruned', blockNumber: 10 }); - - const tips = await tipsStore.getL2Tips(); - expect(tips).to.deep.equal(makeTips(10, 8, 5)); - }); - - it('sets latest tip from blocks added', async () => { - await tipsStore.handleBlockStreamEvent({ type: 'blocks-added', blocks: times(3, i => makeBlock(i + 1)) }); - - const tips = await tipsStore.getL2Tips(); - expect(tips).to.deep.equal(makeTips(3, 0, 0)); - - expect(await tipsStore.getL2BlockHash(1)).to.deep.equal(new Fr(1).toString()); - expect(await tipsStore.getL2BlockHash(2)).to.deep.equal(new Fr(2).toString()); - expect(await tipsStore.getL2BlockHash(3)).to.deep.equal(new Fr(3).toString()); - }); - - it('clears block hashes when setting finalized chain', async () => { - await tipsStore.handleBlockStreamEvent({ type: 'blocks-added', blocks: times(5, i => makeBlock(i + 1)) }); - await tipsStore.handleBlockStreamEvent({ type: 'chain-proven', blockNumber: 3 }); - await tipsStore.handleBlockStreamEvent({ type: 'chain-finalized', blockNumber: 3 }); - - const tips = await tipsStore.getL2Tips(); - expect(tips).to.deep.equal(makeTips(5, 3, 3)); - - expect(await tipsStore.getL2BlockHash(1)).to.be.undefined; - expect(await tipsStore.getL2BlockHash(2)).to.be.undefined; - - expect(await tipsStore.getL2BlockHash(3)).to.deep.equal(new Fr(3).toString()); - expect(await tipsStore.getL2BlockHash(4)).to.deep.equal(new Fr(4).toString()); - expect(await tipsStore.getL2BlockHash(5)).to.deep.equal(new Fr(5).toString()); + testL2TipsStore(async () => { + kvStore = await openTmpStore('test', true); + return new L2TipsKVStore(kvStore, 'test'); }); }); diff --git a/yarn-project/kv-store/src/stores/l2_tips_store.ts b/yarn-project/kv-store/src/stores/l2_tips_store.ts index e0db2e9b518a..0a0e399cb994 100644 --- a/yarn-project/kv-store/src/stores/l2_tips_store.ts +++ b/yarn-project/kv-store/src/stores/l2_tips_store.ts @@ -11,7 +11,7 @@ import type { AztecAsyncMap } from '../interfaces/map.js'; import type { AztecAsyncKVStore } from '../interfaces/store.js'; /** Stores currently synced L2 tips and unfinalized block hashes. */ -export class L2TipsStore implements L2BlockStreamEventHandler, L2BlockStreamLocalDataProvider { +export class L2TipsKVStore implements L2BlockStreamEventHandler, L2BlockStreamLocalDataProvider { private readonly l2TipsStore: AztecAsyncMap; private readonly l2BlockHashesStore: AztecAsyncMap; diff --git a/yarn-project/kv-store/src/stores/l2_tips_store_suite.test.ts b/yarn-project/kv-store/src/stores/l2_tips_store_suite.test.ts new file mode 100644 index 000000000000..7562da39e594 --- /dev/null +++ b/yarn-project/kv-store/src/stores/l2_tips_store_suite.test.ts @@ -0,0 +1,73 @@ +import { times } from '@aztec/foundation/collection'; +import { Fr } from '@aztec/foundation/fields'; +import type { L2Block, PublishedL2Block } from '@aztec/stdlib/block'; +import type { BlockHeader } from '@aztec/stdlib/tx'; + +import { expect } from 'chai'; + +import type { L2TipsStore } from './index.js'; + +export function testL2TipsStore(makeTipsStore: () => Promise) { + let tipsStore: L2TipsStore; + + beforeEach(async () => { + tipsStore = await makeTipsStore(); + }); + + const makeBlock = (number: number): PublishedL2Block => ({ + block: { number, header: { hash: () => Promise.resolve(new Fr(number)) } as BlockHeader } as L2Block, + l1: { blockNumber: BigInt(number), blockHash: `0x${number}`, timestamp: BigInt(number) }, + signatures: [], + }); + + const makeTip = (number: number) => ({ number, hash: number === 0 ? undefined : new Fr(number).toString() }); + + const makeTips = (latest: number, proven: number, finalized: number) => ({ + latest: makeTip(latest), + proven: makeTip(proven), + finalized: makeTip(finalized), + }); + + it('returns zero if no tips are stored', async () => { + const tips = await tipsStore.getL2Tips(); + expect(tips).to.deep.equal(makeTips(0, 0, 0)); + }); + + it('stores chain tips', async () => { + await tipsStore.handleBlockStreamEvent({ type: 'blocks-added', blocks: times(20, i => makeBlock(i + 1)) }); + + await tipsStore.handleBlockStreamEvent({ type: 'chain-finalized', blockNumber: 5 }); + await tipsStore.handleBlockStreamEvent({ type: 'chain-proven', blockNumber: 8 }); + await tipsStore.handleBlockStreamEvent({ type: 'chain-pruned', blockNumber: 10 }); + + const tips = await tipsStore.getL2Tips(); + expect(tips).to.deep.equal(makeTips(10, 8, 5)); + }); + + it('sets latest tip from blocks added', async () => { + await tipsStore.handleBlockStreamEvent({ type: 'blocks-added', blocks: times(3, i => makeBlock(i + 1)) }); + + const tips = await tipsStore.getL2Tips(); + expect(tips).to.deep.equal(makeTips(3, 0, 0)); + + expect(await tipsStore.getL2BlockHash(1)).to.deep.equal(new Fr(1).toString()); + expect(await tipsStore.getL2BlockHash(2)).to.deep.equal(new Fr(2).toString()); + expect(await tipsStore.getL2BlockHash(3)).to.deep.equal(new Fr(3).toString()); + }); + + it('clears block hashes when setting finalized chain', async () => { + await tipsStore.handleBlockStreamEvent({ type: 'blocks-added', blocks: times(5, i => makeBlock(i + 1)) }); + await tipsStore.handleBlockStreamEvent({ type: 'chain-proven', blockNumber: 3 }); + await tipsStore.handleBlockStreamEvent({ type: 'chain-finalized', blockNumber: 3 }); + + const tips = await tipsStore.getL2Tips(); + expect(tips).to.deep.equal(makeTips(5, 3, 3)); + + expect(await tipsStore.getL2BlockHash(1)).to.be.undefined; + expect(await tipsStore.getL2BlockHash(2)).to.be.undefined; + + expect(await tipsStore.getL2BlockHash(3)).to.deep.equal(new Fr(3).toString()); + expect(await tipsStore.getL2BlockHash(4)).to.deep.equal(new Fr(4).toString()); + expect(await tipsStore.getL2BlockHash(5)).to.deep.equal(new Fr(5).toString()); + }); +} diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 0027d15dbee6..0b5c3d3d0dce 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -187,6 +187,7 @@ export class P2PClient private synchedBlockHashes: AztecAsyncMap; private synchedLatestBlockNumber: AztecAsyncSingleton; private synchedProvenBlockNumber: AztecAsyncSingleton; + private synchedLatestSlot: AztecAsyncSingleton; private txPool: TxPool; private attestationPool: T extends P2PClientType.Full ? AttestationPool : undefined; @@ -236,6 +237,7 @@ export class P2PClient this.synchedBlockHashes = store.openMap('p2p_pool_block_hashes'); this.synchedLatestBlockNumber = store.openSingleton('p2p_pool_last_l2_block'); this.synchedProvenBlockNumber = store.openSingleton('p2p_pool_last_proven_l2_block'); + this.synchedLatestSlot = store.openSingleton('p2p_pool_last_l2_slot'); this.txPool = mempools.txPool; this.attestationPool = mempools.attestationPool!; @@ -380,8 +382,16 @@ export class P2PClient return this.p2pService.propagate(proposal); } - public async getAttestationsForSlot(slot: bigint, proposalId: string): Promise { - return (await this.attestationPool?.getAttestationsForSlot(slot, proposalId)) ?? []; + public async getAttestationsForSlot(slot: bigint, proposalId?: string): Promise { + return ( + (await (proposalId + ? this.attestationPool?.getAttestationsForSlotAndProposal(slot, proposalId) + : this.attestationPool?.getAttestationsForSlot(slot))) ?? [] + ); + } + + public addAttestation(attestation: BlockAttestation): Promise { + return this.attestationPool?.addAttestations([attestation]) ?? Promise.resolve(); } // REVIEW: https://github.com/AztecProtocol/aztec-packages/issues/7963 @@ -592,6 +602,11 @@ export class P2PClient return (await this.synchedProvenBlockNumber.getAsync()) ?? INITIAL_L2_BLOCK_NUM - 1; } + /** Returns latest L2 slot for which we have seen an L2 block. */ + public async getSyncedLatestSlot(): Promise { + return (await this.synchedLatestSlot.getAsync()) ?? BigInt(0); + } + /** * Method to check the status the p2p client. * @returns Information about p2p client status: state & syncedToBlockNum. @@ -659,12 +674,13 @@ export class P2PClient await this.markTxsAsMinedFromBlocks(blocks.map(b => b.block)); await this.addAttestationsToPool(blocks); - const lastBlockNum = blocks.at(-1)!.block.number; + const lastBlock = blocks.at(-1)!.block; await Promise.all( blocks.map(async block => this.synchedBlockHashes.set(block.block.number, (await block.block.hash()).toString())), ); - await this.synchedLatestBlockNumber.set(lastBlockNum); - this.log.verbose(`Synched to latest block ${lastBlockNum}`); + await this.synchedLatestBlockNumber.set(lastBlock.number); + await this.synchedLatestSlot.set(lastBlock.header.getSlot()); + this.log.verbose(`Synched to latest block ${lastBlock.number}`); await this.startServiceIfSynched(); } diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts index f57c9214f0a0..b43869c87b2a 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts @@ -50,7 +50,17 @@ export interface AttestationPool { deleteAttestationsForSlotAndProposal(slot: bigint, proposalId: string): Promise; /** - * Get Attestations for slot + * Get all Attestations for all proposals for a given slot + * + * Retrieve all of the attestations observed pertaining to a given slot + * + * @param slot - The slot to query + * @return BlockAttestations + */ + getAttestationsForSlot(slot: bigint): Promise; + + /** + * Get Attestations for slot and given proposal * * Retrieve all of the attestations observed pertaining to a given slot * @@ -58,5 +68,5 @@ export interface AttestationPool { * @param proposalId - The proposal to query * @return BlockAttestations */ - getAttestationsForSlot(slot: bigint, proposalId: string): Promise; + getAttestationsForSlotAndProposal(slot: bigint, proposalId: string): Promise; } diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts index 963fb0283375..88f55b3aa569 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts @@ -45,25 +45,45 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo it('should add attestations to pool', async () => { const slotNumber = 420; const archive = Fr.random(); - const attestations = await Promise.all(signers.map(signer => mockAttestation(signer, slotNumber, archive))); + const attestations = await Promise.all( + signers.slice(0, -1).map(signer => mockAttestation(signer, slotNumber, archive)), + ); await ap.addAttestations(attestations); // Check metrics have been updated. expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); + const retrievedAttestations = await ap.getAttestationsForSlotAndProposal(BigInt(slotNumber), archive.toString()); + expect(retrievedAttestations.length).toBe(attestations.length); + compareAttestations(retrievedAttestations, attestations); - expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); + const retrievedAttestationsForSlot = await ap.getAttestationsForSlot(BigInt(slotNumber)); + expect(retrievedAttestationsForSlot.length).toBe(attestations.length); + compareAttestations(retrievedAttestationsForSlot, attestations); - compareAttestations(retreivedAttestations, attestations); + // Add another one + const newAttestation = await mockAttestation(signers[NUMBER_OF_SIGNERS_PER_TEST - 1], slotNumber, archive); + await ap.addAttestations([newAttestation]); + expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(1); + const retrievedAttestationsAfterAdd = await ap.getAttestationsForSlotAndProposal( + BigInt(slotNumber), + archive.toString(), + ); + expect(retrievedAttestationsAfterAdd.length).toBe(attestations.length + 1); + compareAttestations(retrievedAttestationsAfterAdd, [...attestations, newAttestation]); + const retrievedAttestationsForSlotAfterAdd = await ap.getAttestationsForSlot(BigInt(slotNumber)); + expect(retrievedAttestationsForSlotAfterAdd.length).toBe(attestations.length + 1); + compareAttestations(retrievedAttestationsForSlotAfterAdd, [...attestations, newAttestation]); // Delete by slot await ap.deleteAttestationsForSlot(BigInt(slotNumber)); + expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length + 1); - expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); - - const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); + const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlotAndProposal( + BigInt(slotNumber), + archive.toString(), + ); expect(retreivedAttestationsAfterDelete.length).toBe(0); }); @@ -82,7 +102,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo // Add them to store and check we end up with only one await ap.addAttestations(attestations); - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString()); + const retreivedAttestations = await ap.getAttestationsForSlotAndProposal(BigInt(slotNumber), archive.toString()); expect(retreivedAttestations.length).toBe(1); expect(retreivedAttestations[0].toBuffer()).toEqual(attestations[0].toBuffer()); expect(retreivedAttestations[0].payload.txHashes).toEqual(txs); @@ -90,7 +110,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo // Try adding them on another operation and check they are still not duplicated await ap.addAttestations([attestations[0]]); - expect(await ap.getAttestationsForSlot(BigInt(slotNumber), archive.toString())).toHaveLength(1); + expect(await ap.getAttestationsForSlotAndProposal(BigInt(slotNumber), archive.toString())).toHaveLength(1); }); it('should store attestations by differing slot', async () => { @@ -103,7 +123,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo const slot = attestation.payload.header.globalVariables.slotNumber; const archive = attestation.archive.toString(); - const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), archive); + const retreivedAttestations = await ap.getAttestationsForSlotAndProposal(slot.toBigInt(), archive); expect(retreivedAttestations.length).toBe(1); expect(retreivedAttestations[0].toBuffer()).toEqual(attestation.toBuffer()); expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot); @@ -123,7 +143,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo const slot = attestation.payload.header.globalVariables.slotNumber; const proposalId = attestation.archive.toString(); - const retreivedAttestations = await ap.getAttestationsForSlot(slot.toBigInt(), proposalId); + const retreivedAttestations = await ap.getAttestationsForSlotAndProposal(slot.toBigInt(), proposalId); expect(retreivedAttestations.length).toBe(1); expect(retreivedAttestations[0].toBuffer()).toEqual(attestation.toBuffer()); expect(retreivedAttestations[0].payload.header.globalVariables.slotNumber).toEqual(slot); @@ -140,7 +160,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + const retreivedAttestations = await ap.getAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); compareAttestations(retreivedAttestations, attestations); @@ -148,7 +168,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); - const gottenAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + const gottenAfterDelete = await ap.getAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); expect(gottenAfterDelete.length).toBe(0); }); @@ -160,13 +180,13 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo await ap.addAttestations(attestations); - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + const retreivedAttestations = await ap.getAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); compareAttestations(retreivedAttestations, attestations); await ap.deleteAttestationsForSlot(BigInt(slotNumber)); - const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); expect(retreivedAttestationsAfterDelete.length).toBe(0); }); @@ -187,7 +207,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length); expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations2.length); - const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + const retreivedAttestations = await ap.getAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); compareAttestations(retreivedAttestations, attestations); @@ -195,10 +215,10 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length); - const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId); + const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); expect(retreivedAttestationsAfterDelete.length).toBe(0); - const retreivedAttestationsAfterDeleteForOtherProposal = await ap.getAttestationsForSlot( + const retreivedAttestationsAfterDeleteForOtherProposal = await ap.getAttestationsForSlotAndProposal( BigInt(slotNumber), proposalId2, ); @@ -215,14 +235,14 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo await ap.addAttestations(attestations); - const attestationsForSlot1 = await ap.getAttestationsForSlot(BigInt(1), proposalId); + const attestationsForSlot1 = await ap.getAttestationsForSlotAndProposal(BigInt(1), proposalId); expect(attestationsForSlot1.length).toBe(signers.length); const deleteAttestationsSpy = jest.spyOn(ap, 'deleteAttestationsForSlot'); await ap.deleteAttestationsOlderThan(BigInt(73)); - const attestationsForSlot1AfterDelete = await ap.getAttestationsForSlot(BigInt(1), proposalId); + const attestationsForSlot1AfterDelete = await ap.getAttestationsForSlotAndProposal(BigInt(1), proposalId); expect(attestationsForSlot1AfterDelete.length).toBe(0); expect(deleteAttestationsSpy).toHaveBeenCalledTimes(5); diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts index 72c617829c6b..6ef0ef3ca563 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts @@ -60,6 +60,9 @@ export class KvAttestationPool implements AttestationPool { this.log.verbose(`Added attestation for slot ${slotNumber.toBigInt()} from ${address}`, { signature: attestation.signature.toString(), + slotNumber, + address, + proposalId, }); } }); @@ -67,7 +70,19 @@ export class KvAttestationPool implements AttestationPool { this.metrics.recordAddedObjects(attestations.length); } - public async getAttestationsForSlot(slot: bigint, proposalId: string): Promise { + public async getAttestationsForSlot(slot: bigint): Promise { + const slotFr = new Fr(slot); + const proposalIds = await toArray(this.proposalsForSlot.getValuesAsync(slotFr.toString())); + const attestations: BlockAttestation[] = []; + + for (const proposalId of proposalIds) { + attestations.push(...(await this.getAttestationsForSlotAndProposal(slot, proposalId))); + } + + return attestations; + } + + public async getAttestationsForSlotAndProposal(slot: bigint, proposalId: string): Promise { const attestationIds = await toArray( this.attestationsForProposal.getValuesAsync(this.getProposalKey(slot, proposalId)), ); diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.ts index f4e1cb6445ec..bc21adf94e4f 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.ts @@ -15,7 +15,15 @@ export class InMemoryAttestationPool implements AttestationPool { this.metrics = new PoolInstrumentation(telemetry, PoolName.ATTESTATION_POOL); } - public getAttestationsForSlot(slot: bigint, proposalId: string): Promise { + public getAttestationsForSlot(slot: bigint): Promise { + return Promise.resolve( + Array.from(this.attestations.get(slot)?.values() ?? []).flatMap(proposalAttestationMap => + Array.from(proposalAttestationMap.values()), + ), + ); + } + + public getAttestationsForSlotAndProposal(slot: bigint, proposalId: string): Promise { const slotAttestationMap = this.attestations.get(slot); if (slotAttestationMap) { const proposalAttestationMap = slotAttestationMap.get(proposalId); @@ -40,6 +48,9 @@ export class InMemoryAttestationPool implements AttestationPool { this.log.verbose(`Added attestation for slot ${slotNumber.toBigInt()} from ${address}`, { signature: attestation.signature.toString(), + slotNumber, + address, + proposalId, }); } diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index a04e343d1bd7..d130c0b96097 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -521,7 +521,8 @@ export class LibP2PService extends WithTracer implement await this.mempools.txPool.addTxs([tx]); } - /**Process Attestation From Peer + /** + * Process Attestation From Peer * When a proposal is received from a peer, we add it to the attestation pool, so it can be accessed by other services. * * @param attestation - The attestation to process. diff --git a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts index 123202618061..d3f916b5c49b 100644 --- a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts +++ b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts @@ -48,12 +48,13 @@ function mockAttestationPool(): AttestationPool { deleteAttestationsForSlot: () => Promise.resolve(), deleteAttestationsForSlotAndProposal: () => Promise.resolve(), getAttestationsForSlot: () => Promise.resolve([]), + getAttestationsForSlotAndProposal: () => Promise.resolve([]), }; } function mockEpochCache(): EpochCacheInterface { return { - getCommittee: () => Promise.resolve([] as EthAddress[]), + getCommittee: () => Promise.resolve({ committee: [], seed: 1n, epoch: 0n }), getProposerIndexEncoding: () => '0x' as `0x${string}`, getEpochAndSlotNow: () => ({ epoch: 0n, slot: 0n, ts: 0n }), computeProposerIndex: () => 0n, diff --git a/yarn-project/prover-client/src/block_builder/light.ts b/yarn-project/prover-client/src/block_builder/light.ts index f39952a2728b..758279e4c048 100644 --- a/yarn-project/prover-client/src/block_builder/light.ts +++ b/yarn-project/prover-client/src/block_builder/light.ts @@ -68,7 +68,7 @@ export class LightweightBlockBuilder implements BlockBuilder { this.logger.debug(`Built block ${block.number}`, { globalVariables: this.globalVariables?.toInspect(), archiveRoot: newArchive.root.toString(), - blockHash: block.hash.toString(), + blockHash: (await block.hash()).toString(), }); return block; diff --git a/yarn-project/pxe/src/pxe_service/pxe_service.ts b/yarn-project/pxe/src/pxe_service/pxe_service.ts index b6541f3cea3e..e637ac39e023 100644 --- a/yarn-project/pxe/src/pxe_service/pxe_service.ts +++ b/yarn-project/pxe/src/pxe_service/pxe_service.ts @@ -6,7 +6,7 @@ import { Timer } from '@aztec/foundation/timer'; import type { SiblingPath } from '@aztec/foundation/trees'; import { KeyStore } from '@aztec/key-store'; import type { AztecAsyncKVStore } from '@aztec/kv-store'; -import { L2TipsStore } from '@aztec/kv-store/stores'; +import { L2TipsKVStore } from '@aztec/kv-store/stores'; import { ProtocolContractAddress, type ProtocolContractsProvider, @@ -140,7 +140,7 @@ export class PXEService implements PXE { const taggingDataProvider = new TaggingDataProvider(store); const capsuleDataProvider = new CapsuleDataProvider(store); const keyStore = new KeyStore(store); - const tipsStore = new L2TipsStore(store, 'pxe'); + const tipsStore = new L2TipsKVStore(store, 'pxe'); const synchronizer = new Synchronizer( node, syncDataProvider, diff --git a/yarn-project/pxe/src/synchronizer/synchronizer.test.ts b/yarn-project/pxe/src/synchronizer/synchronizer.test.ts index b550a0f18349..0439b24e44e1 100644 --- a/yarn-project/pxe/src/synchronizer/synchronizer.test.ts +++ b/yarn-project/pxe/src/synchronizer/synchronizer.test.ts @@ -1,6 +1,6 @@ import { timesParallel } from '@aztec/foundation/collection'; import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; -import { L2TipsStore } from '@aztec/kv-store/stores'; +import { L2TipsKVStore } from '@aztec/kv-store/stores'; import { L2Block, type L2BlockStream, randomPublishedL2Block } from '@aztec/stdlib/block'; import type { AztecNode } from '@aztec/stdlib/interfaces/client'; @@ -14,7 +14,7 @@ import { Synchronizer } from './synchronizer.js'; describe('Synchronizer', () => { let synchronizer: Synchronizer; - let tipsStore: L2TipsStore; + let tipsStore: L2TipsKVStore; let syncDataProvider: SyncDataProvider; let noteDataProvider: NoteDataProvider; let taggingDataProvider: TaggingDataProvider; @@ -31,7 +31,7 @@ describe('Synchronizer', () => { const store = await openTmpStore('test'); blockStream = mock(); aztecNode = mock(); - tipsStore = new L2TipsStore(store, 'pxe'); + tipsStore = new L2TipsKVStore(store, 'pxe'); syncDataProvider = new SyncDataProvider(store); noteDataProvider = await NoteDataProvider.create(store); taggingDataProvider = new TaggingDataProvider(store); diff --git a/yarn-project/pxe/src/synchronizer/synchronizer.ts b/yarn-project/pxe/src/synchronizer/synchronizer.ts index d79f051f6b2c..a4022a682474 100644 --- a/yarn-project/pxe/src/synchronizer/synchronizer.ts +++ b/yarn-project/pxe/src/synchronizer/synchronizer.ts @@ -1,6 +1,6 @@ import { INITIAL_L2_BLOCK_NUM } from '@aztec/constants'; import { type Logger, createLogger } from '@aztec/foundation/log'; -import type { L2TipsStore } from '@aztec/kv-store/stores'; +import type { L2TipsKVStore } from '@aztec/kv-store/stores'; import { L2BlockStream, type L2BlockStreamEvent, type L2BlockStreamEventHandler } from '@aztec/stdlib/block'; import type { AztecNode } from '@aztec/stdlib/interfaces/client'; @@ -26,7 +26,7 @@ export class Synchronizer implements L2BlockStreamEventHandler { private syncDataProvider: SyncDataProvider, private noteDataProvider: NoteDataProvider, private taggingDataProvider: TaggingDataProvider, - private l2TipsStore: L2TipsStore, + private l2TipsStore: L2TipsKVStore, config: Partial> = {}, loggerOrSuffix?: string | Logger, ) { diff --git a/yarn-project/stdlib/package.json b/yarn-project/stdlib/package.json index 565fb4d91b56..e62ed3c77074 100644 --- a/yarn-project/stdlib/package.json +++ b/yarn-project/stdlib/package.json @@ -46,7 +46,8 @@ "./epoch-helpers": "./dest/epoch-helpers/index.js", "./config": "./dest/config/index.js", "./testing/jest": "./dest/tests/jest.js", - "./database-version": "./dest/database-version/index.js" + "./database-version": "./dest/database-version/index.js", + "./validators": "./dest/validators/index.js" }, "typedocOptions": { "entryPoints": [ diff --git a/yarn-project/stdlib/src/block/l2_block_downloader/l2_block_stream.ts b/yarn-project/stdlib/src/block/l2_block_downloader/l2_block_stream.ts index 792229d2cb42..80987a873dfd 100644 --- a/yarn-project/stdlib/src/block/l2_block_downloader/l2_block_stream.ts +++ b/yarn-project/stdlib/src/block/l2_block_downloader/l2_block_stream.ts @@ -9,6 +9,7 @@ import type { PublishedL2Block } from '../published_l2_block.js'; export class L2BlockStream { private readonly runningPromise: RunningPromise; private isSyncing = false; + private hasStarted = false; constructor( private l2BlockSource: Pick, @@ -76,7 +77,12 @@ export class L2BlockStream { // If we are just starting, use the starting block number from the options. if (latestBlockNumber === 0 && this.opts.startingBlock !== undefined) { latestBlockNumber = Math.max(this.opts.startingBlock - 1, 0); + } + + // Only log this entry once (for sanity) + if (!this.hasStarted) { this.log.verbose(`Starting sync from block number ${latestBlockNumber}`); + this.hasStarted = true; } // Request new blocks from the source. diff --git a/yarn-project/stdlib/src/epoch-helpers/index.ts b/yarn-project/stdlib/src/epoch-helpers/index.ts index 2374abecba20..1efdc8f43af7 100644 --- a/yarn-project/stdlib/src/epoch-helpers/index.ts +++ b/yarn-project/stdlib/src/epoch-helpers/index.ts @@ -26,6 +26,14 @@ export const L1RollupConstantsSchema = z.object({ ethereumSlotDuration: z.number(), }) satisfies ZodFor; +/** Returns the timestamp for a given L2 slot. */ +export function getTimestampForSlot( + slot: bigint, + constants: Pick, +) { + return constants.l1GenesisTime + slot * BigInt(constants.slotDuration); +} + /** Returns the slot number for a given timestamp. */ export function getSlotAtTimestamp(ts: bigint, constants: Pick) { return ts < constants.l1GenesisTime ? 0n : (ts - constants.l1GenesisTime) / BigInt(constants.slotDuration); diff --git a/yarn-project/stdlib/src/interfaces/aztec-node.test.ts b/yarn-project/stdlib/src/interfaces/aztec-node.test.ts index 81eafdcbf2b6..d34c0910cd19 100644 --- a/yarn-project/stdlib/src/interfaces/aztec-node.test.ts +++ b/yarn-project/stdlib/src/interfaces/aztec-node.test.ts @@ -48,6 +48,7 @@ import { TxEffect } from '../tx/tx_effect.js'; import { TxHash } from '../tx/tx_hash.js'; import { TxReceipt } from '../tx/tx_receipt.js'; import type { TxValidationResult } from '../tx/validator/tx_validator.js'; +import type { ValidatorsStats } from '../validators/types.js'; import { type AztecNode, AztecNodeApiSchema } from './aztec-node.js'; import type { SequencerConfig } from './configs.js'; import type { GetContractClassLogsResponse, GetPublicLogsResponse } from './get_logs_response.js'; @@ -289,6 +290,11 @@ describe('AztecNodeApiSchema', () => { expect(response).toBeInstanceOf(BlockHeader); }); + it('getValidatorsStats', async () => { + const response = await context.client.getValidatorsStats(); + expect(response).toBeDefined(); + }); + it('simulatePublicCalls', async () => { const response = await context.client.simulatePublicCalls(await Tx.random()); expect(response).toBeInstanceOf(PublicSimulationOutput); @@ -556,6 +562,14 @@ class MockAztecNode implements AztecNode { getBlockHeader(_blockNumber?: number | 'latest' | undefined): Promise { return Promise.resolve(BlockHeader.empty()); } + getValidatorsStats(): Promise { + return Promise.resolve({ + stats: {}, + lastProcessedSlot: 20n, + initialSlot: 1n, + slotWindow: 10, + } satisfies ValidatorsStats); + } simulatePublicCalls(tx: Tx, _enforceFeePayment = false): Promise { expect(tx).toBeInstanceOf(Tx); return Promise.resolve(PublicSimulationOutput.random()); diff --git a/yarn-project/stdlib/src/interfaces/aztec-node.ts b/yarn-project/stdlib/src/interfaces/aztec-node.ts index 1daaaf2cf339..189a82c17f8f 100644 --- a/yarn-project/stdlib/src/interfaces/aztec-node.ts +++ b/yarn-project/stdlib/src/interfaces/aztec-node.ts @@ -46,6 +46,8 @@ import { TxValidationResultSchema, } from '../tx/index.js'; import { TxEffect } from '../tx/tx_effect.js'; +import { ValidatorsStatsSchema } from '../validators/schemas.js'; +import type { ValidatorsStats } from '../validators/types.js'; import { type ComponentsVersions, getVersioningResponseHandler } from '../versioning/index.js'; import { type GetContractClassLogsResponse, @@ -379,6 +381,9 @@ export interface AztecNode */ getBlockHeader(blockNumber?: L2BlockNumber): Promise; + /** Returns stats for validators if enabled. */ + getValidatorsStats(): Promise; + /** * Simulates the public part of a transaction with the current state. * This currently just checks that the transaction execution succeeds. @@ -525,6 +530,8 @@ export const AztecNodeApiSchema: ApiSchemaFor = { getBlockHeader: z.function().args(optional(L2BlockNumberSchema)).returns(BlockHeader.schema.optional()), + getValidatorsStats: z.function().returns(ValidatorsStatsSchema), + simulatePublicCalls: z.function().args(Tx.schema, optional(z.boolean())).returns(PublicSimulationOutput.schema), isValidTx: z diff --git a/yarn-project/stdlib/src/interfaces/p2p.test.ts b/yarn-project/stdlib/src/interfaces/p2p.test.ts index 46d908395af9..86b8232290d2 100644 --- a/yarn-project/stdlib/src/interfaces/p2p.test.ts +++ b/yarn-project/stdlib/src/interfaces/p2p.test.ts @@ -50,6 +50,11 @@ describe('P2PApiSchema', () => { const peers = await context.client.getPeers(true); expect(peers).toEqual(peers); }); + + it('addAttestation', async () => { + const attestation = BlockAttestation.empty(); + await context.client.addAttestation(attestation); + }); }); const peers: PeerInfo[] = [ @@ -77,4 +82,9 @@ class MockP2P implements P2PApi { expect(includePending === undefined || includePending === true).toBeTruthy(); return Promise.resolve(peers); } + + addAttestation(attestation: BlockAttestation): Promise { + expect(attestation).toBeInstanceOf(BlockAttestation); + return Promise.resolve(); + } } diff --git a/yarn-project/stdlib/src/interfaces/p2p.ts b/yarn-project/stdlib/src/interfaces/p2p.ts index e04b2765b9d8..d26ca5730f12 100644 --- a/yarn-project/stdlib/src/interfaces/p2p.ts +++ b/yarn-project/stdlib/src/interfaces/p2p.ts @@ -50,6 +50,9 @@ export interface P2PClient extends P2PApiWithoutAttestations { * @returns BlockAttestations */ getAttestationsForSlot(slot: bigint, proposalId?: string): Promise; + + /** Manually adds an attestation to the p2p client attestation pool. */ + addAttestation(attestation: BlockAttestation): Promise; } export type P2PApi = T extends P2PClientType.Full @@ -64,4 +67,5 @@ export const P2PApiSchema: ApiSchemaFor = { getPendingTxs: z.function().returns(z.array(Tx.schema)), getEncodedEnr: z.function().returns(z.string().optional()), getPeers: z.function().args(optional(z.boolean())).returns(z.array(PeerInfoSchema)), + addAttestation: z.function().args(BlockAttestation.schema).returns(z.void()), }; diff --git a/yarn-project/stdlib/src/validators/index.ts b/yarn-project/stdlib/src/validators/index.ts new file mode 100644 index 000000000000..9ce5fa3ed30a --- /dev/null +++ b/yarn-project/stdlib/src/validators/index.ts @@ -0,0 +1,3 @@ +export type * from './types.js'; + +export * from './schemas.js'; diff --git a/yarn-project/stdlib/src/validators/schemas.ts b/yarn-project/stdlib/src/validators/schemas.ts new file mode 100644 index 000000000000..885e48c22e13 --- /dev/null +++ b/yarn-project/stdlib/src/validators/schemas.ts @@ -0,0 +1,53 @@ +import { type ZodFor, schemas } from '@aztec/foundation/schemas'; + +import { z } from 'zod'; + +import type { ValidatorStats, ValidatorStatusHistory, ValidatorStatusInSlot, ValidatorsStats } from './types.js'; + +export const ValidatorStatusInSlotSchema = z.enum([ + 'block-mined', + 'block-proposed', + 'block-missed', + 'attestation-sent', + 'attestation-missed', +]) satisfies ZodFor; + +export const ValidatorStatusHistorySchema = z.array( + z.object({ + slot: schemas.BigInt, + status: ValidatorStatusInSlotSchema, + }), +) satisfies ZodFor; + +export const ValidatorStatusHistorySchemaArray = z.array(ValidatorStatusHistorySchema); + +export const ValidatorStatusHistorySchemaMap = z.record(ValidatorStatusHistorySchemaArray); + +const ValidatorTimeStatSchema = z.object({ + timestamp: schemas.BigInt, + slot: schemas.BigInt, + date: z.string(), +}); + +const ValidatorFilteredHistorySchema = z.object({ + currentStreak: schemas.Integer, + rate: z.number(), + count: schemas.Integer, +}); + +export const ValidatorStatsSchema = z.object({ + address: schemas.EthAddress, + lastProposal: ValidatorTimeStatSchema.optional(), + lastAttestation: ValidatorTimeStatSchema.optional(), + totalSlots: schemas.Integer, + missedProposals: ValidatorFilteredHistorySchema, + missedAttestations: ValidatorFilteredHistorySchema, + history: ValidatorStatusHistorySchema, +}) satisfies ZodFor; + +export const ValidatorsStatsSchema = z.object({ + stats: z.record(ValidatorStatsSchema), + lastProcessedSlot: schemas.BigInt.optional(), + initialSlot: schemas.BigInt.optional(), + slotWindow: schemas.Integer, +}) satisfies ZodFor; diff --git a/yarn-project/stdlib/src/validators/types.ts b/yarn-project/stdlib/src/validators/types.ts new file mode 100644 index 000000000000..fb944aa1734f --- /dev/null +++ b/yarn-project/stdlib/src/validators/types.ts @@ -0,0 +1,37 @@ +import type { EthAddress } from '@aztec/foundation/eth-address'; + +export type ValidatorStatusType = 'block' | 'attestation'; + +export type ValidatorStatusInSlot = + | 'block-mined' + | 'block-proposed' + | 'block-missed' + | 'attestation-sent' + | 'attestation-missed'; + +export type ValidatorStatusHistory = { slot: bigint; status: ValidatorStatusInSlot }[]; + +export type ValidatorStats = { + address: EthAddress; + lastProposal?: { timestamp: bigint; slot: bigint; date: string }; + lastAttestation?: { timestamp: bigint; slot: bigint; date: string }; + totalSlots: number; + missedProposals: { + currentStreak: number; + rate: number; + count: number; + }; + missedAttestations: { + currentStreak: number; + rate: number; + count: number; + }; + history: ValidatorStatusHistory; +}; + +export type ValidatorsStats = { + stats: Record; + lastProcessedSlot?: bigint; + initialSlot?: bigint; + slotWindow: number; +}; diff --git a/yarn-project/txe/src/node/txe_node.ts b/yarn-project/txe/src/node/txe_node.ts index 52f5714dfa74..00dbca17b237 100644 --- a/yarn-project/txe/src/node/txe_node.ts +++ b/yarn-project/txe/src/node/txe_node.ts @@ -51,6 +51,7 @@ import { TxReceipt, type TxValidationResult, } from '@aztec/stdlib/tx'; +import type { ValidatorsStats } from '@aztec/stdlib/validators'; import type { NativeWorldStateService } from '@aztec/world-state'; export class TXENode implements AztecNode { @@ -679,4 +680,8 @@ export class TXENode implements AztecNode { getWorldStateSyncStatus(): Promise { throw new Error('TXE Node method getWorldStateSyncStatus not implemented'); } + + getValidatorsStats(): Promise { + throw new Error('TXE Node method getValidatorsStats not implemented'); + } } diff --git a/yarn-project/validator-client/src/validator.test.ts b/yarn-project/validator-client/src/validator.test.ts index aef3bea0dcb4..166fe1cce3f8 100644 --- a/yarn-project/validator-client/src/validator.test.ts +++ b/yarn-project/validator-client/src/validator.test.ts @@ -181,6 +181,7 @@ describe('ValidationService', () => { // Mock the attestations to be returned const expectedAttestations = await Promise.all([ + makeBlockAttestation({ signer, archive, txHashes }), makeBlockAttestation({ signer: attestor1, archive, txHashes }), makeBlockAttestation({ signer: attestor2, archive, txHashes }), ]); diff --git a/yarn-project/validator-client/src/validator.ts b/yarn-project/validator-client/src/validator.ts index 92418a10a869..20c2fd2f42f8 100644 --- a/yarn-project/validator-client/src/validator.ts +++ b/yarn-project/validator-client/src/validator.ts @@ -1,5 +1,6 @@ import type { EpochCache } from '@aztec/epoch-cache'; import { Buffer32 } from '@aztec/foundation/buffer'; +import type { EthAddress } from '@aztec/foundation/eth-address'; import type { Fr } from '@aztec/foundation/fields'; import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; @@ -70,6 +71,8 @@ export class ValidatorClient extends WithTracer implements Validator { // Callback registered to: sequencer.buildBlock private blockBuilder?: BlockBuilderCallback = undefined; + private myAddress: EthAddress; + private lastEpoch: bigint | undefined; private epochCacheUpdateLoop: RunningPromise; private blockProposalValidator: BlockProposalValidator; @@ -91,30 +94,30 @@ export class ValidatorClient extends WithTracer implements Validator { this.blockProposalValidator = new BlockProposalValidator(epochCache); - // Refresh epoch cache every second to trigger commiteeChanged event - this.epochCacheUpdateLoop = new RunningPromise( - () => - this.epochCache - .getCommittee() - .then(() => {}) - .catch(err => log.error('Error updating validator committee', err)), - log, - 1000, - ); - - // Listen to commiteeChanged event to alert operator when their validator has entered the committee - this.epochCache.on('committeeChanged', (newCommittee, epochNumber) => { - const me = this.keyStore.getAddress(); - if (newCommittee.some(addr => addr.equals(me))) { - this.log.info(`Validator ${me.toString()} is on the validator committee for epoch ${epochNumber}`); - } else { - this.log.verbose(`Validator ${me.toString()} not on the validator committee for epoch ${epochNumber}`); - } - }); + // Refresh epoch cache every second to trigger alert if participation in commitee changes + this.myAddress = this.keyStore.getAddress(); + this.epochCacheUpdateLoop = new RunningPromise(this.handleEpochCommiteeUpdate.bind(this), log, 1000); this.log.verbose(`Initialized validator with address ${this.keyStore.getAddress().toString()}`); } + private async handleEpochCommiteeUpdate() { + try { + const { committee, epoch } = await this.epochCache.getCommittee('now'); + if (epoch !== this.lastEpoch) { + const me = this.myAddress; + if (committee.some(addr => addr.equals(me))) { + this.log.info(`Validator ${me.toString()} is on the validator committee for epoch ${epoch}`); + } else { + this.log.verbose(`Validator ${me.toString()} not on the validator committee for epoch ${epoch}`); + } + this.lastEpoch = epoch; + } + } catch (err) { + this.log.error(`Error updating epoch committee`, err); + } + } + static new( config: ValidatorClientConfig, epochCache: EpochCache, @@ -222,7 +225,7 @@ export class ValidatorClient extends WithTracer implements Validator { this.log.info(`Attesting to proposal for slot ${slotNumber}`, proposalInfo); // If the above function does not throw an error, then we can attest to the proposal - return this.validationService.attestToProposal(proposal); + return this.doAttestToProposal(proposal); } /** @@ -327,15 +330,16 @@ export class ValidatorClient extends WithTracer implements Validator { } const proposalId = proposal.archive.toString(); - const myAttestation = await this.validationService.attestToProposal(proposal); + await this.doAttestToProposal(proposal); + const me = this.keyStore.getAddress(); let attestations: BlockAttestation[] = []; while (true) { - const collectedAttestations = [myAttestation, ...(await this.p2pClient.getAttestationsForSlot(slot, proposalId))]; + const collectedAttestations = await this.p2pClient.getAttestationsForSlot(slot, proposalId); const oldSenders = await Promise.all(attestations.map(attestation => attestation.getSender())); for (const collected of collectedAttestations) { const collectedSender = await collected.getSender(); - if (!oldSenders.some(sender => sender.equals(collectedSender))) { + if (!collectedSender.equals(me) && !oldSenders.some(sender => sender.equals(collectedSender))) { this.log.debug(`Received attestation for slot ${slot} from ${collectedSender.toString()}`); } } @@ -355,6 +359,12 @@ export class ValidatorClient extends WithTracer implements Validator { await sleep(this.config.attestationPollingIntervalMs); } } + + private async doAttestToProposal(proposal: BlockProposal): Promise { + const attestation = await this.validationService.attestToProposal(proposal); + await this.p2pClient.addAttestation(attestation); + return attestation; + } } function validatePrivateKey(privateKey: string): Buffer32 {