Skip to content
13 changes: 11 additions & 2 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,20 @@ export function getBeaconPoolApi({
try {
await validateBlsToExecutionChange(chain, blsToExecutionChange);
chain.opPool.insertBlsToExecutionChange(blsToExecutionChange);
await network.gossip.publishBlsToExecutionChange(blsToExecutionChange);
if (
chain.clock.currentEpoch >= chain.config.CAPELLA_FORK_EPOCH &&
// TODO: Remove below condition
// Only used for testing in devnet-3 of withdrawals
network.isSubscribedToGossipCoreTopics()
) {
await network.gossip.publishBlsToExecutionChange(blsToExecutionChange);
} else {
await chain.cacheBlsToExecutionChanges(blsToExecutionChange);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it necessary to explicitly call this? chain.opPool.insertBlsToExecutionChange should eventually write to DB on shutdown

}
} catch (e) {
errors.push(e as Error);
logger.error(
`Error on submitPoolSyncCommitteeSignatures [${i}]`,
`Error on submitPoolBlsToExecutionChange [${i}]`,
{validatorIndex: blsToExecutionChange.message.validatorIndex},
e as Error
);
Expand Down
21 changes: 19 additions & 2 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,19 @@ import {
PubkeyIndexMap,
} from "@lodestar/state-transition";
import {IBeaconConfig} from "@lodestar/config";
import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, eip4844, Wei} from "@lodestar/types";
import {
allForks,
UintNum64,
Root,
phase0,
Slot,
RootHex,
Epoch,
ValidatorIndex,
eip4844,
Wei,
capella,
} from "@lodestar/types";
import {CheckpointWithHex, ExecutionStatus, IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {ProcessShutdownCallback} from "@lodestar/validator";
import {ILogger, pruneSetToMax, toHex} from "@lodestar/utils";
Expand Down Expand Up @@ -122,9 +134,9 @@ export class BeaconChain implements IBeaconChain {
/** Map keyed by executionPayload.blockHash of the block for those blobs */
readonly producedBlobsSidecarCache = new Map<RootHex, eip4844.BlobsSidecar>();
readonly opts: IChainOptions;
readonly db: IBeaconDb;

protected readonly blockProcessor: BlockProcessor;
protected readonly db: IBeaconDb;
private readonly archiver: Archiver;
private abortController = new AbortController();
private successfulExchangeTransition = false;
Expand Down Expand Up @@ -779,4 +791,9 @@ export class BeaconChain implements IBeaconChain {
}
}
}

/** Must be validated beforehand */
async cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<void> {
return this.db.blsToExecutionChangeCache.add(blsToExecutionChange);
}
}
17 changes: 16 additions & 1 deletion packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, eip4844, Wei} from "@lodestar/types";
import {
allForks,
UintNum64,
Root,
phase0,
Slot,
RootHex,
Epoch,
ValidatorIndex,
eip4844,
Wei,
capella,
} from "@lodestar/types";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {IBeaconConfig} from "@lodestar/config";
import {CompositeTypeAny, TreeView, Type} from "@chainsafe/ssz";
Expand All @@ -8,6 +20,7 @@ import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {IEth1ForBlockProduction} from "../eth1/index.js";
import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {IMetrics} from "../metrics/metrics.js";
import {IBeaconDb} from "../db/index.js";
import {IBeaconClock} from "./clock/interface.js";
import {ChainEventEmitter} from "./emitter.js";
import {IStateRegenerator} from "./regen/index.js";
Expand Down Expand Up @@ -90,6 +103,7 @@ export interface IBeaconChain {
readonly checkpointBalancesCache: CheckpointBalancesCache;
readonly producedBlobsSidecarCache: Map<RootHex, eip4844.BlobsSidecar>;
readonly opts: IChainOptions;
readonly db: IBeaconDb;

/** Stop beacon chain processing */
close(): Promise<void>;
Expand Down Expand Up @@ -133,6 +147,7 @@ export interface IBeaconChain {
/** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */
persistInvalidSszView(view: TreeView<CompositeTypeAny>, suffix?: string): void;
updateBuilderStatus(clockSlot: Slot): void;
cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<void>;
}

export type SSZObjectType =
Expand Down
5 changes: 2 additions & 3 deletions packages/beacon-node/src/chain/opPools/opPool.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
CachedBeaconStateAllForks,
CachedBeaconStateCapella,
computeEpochAtSlot,
computeStartSlotAtEpoch,
getAttesterSlashableIndices,
Expand Down Expand Up @@ -69,7 +68,7 @@ export class OpPool {
this.insertVoluntaryExit(voluntaryExit);
}
for (const item of blsToExecutionChanges) {
this.blsToExecutionChanges.set(item.message.validatorIndex, item);
this.insertBlsToExecutionChange(item);
}
}

Expand Down Expand Up @@ -233,7 +232,7 @@ export class OpPool {

const blsToExecutionChanges: capella.SignedBLSToExecutionChange[] = [];
for (const blsToExecutionChange of this.blsToExecutionChanges.values()) {
if (isValidBlsToExecutionChangeForBlockInclusion(state as CachedBeaconStateCapella, blsToExecutionChange)) {
if (isValidBlsToExecutionChangeForBlockInclusion(state, blsToExecutionChange)) {
blsToExecutionChanges.push(blsToExecutionChange);
if (blsToExecutionChanges.length >= MAX_BLS_TO_EXECUTION_CHANGES) {
break;
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/opPools/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import bls from "@chainsafe/bls";
import {CoordType, Signature} from "@chainsafe/bls/types";
import {BLS_WITHDRAWAL_PREFIX} from "@lodestar/params";
import {CachedBeaconStateCapella} from "@lodestar/state-transition";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Slot, capella} from "@lodestar/types";

/**
Expand Down Expand Up @@ -38,7 +38,7 @@ export function signatureFromBytesNoCheck(signature: Uint8Array): Signature {
* can become invalid for certain forks.
*/
export function isValidBlsToExecutionChangeForBlockInclusion(
state: CachedBeaconStateCapella,
state: CachedBeaconStateAllForks,
signedBLSToExecutionChange: capella.SignedBLSToExecutionChange
): boolean {
// For each condition from https://github.com/ethereum/consensus-specs/blob/dev/specs/capella/beacon-chain.md#new-process_bls_to_execution_change
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/src/db/beacon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
BlobsSidecarRepository,
BlobsSidecarArchiveRepository,
BLSToExecutionChangeRepository,
BLSToExecutionChangeCacheRepository,
} from "./repositories/index.js";
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single/index.js";

Expand All @@ -33,6 +34,7 @@ export class BeaconDb extends DatabaseService implements IBeaconDb {
attesterSlashing: AttesterSlashingRepository;
depositEvent: DepositEventRepository;
blsToExecutionChange: BLSToExecutionChangeRepository;
blsToExecutionChangeCache: BLSToExecutionChangeCacheRepository;

depositDataRoot: DepositDataRootRepository;
eth1Data: Eth1DataRepository;
Expand All @@ -58,6 +60,7 @@ export class BeaconDb extends DatabaseService implements IBeaconDb {
this.stateArchive = new StateArchiveRepository(this.config, this.db);
this.voluntaryExit = new VoluntaryExitRepository(this.config, this.db);
this.blsToExecutionChange = new BLSToExecutionChangeRepository(this.config, this.db);
this.blsToExecutionChangeCache = new BLSToExecutionChangeCacheRepository(this.config, this.db);
this.proposerSlashing = new ProposerSlashingRepository(this.config, this.db);
this.attesterSlashing = new AttesterSlashingRepository(this.config, this.db);
this.depositEvent = new DepositEventRepository(this.config, this.db);
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/db/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
BlobsSidecarRepository,
BlobsSidecarArchiveRepository,
BLSToExecutionChangeRepository,
BLSToExecutionChangeCacheRepository,
} from "./repositories/index.js";
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single/index.js";

Expand All @@ -43,6 +44,7 @@ export interface IBeaconDb {
attesterSlashing: AttesterSlashingRepository;
depositEvent: DepositEventRepository;
blsToExecutionChange: BLSToExecutionChangeRepository;
blsToExecutionChangeCache: BLSToExecutionChangeCacheRepository;

// eth1 processing
preGenesisState: PreGenesisState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,16 @@ export class BLSToExecutionChangeRepository extends Repository<ValidatorIndex, c
return value.message.validatorIndex;
}
}

export class BLSToExecutionChangeCacheRepository extends Repository<
ValidatorIndex,
capella.SignedBLSToExecutionChange
> {
constructor(config: IChainForkConfig, db: Db) {
super(config, db, Bucket.capella_blsToExecutionChangeCache, ssz.capella.SignedBLSToExecutionChange);
}

getId(value: capella.SignedBLSToExecutionChange): ValidatorIndex {
return value.message.validatorIndex;
}
}
2 changes: 1 addition & 1 deletion packages/beacon-node/src/db/repositories/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ export {CheckpointHeaderRepository} from "./lightclientCheckpointHeader.js";
export {SyncCommitteeRepository} from "./lightclientSyncCommittee.js";
export {SyncCommitteeWitnessRepository} from "./lightclientSyncCommitteeWitness.js";
export {BackfilledRanges} from "./backfilledRanges.js";
export {BLSToExecutionChangeRepository} from "./blsToExecutionChange.js";
export {BLSToExecutionChangeRepository, BLSToExecutionChangeCacheRepository} from "./blsToExecutionChange.js";
58 changes: 58 additions & 0 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {routes} from "@lodestar/api";
import {IMetrics} from "../metrics/index.js";
import {ChainEvent, IBeaconChain, IBeaconClock} from "../chain/index.js";
import {BlockInput, BlockInputType, getBlockInput} from "../chain/blocks/types.js";
import {isValidBlsToExecutionChangeForBlockInclusion} from "../chain/opPools/utils.js";
import {INetworkOptions} from "./options.js";
import {INetwork, Libp2p} from "./interface.js";
import {ReqRespBeaconNode, ReqRespHandlers, doBeaconBlocksMaybeBlobsByRange} from "./reqresp/index.js";
Expand All @@ -32,6 +33,9 @@ import {PeersData} from "./peers/peersData.js";
import {getConnectionsMap, isPublishToZeroPeersError} from "./util.js";
import {Discv5Worker} from "./discv5/index.js";

// How many changes to batch cleanup
const CACHED_BLS_BATCH_CLEANUP_LIMIT = 10;

interface INetworkModules {
config: IBeaconConfig;
libp2p: Libp2p;
Expand Down Expand Up @@ -63,6 +67,7 @@ export class Network implements INetwork {
private readonly signal: AbortSignal;

private subscribedForks = new Set<ForkName>();
private cachedBlsChangesPromise: Promise<void> | null = null;

constructor(private readonly opts: INetworkOptions, modules: INetworkModules) {
const {config, libp2p, logger, metrics, chain, reqRespHandlers, gossipHandlers, signal} = modules;
Expand Down Expand Up @@ -411,6 +416,17 @@ export class Network implements INetwork {
}
}
}

// If we are subscribed and post capella fork epoch, try gossiping the cached bls changes
if (
this.isSubscribedToGossipCoreTopics() &&
epoch >= this.config.CAPELLA_FORK_EPOCH &&
!this.cachedBlsChangesPromise
) {
this.cachedBlsChangesPromise = this.gossipCachedBlsChanges().then(() => {
this.cachedBlsChangesPromise = null;
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just move this code above under the // On fork transition comment, with some retry in case there are no peers

} catch (e) {
this.logger.error("Error on BeaconGossipHandler.onEpoch", {epoch}, e as Error);
}
Expand Down Expand Up @@ -482,6 +498,48 @@ export class Network implements INetwork {
return topics;
}

private async gossipCachedBlsChanges(): Promise<void> {
let gossipedKeys: number[] = [];
let includedKeys: number[] = [];
let totalProcessed = 0;
this.logger.info("Re-gossiping the cached bls changes");

try {
const headState = this.chain.getHeadState();
for await (const value of this.chain.db.blsToExecutionChangeCache.valuesStream()) {
if (isValidBlsToExecutionChangeForBlockInclusion(headState, value)) {
await this.gossip.publishBlsToExecutionChange(value);
gossipedKeys.push(value.message.validatorIndex);
} else {
// No need to gossip if its already been in the headState
// TODO: Should use final state?
includedKeys.push(value.message.validatorIndex);
}
totalProcessed += 1;

// Cleanup in small batches
if (totalProcessed % CACHED_BLS_BATCH_CLEANUP_LIMIT === 0) {
await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys);
await this.chain.db.blsToExecutionChangeCache.batchDelete(includedKeys);
includedKeys = [];
gossipedKeys = [];
}
}
} catch (e) {
this.logger.error("Failed to gossip all cached bls changes", {totalProcessed}, e as Error);
} finally {
this.logger.info("Gossiped cached blsChanges", {
gossipedIndexes: `${gossipedKeys}`,
alreadyIncludedIndexes: `${includedKeys}`,
totalProcessed,
});
// Cleanup whatever was in the last batch
await this.chain.db.blsToExecutionChangeCache.batchDelete(gossipedKeys);
await this.chain.db.blsToExecutionChangeCache.batchDelete(includedKeys);
}
this.logger.info("Processed cached blsChanges", {totalProcessed});
}

private onLightClientFinalityUpdate = async (finalityUpdate: altair.LightClientFinalityUpdate): Promise<void> => {
if (this.hasAttachedSyncCommitteeMember()) {
try {
Expand Down
27 changes: 23 additions & 4 deletions packages/beacon-node/test/utils/mocks/chain/chain.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
import sinon from "sinon";

import {CompositeTypeAny, toHexString, TreeView} from "@chainsafe/ssz";
import {phase0, allForks, UintNum64, Root, Slot, ssz, Uint16, UintBn64, RootHex, eip4844, Wei} from "@lodestar/types";
import {
phase0,
allForks,
UintNum64,
Root,
Slot,
ssz,
Uint16,
UintBn64,
RootHex,
eip4844,
Wei,
capella,
} from "@lodestar/types";
import {IBeaconConfig} from "@lodestar/config";
import {BeaconStateAllForks, CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {CheckpointWithHex, IForkChoice, ProtoBlock, ExecutionStatus, AncestorStatus} from "@lodestar/fork-choice";
Expand Down Expand Up @@ -42,6 +55,7 @@ import {CheckpointBalancesCache} from "../../../../src/chain/balancesCache.js";
import {IChainOptions} from "../../../../src/chain/options.js";
import {BlockAttributes} from "../../../../src/chain/produceBlock/produceBlockBody.js";
import {ReqRespBlockResponse} from "../../../../src/network/index.js";
import {IBeaconDb} from "../../../../src/db/index.js";

/* eslint-disable @typescript-eslint/no-empty-function */

Expand All @@ -67,6 +81,7 @@ export class MockBeaconChain implements IBeaconChain {
safeSlotsToImportOptimistically: 0,
suggestedFeeRecipient: "0x0000000000000000000000000000000000000000",
};
readonly db: IBeaconDb;
readonly anchorStateLatestBlockSlot: Slot;

readonly bls: IBlsVerifier;
Expand Down Expand Up @@ -128,21 +143,21 @@ export class MockBeaconChain implements IBeaconChain {
this.forkChoice = mockForkChoice();
this.stateCache = new StateContextCache({});
this.checkpointStateCache = new CheckpointStateCache({});
const db = new StubbedBeaconDb();
this.db = new StubbedBeaconDb();
this.regen = new StateRegenerator({
config: this.config,
forkChoice: this.forkChoice,
stateCache: this.stateCache,
checkpointStateCache: this.checkpointStateCache,
db,
db: this.db,
metrics: null,
emitter: this.emitter,
});
this.lightClientServer = new LightClientServer(
{},
{
config: this.config,
db: db,
db: this.db,
metrics: null,
emitter: this.emitter,
logger: this.logger,
Expand Down Expand Up @@ -224,6 +239,10 @@ export class MockBeaconChain implements IBeaconChain {

async updateBeaconProposerData(): Promise<void> {}
updateBuilderStatus(): void {}

async cacheBlsToExecutionChanges(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<void> {
return this.db.blsToExecutionChangeCache.add(blsToExecutionChange);
}
}

const root = ssz.Root.defaultValue() as Uint8Array;
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/test/utils/mocks/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
BlobsSidecarRepository,
BlobsSidecarArchiveRepository,
BLSToExecutionChangeRepository,
BLSToExecutionChangeCacheRepository,
} from "../../../src/db/repositories/index.js";
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "../../../src/db/single/index.js";
import {createStubInstance} from "../types.js";
Expand Down Expand Up @@ -45,6 +46,7 @@ export function getStubbedBeaconDb(): IBeaconDb {
attesterSlashing: createStubInstance(AttesterSlashingRepository),
depositEvent: createStubInstance(DepositEventRepository),
blsToExecutionChange: createStubInstance(BLSToExecutionChangeRepository),
blsToExecutionChangeCache: createStubInstance(BLSToExecutionChangeCacheRepository),

// eth1 processing
preGenesisState: createStubInstance(PreGenesisState),
Expand Down
Loading