Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
f40da81
Add VALIDATOR_CUSTODY_REQUIREMENT and BALANCE_PER_ADDITIONAL_CUSTODY_…
dguenther Mar 25, 2025
308f33f
add missing config fields to getSpecCriticalParams
hughy Mar 27, 2025
51e147c
convert CustodyConfig from type to class
hughy Mar 27, 2025
7062510
Add targetCustodyGroupCount and advertisedCustodyGroupCount
dguenther Mar 27, 2025
04879c0
Merge pull request #2 from dguenther/vc-add-cgcs
hughy Mar 27, 2025
d5b74a1
Fetch sampleSubnets and samplingGroups from CustodyConfig
dguenther Mar 27, 2025
9bc3975
Use CustodyConfig in PeerDiscovery
dguenther Mar 27, 2025
8dacc5e
compute validators custody requirement
hughy Mar 27, 2025
46345da
add CustodyConfig to init for NetworkCore in networkCoreWorker
hughy Mar 27, 2025
7c1ce33
update CustodyConfig on forkChoiceFinalized events
hughy Mar 28, 2025
fe23ffa
Pass sampling groups into network worker as well
dguenther Mar 28, 2025
a6771ab
Update metadata and ENR with advertised group count
dguenther Mar 28, 2025
2cfc051
track local validators
hughy Mar 28, 2025
ed58676
Replace sampledColumns with getSampledColumns
dguenther Mar 31, 2025
d2647ad
Remove custodyGroups from custodyConfig
dguenther Mar 31, 2025
2a018b4
Clean up uses of data column groups
dguenther Mar 31, 2025
bf78076
Fix from code review
dguenther Mar 31, 2025
aa664e3
Fix dataColumn tests
dguenther Mar 31, 2025
332f08f
Fix skipped test
dguenther Mar 31, 2025
1fa193c
Remove unused CustodyConfig from BaseNetworkInit
dguenther Mar 31, 2025
727ea9d
Simplify diff
dguenther Mar 31, 2025
6c60c45
Update comment
dguenther Mar 31, 2025
b6c5963
Fix Network
dguenther Mar 31, 2025
f89b721
Simplify diff
dguenther Mar 31, 2025
0c733b7
Simplify discover
dguenther Mar 31, 2025
d5d9352
Remove exports
dguenther Mar 31, 2025
aa576b4
remove LocalValidatorRegistry
hughy Apr 1, 2025
6e513f7
Add tests for updateCustodyRequirement
dguenther Apr 1, 2025
6c58566
Remove unused getSampledGroups
dguenther Apr 1, 2025
bb64459
reverts changes adjacent to ValidatorMonitor
hughy Apr 3, 2025
5a93092
Revert optional operator
dguenther Apr 4, 2025
736b3a6
Revert peerManager changes to samplingGroups
dguenther Apr 4, 2025
e45f9e6
Cache values instead of using getters
dguenther Apr 4, 2025
43773a9
Use ChainEventEmitter instead of CustodyEvent
dguenther Apr 7, 2025
b07d7e6
Merge remote-tracking branch 'upstream/peerDAS' into peerDAS-validato…
dguenther Apr 7, 2025
be88817
update CustodyConfig separately from computing custody requirement
hughy Apr 7, 2025
516f84a
Use CustodyConfig in NetworkConfig
dguenther Apr 7, 2025
5bd3044
Use updateTargetCustodyGroupCount in NetworkConfig
dguenther Apr 7, 2025
b7d2275
add option to disable validator custody
hughy Apr 7, 2025
25d568b
do not emit custody events if validator custody disabled
hughy Apr 8, 2025
f0343fe
Cleanup code
dguenther Apr 8, 2025
e0426e7
Merge remote-tracking branch 'upstream/peerDAS' into peerDAS-validato…
dguenther Apr 13, 2025
9dae088
Access custodyConfig via getter
dguenther Apr 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,11 @@ export function getValidatorApi(
// If the discovery mechanism isn't disabled, attempt to set up a peer discovery for the
// required subnets.

// Register validators in LocalValidatorRegistry
for (const subscription of subscriptions) {
chain.localValidatorRegistry.registerLocalValidator(subscription.validatorIndex);
}

if (metrics) {
for (const subscription of subscriptions) {
metrics.registerLocalValidator(subscription.validatorIndex);
Expand Down
11 changes: 4 additions & 7 deletions packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ export async function writeBlockInputToDb(this: BeaconChain, blocksInput: BlockI
});
} else {
const {custodyConfig} = this.seenGossipBlockInput;
const {custodyColumnsLen, custodyColumnsIndex, custodyColumns} = custodyConfig;
const {custodyColumnsIndex, custodyColumns} = custodyConfig.getCustodyColumnsWithIndex();
const blobsLen = (block.message as fulu.BeaconBlock).body.blobKzgCommitments.length;
let dataColumnsLen: number;
let dataColumnsIndex: Uint8Array;
if (blobsLen === 0) {
dataColumnsLen = 0;
dataColumnsIndex = new Uint8Array(NUMBER_OF_COLUMNS);
} else {
dataColumnsLen = custodyColumnsLen;
dataColumnsLen = custodyColumns.length;
dataColumnsIndex = custodyColumnsIndex;
}

Expand Down Expand Up @@ -127,11 +127,8 @@ export async function removeEagerlyPersistedBlockInputs(this: BeaconChain, block
blobsToRemove.push({blockRoot, slot, blobSidecars});
} else {
const {custodyConfig} = this.seenGossipBlockInput;
const {
custodyColumnsLen: dataColumnsLen,
custodyColumnsIndex: dataColumnsIndex,
custodyColumns,
} = custodyConfig;
const {custodyColumnsIndex: dataColumnsIndex, custodyColumns} = custodyConfig.getCustodyColumnsWithIndex();
const dataColumnsLen = custodyColumns.length;
const dataColumnSidecars = (blockData as BlockInputDataColumns).dataColumns.filter((dataColumnSidecar) =>
custodyColumns.includes(dataColumnSidecar.index)
);
Expand Down
27 changes: 21 additions & 6 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import {Metrics} from "../metrics/index.js";
import {NodeId} from "../network/subnets/interface.js";
import {BufferPool} from "../util/bufferPool.js";
import {Clock, ClockEvent, IClock} from "../util/clock.js";
import {getCustodyConfig} from "../util/dataColumns.js";
import {CustodyConfig} from "../util/dataColumns.js";
import {ensureDir, writeIfNotExist} from "../util/file.js";
import {isOptimisticBlock} from "../util/forkChoice.js";
import {SerializedCache} from "../util/serializedCache.js";
Expand Down Expand Up @@ -105,6 +105,7 @@ import {FileCPStateDatastore} from "./stateCache/datastore/file.js";
import {FIFOBlockStateCache} from "./stateCache/fifoBlockStateCache.js";
import {InMemoryCheckpointStateCache} from "./stateCache/inMemoryCheckpointsCache.js";
import {PersistentCheckpointStateCache} from "./stateCache/persistentCheckpointsCache.js";
import {LocalValidatorRegistry} from "../node/localValidatorRegistry.js";

/**
* Arbitrary constants, blobs and payloads should be consumed immediately in the same slot
Expand Down Expand Up @@ -143,6 +144,9 @@ export class BeaconChain implements IBeaconChain {
readonly syncContributionAndProofPool = new SyncContributionAndProofPool();
readonly opPool = new OpPool();

// Data column custody config
readonly custodyConfig: CustodyConfig;

// Gossip seen cache
readonly seenAttesters = new SeenAttesters();
readonly seenAggregators = new SeenAggregators();
Expand Down Expand Up @@ -180,6 +184,7 @@ export class BeaconChain implements IBeaconChain {
protected readonly db: IBeaconDb;
private abortController = new AbortController();
private processShutdownCallback: ProcessShutdownCallback;
readonly localValidatorRegistry: LocalValidatorRegistry;

constructor(
opts: IChainOptions,
Expand All @@ -197,6 +202,7 @@ export class BeaconChain implements IBeaconChain {
eth1,
executionEngine,
executionBuilder,
localValidatorRegistry,
}: {
nodeId: NodeId;
config: BeaconConfig;
Expand All @@ -212,6 +218,7 @@ export class BeaconChain implements IBeaconChain {
eth1: IEth1ForBlockProduction;
executionEngine: IExecutionEngine;
executionBuilder?: IExecutionBuilder;
localValidatorRegistry: LocalValidatorRegistry;
}
) {
this.opts = opts;
Expand Down Expand Up @@ -250,11 +257,12 @@ export class BeaconChain implements IBeaconChain {
this.opts?.preaggregateSlotDistance
);

this.custodyConfig = new CustodyConfig(nodeId, config);

this.seenAggregatedAttestations = new SeenAggregatedAttestations(metrics);
this.seenContributionAndProof = new SeenContributionAndProof(metrics);
this.seenAttestationDatas = new SeenAttestationDatas(metrics, this.opts?.attDataCacheSlotDistance);
const custodyConfig = getCustodyConfig(nodeId, config);
this.seenGossipBlockInput = new SeenGossipBlockInput(custodyConfig);
this.seenGossipBlockInput = new SeenGossipBlockInput(this.custodyConfig);

this.beaconProposerCache = new BeaconProposerCache(opts);
this.checkpointBalancesCache = new CheckpointBalancesCache();
Expand Down Expand Up @@ -387,6 +395,8 @@ export class BeaconChain implements IBeaconChain {
clock.addListener(ClockEvent.epoch, this.onClockEpoch.bind(this));
emitter.addListener(ChainEvent.forkChoiceFinalized, this.onForkChoiceFinalized.bind(this));
emitter.addListener(ChainEvent.forkChoiceJustified, this.onForkChoiceJustified.bind(this));

this.localValidatorRegistry = localValidatorRegistry;
}

async init(): Promise<void> {
Expand Down Expand Up @@ -1134,11 +1144,13 @@ export class BeaconChain implements IBeaconChain {
}

const metrics = this.metrics;
if (metrics && (slot + 1) % SLOTS_PER_EPOCH === 0) {
if ((slot + 1) % SLOTS_PER_EPOCH === 0) {
// On the last slot of the epoch
sleep((1000 * this.config.SECONDS_PER_SLOT) / 2)
.then(() => metrics.onceEveryEndOfEpoch(this.getHeadState()))
.catch((e) => {
.then(() => {
metrics?.onceEveryEndOfEpoch(this.getHeadState())
this.localValidatorRegistry.pruneInactiveValidators()
}).catch((e) => {
if (!isErrorAborted(e)) this.logger.error("Error on validator monitor onceEveryEndOfEpoch", {slot}, e);
});
}
Expand Down Expand Up @@ -1188,6 +1200,9 @@ export class BeaconChain implements IBeaconChain {

if (headState) {
this.opPool.pruneAll(headBlock, headState);
// Update custody requirement based on finalized state
const validatorIndices = this.localValidatorRegistry.getLocalValidatorIndices();
this.custodyConfig.updateCustodyRequirement(headState, validatorIndices);
}

if (headState === null) {
Expand Down
6 changes: 6 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ import {SeenAggregatedAttestations} from "./seenCache/seenAggregateAndProof.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {SeenBlockAttesters} from "./seenCache/seenBlockAttesters.js";
import {ShufflingCache} from "./shufflingCache.js";
import {CustodyConfig} from "../util/dataColumns.js";
import {LocalValidatorRegistry} from "../node/localValidatorRegistry.js";

export {BlockType, type AssembledBlockType};
export {type ProposerPreparationData};
Expand Down Expand Up @@ -91,6 +93,7 @@ export interface IBeaconChain {
readonly logger: Logger;
readonly metrics: Metrics | null;
readonly bufferPool: BufferPool | null;
readonly localValidatorRegistry: LocalValidatorRegistry;

/** The initial slot that the chain is started with */
readonly anchorStateLatestBlockSlot: Slot;
Expand All @@ -113,6 +116,9 @@ export interface IBeaconChain {
readonly syncContributionAndProofPool: SyncContributionAndProofPool;
readonly opPool: OpPool;

// Data column custody config
readonly custodyConfig: CustodyConfig;

// Gossip seen cache
readonly seenAttesters: SeenAttesters;
readonly seenAggregators: SeenAggregators;
Expand Down
14 changes: 6 additions & 8 deletions packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,13 @@ export class SeenGossipBlockInput {
};
}

const sampledColumns = this.custodyConfig.getSampledColumns();
const sampledIndexesPresent =
dataColumnsCache.size >= this.custodyConfig.sampledColumns.length &&
this.custodyConfig.sampledColumns.reduce(
(acc, columnIndex) => acc && dataColumnsCache.has(columnIndex),
true
);
dataColumnsCache.size >= sampledColumns.length &&
sampledColumns.reduce((acc, columnIndex) => acc && dataColumnsCache.has(columnIndex), true);

if (sampledIndexesPresent) {
const allDataColumns = getBlockInputDataColumns(dataColumnsCache, this.custodyConfig.sampledColumns);
const allDataColumns = getBlockInputDataColumns(dataColumnsCache, sampledColumns);
metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.GOSSIP});
const {dataColumns} = allDataColumns;
const blockData: BlockInputDataColumns = {
Expand All @@ -274,7 +272,7 @@ export class SeenGossipBlockInput {
blockInputMeta: {
pending: null,
haveColumns: dataColumns.length,
expectedColumns: this.custodyConfig.sampledColumns.length,
expectedColumns: sampledColumns.length,
},
};
}
Expand All @@ -287,7 +285,7 @@ export class SeenGossipBlockInput {
blockInputMeta: {
pending: GossipedInputType.dataColumn,
haveColumns: dataColumnsCache.size,
expectedColumns: this.custodyConfig.sampledColumns.length,
expectedColumns: sampledColumns.length,
},
};
}
Expand Down
8 changes: 8 additions & 0 deletions packages/beacon-node/src/metrics/validatorMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export type ValidatorMonitor = {
): void;
onceEveryEndOfEpoch(state: CachedBeaconStateAllForks): void;
scrapeMetrics(slotClock: Slot): void;
getLocalValidatorIndices(): ValidatorIndex[];
};

export type ValidatorMonitorOpts = {
Expand Down Expand Up @@ -789,6 +790,13 @@ export function createValidatorMonitor(
metrics.validatorMonitor.prevEpochSyncCommitteeHits.set(prevEpochSyncCommitteeHits);
metrics.validatorMonitor.prevEpochSyncCommitteeMisses.set(prevEpochSyncCommitteeMisses);
},

/**
* Get the indices of the validators that are being monitored.
*/
getLocalValidatorIndices() {
return Array.from(validators.keys());
},
};
}

Expand Down
16 changes: 15 additions & 1 deletion packages/beacon-node/src/network/core/networkCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {SyncnetsService} from "../subnets/syncnetsService.js";
import {getConnectionsMap} from "../util.js";
import {NetworkCoreMetrics, createNetworkCoreMetrics} from "./metrics.js";
import {INetworkCore, MultiaddrStr, PeerIdStr} from "./types.js";
import {CustodyConfig} from "../../util/dataColumns.js";

type Mods = {
libp2p: Libp2p;
Expand Down Expand Up @@ -64,6 +65,8 @@ export type BaseNetworkInit = {
getReqRespHandler: GetReqRespHandlerFn;
activeValidatorCount: number;
initialStatus: phase0.Status;
initialSamplingGroupCount: number;
initialAdvertisedGroupCount: number;
};

/**
Expand Down Expand Up @@ -135,6 +138,8 @@ export class NetworkCore implements INetworkCore {
getReqRespHandler,
activeValidatorCount,
initialStatus,
initialSamplingGroupCount,
initialAdvertisedGroupCount,
}: BaseNetworkInit): Promise<NetworkCore> {
const libp2p = await createNodeJsLibp2p(peerId, opts, {
peerStoreDir,
Expand All @@ -154,7 +159,7 @@ export class NetworkCore implements INetworkCore {
const onMetadataSetValue = function onMetadataSetValue(key: string, value: Uint8Array): void {
discv5?.setEnrValue(key, value).catch((e) => logger.error("error on setEnrValue", {key}, e));
};
const metadata = new MetadataController({}, {config, onSetValue: onMetadataSetValue});
const metadata = new MetadataController({}, {config, onSetValue: onMetadataSetValue, initialAdvertisedGroupCount});

const reqResp = new ReqRespBeaconNode(
{
Expand Down Expand Up @@ -213,6 +218,7 @@ export class NetworkCore implements INetworkCore {
events,
peersData,
statusCache,
initialSamplingGroupCount,
},
opts
);
Expand Down Expand Up @@ -342,6 +348,14 @@ export class NetworkCore implements INetworkCore {
return recipients.length;
}

async setSamplingGroupCount(count: number): Promise<void> {
this.peerManager.setSamplingGroupCount(count);
}

async setAdvertisedGroupCount(count: number): Promise<void> {
this.metadata.cgc = count;
}

// REST API queries

async getNetworkIdentity(): Promise<routes.node.NetworkIdentity> {
Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/network/core/networkCoreWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ const core = await NetworkCore.init({
reqRespBridgeRespCaller.getAsyncIterable({method, req, peerId: peerIdToString(peerId)}),
activeValidatorCount: workerData.activeValidatorCount,
initialStatus: workerData.initialStatus,
initialSamplingGroupCount: workerData.initialSamplingGroupCount,
initialAdvertisedGroupCount: workerData.initialAdvertisedGroupCount,
});

wireEventsOnWorkerThread<NetworkEventData>(
Expand Down Expand Up @@ -138,6 +140,9 @@ const libp2pWorkerApi: NetworkWorkerApi = {
// sendReqRespRequest - handled via events with AsyncIterableBridgeHandler
publishGossip: (topic, data, opts) => core.publishGossip(topic, data, opts),

setSamplingGroupCount: (count) => core.setSamplingGroupCount(count),
setAdvertisedGroupCount: (count) => core.setAdvertisedGroupCount(count),

// Debug

getNetworkIdentity: () => core.getNetworkIdentity(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ export type WorkerNetworkCoreOpts = NetworkOptions & {
activeValidatorCount: number;
genesisTime: number;
initialStatus: phase0.Status;
initialSamplingGroupCount: number;
initialAdvertisedGroupCount: number;
};

export type WorkerNetworkCoreInitModules = {
Expand Down Expand Up @@ -104,7 +106,16 @@ export class WorkerNetworkCore implements INetworkCore {

static async init(modules: WorkerNetworkCoreInitModules): Promise<WorkerNetworkCore> {
const {opts, config, peerId} = modules;
const {genesisTime, peerStoreDir, activeValidatorCount, localMultiaddrs, metricsEnabled, initialStatus} = opts;
const {
genesisTime,
peerStoreDir,
activeValidatorCount,
localMultiaddrs,
metricsEnabled,
initialStatus,
initialSamplingGroupCount,
initialAdvertisedGroupCount,
} = opts;

const workerData: NetworkWorkerData = {
opts,
Expand All @@ -116,6 +127,8 @@ export class WorkerNetworkCore implements INetworkCore {
peerStoreDir,
genesisTime,
initialStatus,
initialSamplingGroupCount,
initialAdvertisedGroupCount,
activeValidatorCount,
loggerOpts: modules.logger.toOpts(),
};
Expand Down Expand Up @@ -217,6 +230,15 @@ export class WorkerNetworkCore implements INetworkCore {
return this.getApi().publishGossip(topic, data, opts);
}

// Custody

setSamplingGroupCount(count: number): Promise<void> {
return this.getApi().setSamplingGroupCount(count);
}
setAdvertisedGroupCount(count: number): Promise<void> {
return this.getApi().setAdvertisedGroupCount(count);
}

// Debug

connectToPeer(peer: PeerIdStr, multiaddr: MultiaddrStr[]): Promise<void> {
Expand Down
7 changes: 7 additions & 0 deletions packages/beacon-node/src/network/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ export interface INetworkCore extends INetworkCorePublic {
/** Chain must push status updates to the network core */
updateStatus(status: phase0.Status): Promise<void>;

setSamplingGroupCount(count: number): Promise<void>;
setAdvertisedGroupCount(count: number): Promise<void>;
/** Opens stream to handle ReqResp outgoing request */
sendReqRespRequest(data: OutgoingRequestArgs): AsyncIterable<ResponseIncoming>;
/** Publish gossip message to peers */
Expand All @@ -78,6 +80,8 @@ export type NetworkWorkerData = {
genesisTime: number;
activeValidatorCount: number;
initialStatus: phase0.Status;
initialSamplingGroupCount: number;
initialAdvertisedGroupCount: number;
peerIdProto: Uint8Array;
localMultiaddrs: string[];
metricsEnabled: boolean;
Expand All @@ -101,6 +105,9 @@ export type NetworkWorkerApi = INetworkCorePublic & {
getConnectedPeerCount(): Promise<number>;
updateStatus(status: phase0.Status): Promise<void>;

setSamplingGroupCount(count: number): Promise<void>;
setAdvertisedGroupCount(count: number): Promise<void>;

// sendReqRespRequest - implemented via events
publishGossip(topic: string, data: Uint8Array, opts?: PublishOpts): Promise<number>;

Expand Down
Loading