Merged. Changes from 7 commits.
beacon_chain/beacon_node.nim (3 changes: 2 additions & 1 deletion)
@@ -28,7 +28,7 @@ import
./spec/datatypes/[base, altair],
./spec/eth2_apis/dynamic_fee_recipients,
./spec/signatures_batch,
-  ./sync/[sync_manager, request_manager, sync_types],
+  ./sync/[sync_manager, request_manager, sync_types, validator_custody],
./validators/[
action_tracker, message_router, validator_monitor, validator_pool,
keystore_management],
@@ -95,6 +95,7 @@ type
eventBus*: EventBus
vcProcess*: Process
requestManager*: RequestManager
+    validatorCustody*: ValidatorCustodyRef
syncManager*: SyncManager[Peer, PeerId]
backfiller*: SyncManager[Peer, PeerId]
untrustedManager*: SyncManager[Peer, PeerId]
beacon_chain/nimbus_beacon_node.nim (10 changes: 9 additions & 1 deletion)
@@ -22,7 +22,7 @@ import
./spec/[
deposit_snapshots, engine_authentication, weak_subjectivity,
peerdas_helpers],
-  ./sync/[sync_protocol, light_client_protocol, sync_overseer],
+  ./sync/[sync_protocol, light_client_protocol, sync_overseer, validator_custody],
./validators/[keystore_management, beacon_validators],
"."/[
beacon_node, beacon_node_light_client, deposits,
@@ -585,6 +585,9 @@ proc initFullNode(
dag.cfg.DENEB_FORK_EPOCH, getBeaconTime, (proc(): bool = syncManager.inProgress),
quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier,
rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader)
+    validatorCustody = ValidatorCustodyRef.init(node.network, dag, supernode,
+      getLocalHeadSlot, custody_columns_set, getBeaconTime,
+      (proc(): bool = syncManager.inProgress), dataColumnQuarantine)

# As per EIP 7594, the BN is now categorised into a
  # `Fullnode` and a `Supernode`; the fullnode custodies a
@@ -645,6 +648,7 @@ proc initFullNode(
node.blockProcessor = blockProcessor
node.consensusManager = consensusManager
node.requestManager = requestManager
+  node.validatorCustody = validatorCustody
node.syncManager = syncManager
node.backfiller = backfiller
node.untrustedManager = untrustedManager
@@ -1322,8 +1326,11 @@ func getSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
subnets + node.getNextSyncCommitteeSubnets(epoch)

func readCustodyGroupSubnets(node: BeaconNode): uint64 =
+  let vcus_count = node.dataColumnQuarantine.custody_columns.lenu64
if node.config.peerdasSupernode:
node.dag.cfg.NUMBER_OF_CUSTODY_GROUPS.uint64
+  elif vcus_count > node.dag.cfg.CUSTODY_REQUIREMENT.uint64:
+    vcus_count
else:
node.dag.cfg.CUSTODY_REQUIREMENT.uint64
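
Note: a fullnode now advertises the validator-custody-driven column count whenever it exceeds the base requirement. A minimal standalone sketch of this selection (the CUSTODY_REQUIREMENT and NUMBER_OF_CUSTODY_GROUPS values below are assumed spec defaults, not taken from this diff):

    func advertisedCustodyGroupCount(supernode: bool, vcusCount: uint64): uint64 =
      const
        NUMBER_OF_CUSTODY_GROUPS = 128'u64  # assumed
        CUSTODY_REQUIREMENT = 4'u64         # assumed
      if supernode:
        NUMBER_OF_CUSTODY_GROUPS            # supernodes custody every group
      elif vcusCount > CUSTODY_REQUIREMENT:
        vcusCount                           # validator custody dominates
      else:
        CUSTODY_REQUIREMENT                 # protocol floor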

@@ -2277,6 +2284,7 @@ proc run(node: BeaconNode) {.raises: [CatchableError].} =
if node.network.getBeaconTime().slotOrZero.epoch >=
node.network.cfg.FULU_FORK_EPOCH:
node.requestManager.switchToColumnLoop()
+    node.validatorCustody.start()
node.syncOverseer.start()

waitFor node.updateGossipStatus(wallSlot)
beacon_chain/spec/peerdas_helpers.nim (10 changes: 4 additions & 6 deletions)
@@ -486,13 +486,11 @@ proc verify_data_column_sidecar_kzg_proofs*(sidecar: DataColumnSidecar):

ok()

-# https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.3/specs/fulu/das-core.md#validator-custody
-func get_validators_custody_requirement*(cfg: RuntimeConfig, state: fulu.BeaconState,
-                                         validator_indices: openArray[ValidatorIndex]):
+# https://github.com/ethereum/consensus-specs/blob/v1.5.0/specs/fulu/validator.md#validator-custody
+func get_validators_custody_requirement*(cfg: RuntimeConfig,
+                                         hstate: ForkyHashedBeaconState,
+                                         total_node_balance: Gwei):
                                          uint64 =
-  var total_node_balance: Gwei
-  for index in validator_indices:
-    total_node_balance += state.balances[index]
let count = total_node_balance div BALANCE_PER_ADDITIONAL_CUSTODY_GROUP
min(max(count.uint64, cfg.VALIDATOR_CUSTODY_REQUIREMENT),
cfg.NUMBER_OF_CUSTODY_GROUPS.uint64)
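
The rewrite moves the per-validator balance summation out to the caller and takes total_node_balance directly. A standalone sketch of the clamping arithmetic, with a worked example (constants are assumed Fulu spec values, not taken from this diff; the helper name is hypothetical):

    const
      BALANCE_PER_ADDITIONAL_CUSTODY_GROUP = 32_000_000_000'u64  # Gwei; assumed
      VALIDATOR_CUSTODY_REQUIREMENT = 8'u64                      # assumed
      NUMBER_OF_CUSTODY_GROUPS = 128'u64                         # assumed

    # Hypothetical helper mirroring get_validators_custody_requirement.
    func validatorsCustodyRequirement(totalNodeBalance: uint64): uint64 =
      let count = totalNodeBalance div BALANCE_PER_ADDITIONAL_CUSTODY_GROUP
      min(max(count, VALIDATOR_CUSTODY_REQUIREMENT), NUMBER_OF_CUSTODY_GROUPS)

    when isMainModule:
      doAssert validatorsCustodyRequirement(2'u64 * 32_000_000_000'u64) == 8     # clamped up to the floor
      doAssert validatorsCustodyRequirement(100'u64 * 32_000_000_000'u64) == 100 # one group per 32 ETH
      doAssert validatorsCustodyRequirement(500'u64 * 32_000_000_000'u64) == 128 # capped at all groups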
beacon_chain/sync/request_manager.nim (6 changes: 3 additions & 3 deletions)
@@ -42,7 +42,7 @@ const
DATA_COLUMN_GOSSIP_WAIT_TIME_NS = 2 * 1_000_000_000
    ## How long to wait for data columns to arrive over gossip before fetching.

-  POLL_INTERVAL = 1.seconds
+  POLL_INTERVAL* = 1.seconds

type
BlockVerifierFn = proc(
@@ -169,7 +169,7 @@ func checkResponseSubset(idList: seq[BlobIdentifier],
return false
true

-func checkColumnResponse(idList: seq[DataColumnsByRootIdentifier],
+func checkColumnResponse*(idList: seq[DataColumnsByRootIdentifier],
columns: openArray[ref DataColumnSidecar]): bool =
for colresp in columns:
let block_root =
@@ -260,7 +260,7 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async:
if not(isNil(peer)):
rman.network.peerPool.release(peer)

-func cmpSidecarIndexes(x, y: ref BlobSidecar | ref DataColumnSidecar): int =
+func cmpSidecarIndexes*(x, y: ref BlobSidecar | ref DataColumnSidecar): int =
cmp(x[].index, y[].index)

proc fetchBlobsFromNetwork(self: RequestManager,
beacon_chain/sync/validator_custody.nim (new file: 243 additions & 0 deletions)
@@ -0,0 +1,243 @@
# beacon_chain
# Copyright (c) 2018-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import chronos, chronicles
import ssz_serialization/[proofs, types]
import
../validators/action_tracker,
../spec/[beaconstate, forks, network, helpers, peerdas_helpers],
../networking/eth2_network,
../consensus_object_pools/blockchain_dag,
../consensus_object_pools/block_dag,
../consensus_object_pools/data_column_quarantine,
"."/[request_manager, sync_manager, sync_protocol]

from std/algorithm import sort
from std/sequtils import toSeq
from ../beacon_clock import GetBeaconTimeFn

logScope: topics = "validator_custody"

const
PARALLEL_REFILL_REQUESTS = 32
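    ## Maximum number of concurrent data column byRoot requests
    ## launched per refill tick.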

type
InhibitFn = proc: bool {.gcsafe, raises: [].}

ValidatorCustody* = object
network*: Eth2Node
dag*: ChainDAGRef
supernode*: bool
cache*: StateCache
getLocalHeadSlot*: GetSlotCallback
older_column_set*: HashSet[ColumnIndex]
newer_column_set*: HashSet[ColumnIndex]
global_refill_list*: HashSet[DataColumnIdentifier]
requested_columns*: seq[DataColumnsByRootIdentifier]
getBeaconTime: GetBeaconTimeFn
inhibit: InhibitFn
dataColumnQuarantine: ref DataColumnQuarantine
validatorCustodyLoopFuture: Future[void].Raising([CancelledError])

ValidatorCustodyRef* = ref ValidatorCustody

proc init*(T: type ValidatorCustodyRef, network: Eth2Node,
dag: ChainDAGRef,
supernode: bool,
getLocalHeadSlotCb: GetSlotCallback,
older_column_set: HashSet[ColumnIndex],
getBeaconTime: GetBeaconTimeFn,
inhibit: InhibitFn,
dataColumnQuarantine: ref DataColumnQuarantine): ValidatorCustodyRef =
  var cache = StateCache()
  ValidatorCustodyRef(
network: network,
dag: dag,
supernode: supernode,
cache: cache,
getLocalHeadSlot: getLocalHeadSlotCb,
older_column_set: older_column_set,
getBeaconTime: getBeaconTime,
inhibit: inhibit,
dataColumnQuarantine: dataColumnQuarantine)

proc detectNewValidatorCustody(vcus: ValidatorCustodyRef): seq[ColumnIndex] =
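  ## Recompute the node's custody requirement from the head state, refresh
  ## the quarantine's custody columns, and return the column indices that
  ## were not custodied during the previous interval.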
var
diff_set: HashSet[ColumnIndex]
withState(vcus.dag.headState):
when consensusFork >= ConsensusFork.Fulu:
let total_node_balance =
get_total_active_balance(forkyState.data, vcus.cache)
let vcustody =
vcus.dag.cfg.get_validators_custody_requirement(forkyState, total_node_balance)

let
newer_columns =
vcus.dag.cfg.resolve_columns_from_custody_groups(
vcus.network.nodeId,
max(vcus.dag.cfg.SAMPLES_PER_SLOT.uint64,
vcustody))
newer_column_set = newer_columns.toHashSet()

# update data column quarantine custody requirements
vcus.dataColumnQuarantine[].custody_columns =
newer_columns

# check which custody set is larger
if newer_column_set.len > vcus.older_column_set.len:
diff_set = newer_column_set.difference(vcus.older_column_set)
vcus.newer_column_set = newer_column_set

toSeq(diff_set)

proc makeRefillList(vcus: ValidatorCustodyRef, diff: seq[ColumnIndex]) =
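  ## At each epoch boundary, queue byRoot requests for the newly detected
  ## custody columns of every block in the refill epoch, and record
  ## per-block identifiers for the full custody set in `global_refill_list`
  ## until the columns arrive.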
let
slot = vcus.getLocalHeadSlot()

let dataColumnRefillEpoch = (slot.epoch -
vcus.dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS - 1)
if slot.is_epoch() and dataColumnRefillEpoch >= vcus.dag.cfg.FULU_FORK_EPOCH:
var blocks: array[SLOTS_PER_EPOCH.int, BlockId]
let startIndex = vcus.dag.getBlockRange(
dataColumnRefillEpoch.start_slot, blocks.toOpenArray(0, SLOTS_PER_EPOCH - 1))
for i in startIndex..<SLOTS_PER_EPOCH:
let blck = vcus.dag.getForkedBlock(blocks[int(i)]).valueOr: continue
withBlck(blck):
# No need to check for fork version, as this loop is triggered post Fulu
let entry1 =
DataColumnsByRootIdentifier(block_root: blocks[int(i)].root,
indices: DataColumnIndices.init(diff))
vcus.requested_columns.add entry1
for column in vcus.newer_column_set:
let entry2 =
DataColumnIdentifier(block_root: blocks[int(i)].root,
index: ColumnIndex(column))
vcus.global_refill_list.incl(entry2)

proc checkIntersectingCustody(vcus: ValidatorCustodyRef,
peer: Peer): seq[DataColumnsByRootIdentifier] =
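  ## Intersect the outstanding column requests with the columns the remote
  ## peer custodies, so byRoot requests are only sent for columns the peer
  ## can actually serve.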
var columnList: seq[DataColumnsByRootIdentifier]

# Fetch the remote custody count
let remoteCustodyGroupCount =
peer.lookupCgcFromPeer()

# Extract remote peer's nodeID from peerID
  # Fetch custody columns from the remote peer
let
remoteNodeId = fetchNodeIdFromPeerId(peer)
remoteCustodyColumns =
vcus.dag.cfg.resolve_columns_from_custody_groups(
remoteNodeId,
max(vcus.dag.cfg.SAMPLES_PER_SLOT.uint64,
remoteCustodyGroupCount))
for request_item in vcus.requested_columns:
var colIds: seq[ColumnIndex]
for cindex in request_item.indices:
let lookup = DataColumnIdentifier(block_root: request_item.block_root,
index: cindex)
if lookup notin vcus.global_refill_list and cindex in remoteCustodyColumns:
colIds.add cindex
columnList.add DataColumnsByRootIdentifier(block_root: request_item.block_root,
indices: DataColumnIndices.init(colIds))

columnList

proc refillDataColumnsFromNetwork(vcus: ValidatorCustodyRef)
{.async: (raises: [CancelledError]).} =
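  ## Acquire a peer, request the intersecting custody columns by root and
  ## persist verified responses straight to the database, removing them
  ## from the refill list.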
var peer = await vcus.network.peerPool.acquire()
let colIdList = vcus.checkIntersectingCustody(peer)
try:
if colIdList.len > 0:
debug "Requesting data columns by root for refill", peer = peer,
columns = shortLog(colIdList), peer_score = peer.getScore()
let columns =
await dataColumnSidecarsByRoot(peer, DataColumnsByRootIdentifierList colIdList)
if columns.isOk:
var ucolumns = columns.get().asSeq()
ucolumns.sort(cmpSidecarIndexes)
if not checkColumnResponse(colIdList, ucolumns):
debug "Response to columns by root is not a subset",
peer = peer, columns = shortLog(colIdList), ucolumns = len(ucolumns)
peer.updateScore(PeerScoreBadResponse)
return
for col in ucolumns:
let
block_root =
hash_tree_root(col[].signed_block_header.message)
exclude =
DataColumnIdentifier(block_root: block_root,
index: col[].index)
vcus.global_refill_list.excl(exclude)
          # Write the new columns to the database; no BlockVerifier is
          # needed here, as these columns historically passed DA and met
          # the historical custody requirements.
vcus.dag.db.putDataColumnSidecar(col[])

else:
        debug "Data columns by root request failed, peer does not have the requested custody columns",
peer = peer, columns = shortLog(colIdList), err = columns.error()
peer.updateScore(PeerScoreNoValues)

finally:
if not(isNil(peer)):
vcus.network.peerPool.release(peer)

proc validatorCustodyColumnLoop(
vcus: ValidatorCustodyRef) {.async: (raises: [CancelledError]).} =
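  ## Poll for validator custody changes every POLL_INTERVAL; when the
  ## custody set grows, backfill the missing data columns from peers and,
  ## once the refill list drains, advertise the updated cgc count in the
  ## ENR and metadata.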

while true:
let diff = vcus.detectNewValidatorCustody()

await sleepAsync(POLL_INTERVAL)
if diff.len == 0:
# Validator custody same as previous interval
continue

if vcus.inhibit():
continue

vcus.makeRefillList(diff)

if vcus.global_refill_list.len != 0:
debug "Requesting detected missing data columns for refill",
columns = shortLog(vcus.requested_columns)
let start = SyncMoment.now(0)
var workers:
array[PARALLEL_REFILL_REQUESTS, Future[void].Raising([CancelledError])]
for i in 0..<PARALLEL_REFILL_REQUESTS:
workers[i] = vcus.refillDataColumnsFromNetwork()

await allFutures(workers)
let finish = SyncMoment.now(uint64(len(vcus.global_refill_list)))

debug "Validator custody backfill tick",
backfill_speed = speed(start, finish)

else:
      # Done with column refilling, so advertise the updated cgc count
      # in the ENR and metadata.
if vcus.older_column_set.len != vcus.newer_column_set.len:
        # The cgc count can also drop below its previous value if validators detach
vcus.network.loadCgcnetMetadataAndEnr(CgcCount vcus.newer_column_set.lenu64)
# Make the newer set older
vcus.older_column_set = vcus.newer_column_set
# Clear the newer for future validator custody detection
vcus.newer_column_set.clear()

proc start*(vcus: ValidatorCustodyRef) =
  ## Start the validator custody detection loop.
vcus.validatorCustodyLoopFuture = vcus.validatorCustodyColumnLoop()

proc stop*(vcus: ValidatorCustodyRef) =
  ## Stop the validator custody detection loop.
if not(isNil(vcus.validatorCustodyLoopFuture)):
vcus.validatorCustodyLoopFuture.cancelSoon()
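
Taken together with the nimbus_beacon_node.nim changes above, the custody detector follows the request manager's lifecycle. A condensed, hypothetical view of the wiring (names as used in this diff; the surrounding BeaconNode plumbing is elided, and the stop() call site is not shown in this PR):

    # initFullNode: construct with the shared quarantine and callbacks.
    validatorCustody = ValidatorCustodyRef.init(node.network, dag, supernode,
      getLocalHeadSlot, custody_columns_set, getBeaconTime,
      (proc(): bool = syncManager.inProgress), dataColumnQuarantine)

    # run(), once the wall epoch reaches FULU_FORK_EPOCH:
    node.validatorCustody.start()  # spawns validatorCustodyColumnLoop

    # on shutdown:
    node.validatorCustody.stop()   # cancels the loop future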