diff --git a/packages/beacon-node/src/api/impl/beacon/pool/index.ts b/packages/beacon-node/src/api/impl/beacon/pool/index.ts index 20e8cf56543a..f0f37a874270 100644 --- a/packages/beacon-node/src/api/impl/beacon/pool/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/pool/index.ts @@ -104,11 +104,20 @@ export function getBeaconPoolApi({ try { await validateBlsToExecutionChange(chain, blsToExecutionChange); chain.opPool.insertBlsToExecutionChange(blsToExecutionChange); - await network.gossip.publishBlsToExecutionChange(blsToExecutionChange); + if ( + chain.clock.currentEpoch >= chain.config.CAPELLA_FORK_EPOCH && + // TODO: Remove below condition + // Only used for testing in devnet-3 of withdrawals + network.isSubscribedToGossipCoreTopics() + ) { + await network.gossip.publishBlsToExecutionChange(blsToExecutionChange); + } else { + await chain.cacheBlsToExecutionChanges(blsToExecutionChange); + } } catch (e) { errors.push(e as Error); logger.error( - `Error on submitPoolSyncCommitteeSignatures [${i}]`, + `Error on submitPoolBlsToExecutionChange [${i}]`, {validatorIndex: blsToExecutionChange.message.validatorIndex}, e as Error ); diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index ab86b36b71c7..d10513d4abcd 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -12,7 +12,19 @@ import { PubkeyIndexMap, } from "@lodestar/state-transition"; import {IBeaconConfig} from "@lodestar/config"; -import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, eip4844, Wei} from "@lodestar/types"; +import { + allForks, + UintNum64, + Root, + phase0, + Slot, + RootHex, + Epoch, + ValidatorIndex, + eip4844, + Wei, + capella, +} from "@lodestar/types"; import {CheckpointWithHex, ExecutionStatus, IForkChoice, ProtoBlock} from "@lodestar/fork-choice"; import {ProcessShutdownCallback} from "@lodestar/validator"; import {ILogger, pruneSetToMax, toHex} from "@lodestar/utils"; @@ -122,9 +134,9 @@ export class BeaconChain implements IBeaconChain { /** Map keyed by executionPayload.blockHash of the block for those blobs */ readonly producedBlobsSidecarCache = new Map(); readonly opts: IChainOptions; + readonly db: IBeaconDb; protected readonly blockProcessor: BlockProcessor; - protected readonly db: IBeaconDb; private readonly archiver: Archiver; private abortController = new AbortController(); private successfulExchangeTransition = false; @@ -779,4 +791,9 @@ export class BeaconChain implements IBeaconChain { } } } + + /** Must be validated beforehand */ + async cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise { + return this.db.blsToExecutionChangeCache.add(blsToExecutionChange); + } } diff --git a/packages/beacon-node/src/chain/interface.ts b/packages/beacon-node/src/chain/interface.ts index 8242bf677ee3..1db4a6b04c20 100644 --- a/packages/beacon-node/src/chain/interface.ts +++ b/packages/beacon-node/src/chain/interface.ts @@ -1,4 +1,16 @@ -import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, eip4844, Wei} from "@lodestar/types"; +import { + allForks, + UintNum64, + Root, + phase0, + Slot, + RootHex, + Epoch, + ValidatorIndex, + eip4844, + Wei, + capella, +} from "@lodestar/types"; import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {IBeaconConfig} from "@lodestar/config"; import {CompositeTypeAny, TreeView, Type} from "@chainsafe/ssz"; @@ -8,6 +20,7 @@ import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice"; import {IEth1ForBlockProduction} from "../eth1/index.js"; import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js"; import {IMetrics} from "../metrics/metrics.js"; +import {IBeaconDb} from "../db/index.js"; import {IBeaconClock} from "./clock/interface.js"; import {ChainEventEmitter} from "./emitter.js"; import {IStateRegenerator} from "./regen/index.js"; @@ -90,6 +103,7 @@ export interface IBeaconChain { readonly checkpointBalancesCache: CheckpointBalancesCache; readonly producedBlobsSidecarCache: Map; readonly opts: IChainOptions; + readonly db: IBeaconDb; /** Stop beacon chain processing */ close(): Promise; @@ -133,6 +147,7 @@ export interface IBeaconChain { /** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */ persistInvalidSszView(view: TreeView, suffix?: string): void; updateBuilderStatus(clockSlot: Slot): void; + cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise; } export type SSZObjectType = diff --git a/packages/beacon-node/src/chain/opPools/opPool.ts b/packages/beacon-node/src/chain/opPools/opPool.ts index 565243c9b600..9120a49a28dc 100644 --- a/packages/beacon-node/src/chain/opPools/opPool.ts +++ b/packages/beacon-node/src/chain/opPools/opPool.ts @@ -1,6 +1,5 @@ import { CachedBeaconStateAllForks, - CachedBeaconStateCapella, computeEpochAtSlot, computeStartSlotAtEpoch, getAttesterSlashableIndices, @@ -69,7 +68,7 @@ export class OpPool { this.insertVoluntaryExit(voluntaryExit); } for (const item of blsToExecutionChanges) { - this.blsToExecutionChanges.set(item.message.validatorIndex, item); + this.insertBlsToExecutionChange(item); } } @@ -233,7 +232,7 @@ export class OpPool { const blsToExecutionChanges: capella.SignedBLSToExecutionChange[] = []; for (const blsToExecutionChange of this.blsToExecutionChanges.values()) { - if (isValidBlsToExecutionChangeForBlockInclusion(state as CachedBeaconStateCapella, blsToExecutionChange)) { + if (isValidBlsToExecutionChangeForBlockInclusion(state, blsToExecutionChange)) { blsToExecutionChanges.push(blsToExecutionChange); if (blsToExecutionChanges.length >= MAX_BLS_TO_EXECUTION_CHANGES) { break; diff --git a/packages/beacon-node/src/chain/opPools/utils.ts b/packages/beacon-node/src/chain/opPools/utils.ts index 103f7c8b579f..906f51593bde 100644 --- a/packages/beacon-node/src/chain/opPools/utils.ts +++ b/packages/beacon-node/src/chain/opPools/utils.ts @@ -1,7 +1,7 @@ import bls from "@chainsafe/bls"; import {CoordType, Signature} from "@chainsafe/bls/types"; import {BLS_WITHDRAWAL_PREFIX} from "@lodestar/params"; -import {CachedBeaconStateCapella} from "@lodestar/state-transition"; +import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {Slot, capella} from "@lodestar/types"; /** @@ -38,7 +38,7 @@ export function signatureFromBytesNoCheck(signature: Uint8Array): Signature { * can become invalid for certain forks. */ export function isValidBlsToExecutionChangeForBlockInclusion( - state: CachedBeaconStateCapella, + state: CachedBeaconStateAllForks, signedBLSToExecutionChange: capella.SignedBLSToExecutionChange ): boolean { // For each condition from https://github.com/ethereum/consensus-specs/blob/dev/specs/capella/beacon-chain.md#new-process_bls_to_execution_change diff --git a/packages/beacon-node/src/db/beacon.ts b/packages/beacon-node/src/db/beacon.ts index 39419bfbd017..0fa328d84012 100644 --- a/packages/beacon-node/src/db/beacon.ts +++ b/packages/beacon-node/src/db/beacon.ts @@ -18,6 +18,7 @@ import { BlobsSidecarRepository, BlobsSidecarArchiveRepository, BLSToExecutionChangeRepository, + BLSToExecutionChangeCacheRepository, } from "./repositories/index.js"; import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single/index.js"; @@ -33,6 +34,7 @@ export class BeaconDb extends DatabaseService implements IBeaconDb { attesterSlashing: AttesterSlashingRepository; depositEvent: DepositEventRepository; blsToExecutionChange: BLSToExecutionChangeRepository; + blsToExecutionChangeCache: BLSToExecutionChangeCacheRepository; depositDataRoot: DepositDataRootRepository; eth1Data: Eth1DataRepository; @@ -58,6 +60,7 @@ export class BeaconDb extends DatabaseService implements IBeaconDb { this.stateArchive = new StateArchiveRepository(this.config, this.db); this.voluntaryExit = new VoluntaryExitRepository(this.config, this.db); this.blsToExecutionChange = new BLSToExecutionChangeRepository(this.config, this.db); + this.blsToExecutionChangeCache = new BLSToExecutionChangeCacheRepository(this.config, this.db); this.proposerSlashing = new ProposerSlashingRepository(this.config, this.db); this.attesterSlashing = new AttesterSlashingRepository(this.config, this.db); this.depositEvent = new DepositEventRepository(this.config, this.db); diff --git a/packages/beacon-node/src/db/interface.ts b/packages/beacon-node/src/db/interface.ts index 4cba2364d9e4..0d29dae2a167 100644 --- a/packages/beacon-node/src/db/interface.ts +++ b/packages/beacon-node/src/db/interface.ts @@ -17,6 +17,7 @@ import { BlobsSidecarRepository, BlobsSidecarArchiveRepository, BLSToExecutionChangeRepository, + BLSToExecutionChangeCacheRepository, } from "./repositories/index.js"; import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single/index.js"; @@ -43,6 +44,7 @@ export interface IBeaconDb { attesterSlashing: AttesterSlashingRepository; depositEvent: DepositEventRepository; blsToExecutionChange: BLSToExecutionChangeRepository; + blsToExecutionChangeCache: BLSToExecutionChangeCacheRepository; // eth1 processing preGenesisState: PreGenesisState; diff --git a/packages/beacon-node/src/db/repositories/blsToExecutionChange.ts b/packages/beacon-node/src/db/repositories/blsToExecutionChange.ts index 3f41dfa9fc1f..ce53150fa00a 100644 --- a/packages/beacon-node/src/db/repositories/blsToExecutionChange.ts +++ b/packages/beacon-node/src/db/repositories/blsToExecutionChange.ts @@ -11,3 +11,16 @@ export class BLSToExecutionChangeRepository extends Repository { + constructor(config: IChainForkConfig, db: Db) { + super(config, db, Bucket.capella_blsToExecutionChangeCache, ssz.capella.SignedBLSToExecutionChange); + } + + getId(value: capella.SignedBLSToExecutionChange): ValidatorIndex { + return value.message.validatorIndex; + } +} diff --git a/packages/beacon-node/src/db/repositories/index.ts b/packages/beacon-node/src/db/repositories/index.ts index 3de56a845c3e..801b50226500 100644 --- a/packages/beacon-node/src/db/repositories/index.ts +++ b/packages/beacon-node/src/db/repositories/index.ts @@ -17,4 +17,4 @@ export {CheckpointHeaderRepository} from "./lightclientCheckpointHeader.js"; export {SyncCommitteeRepository} from "./lightclientSyncCommittee.js"; export {SyncCommitteeWitnessRepository} from "./lightclientSyncCommitteeWitness.js"; export {BackfilledRanges} from "./backfilledRanges.js"; -export {BLSToExecutionChangeRepository} from "./blsToExecutionChange.js"; +export {BLSToExecutionChangeRepository, BLSToExecutionChangeCacheRepository} from "./blsToExecutionChange.js"; diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index 728564ca2eb2..48ce1c1db322 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -11,6 +11,7 @@ import {routes} from "@lodestar/api"; import {IMetrics} from "../metrics/index.js"; import {ChainEvent, IBeaconChain, IBeaconClock} from "../chain/index.js"; import {BlockInput, BlockInputType, getBlockInput} from "../chain/blocks/types.js"; +import {isValidBlsToExecutionChangeForBlockInclusion} from "../chain/opPools/utils.js"; import {INetworkOptions} from "./options.js"; import {INetwork, Libp2p} from "./interface.js"; import {ReqRespBeaconNode, ReqRespHandlers, doBeaconBlocksMaybeBlobsByRange} from "./reqresp/index.js"; @@ -32,6 +33,9 @@ import {PeersData} from "./peers/peersData.js"; import {getConnectionsMap, isPublishToZeroPeersError} from "./util.js"; import {Discv5Worker} from "./discv5/index.js"; +// How many changes to batch cleanup +const CACHED_BLS_BATCH_CLEANUP_LIMIT = 10; + interface INetworkModules { config: IBeaconConfig; libp2p: Libp2p; @@ -63,6 +67,7 @@ export class Network implements INetwork { private readonly signal: AbortSignal; private subscribedForks = new Set(); + private cachedBlsChangesPromise: Promise | null = null; constructor(private readonly opts: INetworkOptions, modules: INetworkModules) { const {config, libp2p, logger, metrics, chain, reqRespHandlers, gossipHandlers, signal} = modules; @@ -411,6 +416,17 @@ export class Network implements INetwork { } } } + + // If we are subscribed and post capella fork epoch, try gossiping the cached bls changes + if ( + this.isSubscribedToGossipCoreTopics() && + epoch >= this.config.CAPELLA_FORK_EPOCH && + !this.cachedBlsChangesPromise + ) { + this.cachedBlsChangesPromise = this.gossipCachedBlsChanges().then(() => { + this.cachedBlsChangesPromise = null; + }); + } } catch (e) { this.logger.error("Error on BeaconGossipHandler.onEpoch", {epoch}, e as Error); } @@ -482,6 +498,48 @@ export class Network implements INetwork { return topics; } + private async gossipCachedBlsChanges(): Promise { + let gossipedKeys: number[] = []; + let includedKeys: number[] = []; + let totalProcessed = 0; + this.logger.info("Re-gossiping the cached bls changes"); + + try { + const headState = this.chain.getHeadState(); + for await (const value of this.chain.db.blsToExecutionChangeCache.valuesStream()) { + if (isValidBlsToExecutionChangeForBlockInclusion(headState, value)) { + await this.gossip.publishBlsToExecutionChange(value); + gossipedKeys.push(value.message.validatorIndex); + } else { + // No need to gossip if its already been in the headState + // TODO: Should use final state? + includedKeys.push(value.message.validatorIndex); + } + totalProcessed += 1; + + // Cleanup in small batches + if (totalProcessed % CACHED_BLS_BATCH_CLEANUP_LIMIT === 0) { + await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys); + await this.chain.db.blsToExecutionChangeCache.batchDelete(includedKeys); + includedKeys = []; + gossipedKeys = []; + } + } + } catch (e) { + this.logger.error("Failed to gossip all cached bls changes", {totalProcessed}, e as Error); + } finally { + this.logger.info("Gossiped cached blsChanges", { + gossipedIndexes: `${gossipedKeys}`, + alreadyIncludedIndexes: `${includedKeys}`, + totalProcessed, + }); + // Cleanup whatever was in the last batch + await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys); + await this.chain.db.blsToExecutionChangeCache.batchDelete(includedKeys); + } + this.logger.info("Processed cached blsChanges", {totalProcessed}); + } + private onLightClientFinalityUpdate = async (finalityUpdate: altair.LightClientFinalityUpdate): Promise => { if (this.hasAttachedSyncCommitteeMember()) { try { diff --git a/packages/beacon-node/test/utils/mocks/chain/chain.ts b/packages/beacon-node/test/utils/mocks/chain/chain.ts index fb7843195dcb..ea82546fc513 100644 --- a/packages/beacon-node/test/utils/mocks/chain/chain.ts +++ b/packages/beacon-node/test/utils/mocks/chain/chain.ts @@ -1,7 +1,20 @@ import sinon from "sinon"; import {CompositeTypeAny, toHexString, TreeView} from "@chainsafe/ssz"; -import {phase0, allForks, UintNum64, Root, Slot, ssz, Uint16, UintBn64, RootHex, eip4844, Wei} from "@lodestar/types"; +import { + phase0, + allForks, + UintNum64, + Root, + Slot, + ssz, + Uint16, + UintBn64, + RootHex, + eip4844, + Wei, + capella, +} from "@lodestar/types"; import {IBeaconConfig} from "@lodestar/config"; import {BeaconStateAllForks, CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {CheckpointWithHex, IForkChoice, ProtoBlock, ExecutionStatus, AncestorStatus} from "@lodestar/fork-choice"; @@ -42,6 +55,7 @@ import {CheckpointBalancesCache} from "../../../../src/chain/balancesCache.js"; import {IChainOptions} from "../../../../src/chain/options.js"; import {BlockAttributes} from "../../../../src/chain/produceBlock/produceBlockBody.js"; import {ReqRespBlockResponse} from "../../../../src/network/index.js"; +import {IBeaconDb} from "../../../../src/db/index.js"; /* eslint-disable @typescript-eslint/no-empty-function */ @@ -67,6 +81,7 @@ export class MockBeaconChain implements IBeaconChain { safeSlotsToImportOptimistically: 0, suggestedFeeRecipient: "0x0000000000000000000000000000000000000000", }; + readonly db: IBeaconDb; readonly anchorStateLatestBlockSlot: Slot; readonly bls: IBlsVerifier; @@ -128,13 +143,13 @@ export class MockBeaconChain implements IBeaconChain { this.forkChoice = mockForkChoice(); this.stateCache = new StateContextCache({}); this.checkpointStateCache = new CheckpointStateCache({}); - const db = new StubbedBeaconDb(); + this.db = new StubbedBeaconDb(); this.regen = new StateRegenerator({ config: this.config, forkChoice: this.forkChoice, stateCache: this.stateCache, checkpointStateCache: this.checkpointStateCache, - db, + db: this.db, metrics: null, emitter: this.emitter, }); @@ -142,7 +157,7 @@ export class MockBeaconChain implements IBeaconChain { {}, { config: this.config, - db: db, + db: this.db, metrics: null, emitter: this.emitter, logger: this.logger, @@ -224,6 +239,10 @@ export class MockBeaconChain implements IBeaconChain { async updateBeaconProposerData(): Promise {} updateBuilderStatus(): void {} + + async cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise { + return this.db.blsToExecutionChangeCache.add(blsToExecutionChange); + } } const root = ssz.Root.defaultValue() as Uint8Array; diff --git a/packages/beacon-node/test/utils/mocks/db.ts b/packages/beacon-node/test/utils/mocks/db.ts index 70e60e2234c2..20e9ca0f2c84 100644 --- a/packages/beacon-node/test/utils/mocks/db.ts +++ b/packages/beacon-node/test/utils/mocks/db.ts @@ -17,6 +17,7 @@ import { BlobsSidecarRepository, BlobsSidecarArchiveRepository, BLSToExecutionChangeRepository, + BLSToExecutionChangeCacheRepository, } from "../../../src/db/repositories/index.js"; import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "../../../src/db/single/index.js"; import {createStubInstance} from "../types.js"; @@ -45,6 +46,7 @@ export function getStubbedBeaconDb(): IBeaconDb { attesterSlashing: createStubInstance(AttesterSlashingRepository), depositEvent: createStubInstance(DepositEventRepository), blsToExecutionChange: createStubInstance(BLSToExecutionChangeRepository), + blsToExecutionChangeCache: createStubInstance(BLSToExecutionChangeCacheRepository), // eth1 processing preGenesisState: createStubInstance(PreGenesisState), diff --git a/packages/beacon-node/test/utils/stub/beaconDb.ts b/packages/beacon-node/test/utils/stub/beaconDb.ts index 3e775030f4d4..6004885759a7 100644 --- a/packages/beacon-node/test/utils/stub/beaconDb.ts +++ b/packages/beacon-node/test/utils/stub/beaconDb.ts @@ -14,6 +14,7 @@ import { StateArchiveRepository, VoluntaryExitRepository, BLSToExecutionChangeRepository, + BLSToExecutionChangeCacheRepository, BlobsSidecarRepository, BlobsSidecarArchiveRepository, } from "../../../src/db/repositories/index.js"; @@ -32,6 +33,8 @@ export class StubbedBeaconDb extends BeaconDb { voluntaryExit: SinonStubbedInstance & VoluntaryExitRepository; blsToExecutionChange: SinonStubbedInstance & BLSToExecutionChangeRepository; + blsToExecutionChangeCache: SinonStubbedInstance & + BLSToExecutionChangeCacheRepository; proposerSlashing: SinonStubbedInstance & ProposerSlashingRepository; attesterSlashing: SinonStubbedInstance & AttesterSlashingRepository; depositEvent: SinonStubbedInstance & DepositEventRepository; @@ -49,6 +52,7 @@ export class StubbedBeaconDb extends BeaconDb { this.voluntaryExit = createStubInstance(VoluntaryExitRepository); this.blsToExecutionChange = createStubInstance(BLSToExecutionChangeRepository); + this.blsToExecutionChangeCache = createStubInstance(BLSToExecutionChangeCacheRepository); this.proposerSlashing = createStubInstance(ProposerSlashingRepository); this.attesterSlashing = createStubInstance(AttesterSlashingRepository); this.depositEvent = createStubInstance(DepositEventRepository); diff --git a/packages/db/src/schema.ts b/packages/db/src/schema.ts index 72ba515bbab6..d66a0b70bcbd 100644 --- a/packages/db/src/schema.ts +++ b/packages/db/src/schema.ts @@ -33,6 +33,7 @@ export enum Bucket { phase0_proposerSlashing = 14, // ValidatorIndex -> ProposerSlashing phase0_attesterSlashing = 15, // Root -> AttesterSlashing capella_blsToExecutionChange = 16, // ValidatorIndex -> SignedBLSToExecutionChange + capella_blsToExecutionChangeCache = 17, // ValidatorIndex -> SignedBLSToExecutionChange // validator // validator = 16, // DEPRECATED on v0.11.0 // lastProposedBlock = 17, // DEPRECATED on v0.11.0