add: column support to ENR, Metadata and Request Manager (#6741)
* add csc to enr and metadata

* add column filtering into RequestManager

* nits

* add comment

* resolved reviews 1

* added local custody column set into RequestManager as a field

* faster lookups with hashsets

* fix regressions, fix other reviews, fix response checking for columns

* simpler fix for hashsets
agnxsh authored Dec 15, 2024
1 parent 7647d17 commit 2bf0df7
Showing 8 changed files with 397 additions and 17 deletions.
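Among the changes listed above, the RequestManager gains column filtering keyed by the node's local custody column set, stored as a HashSet for faster lookups (see the eip7594_helpers and nimbus_beacon_node hunks below). A minimal, self-contained sketch of that filtering pattern; all names here are illustrative, not the actual Nimbus API:

import std/sets

type ColumnIndex = uint64

func filterWanted(received: openArray[ColumnIndex],
                  localCustody: HashSet[ColumnIndex]): seq[ColumnIndex] =
  # Keep only the columns this node custodies; HashSet membership
  # is O(1) versus an O(n) scan of a seq.
  for idx in received:
    if idx in localCustody:
      result.add idx

when isMainModule:
  let custody = toHashSet([ColumnIndex 0, 7, 21, 42])
  doAssert filterWanted([ColumnIndex 7, 9, 42], custody) == @[ColumnIndex 7, 42]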
60 changes: 60 additions & 0 deletions beacon_chain/consensus_object_pools/block_quarantine.nim
@@ -24,6 +24,8 @@ const
## Enough for finalization in an alternative fork
MaxBlobless = SLOTS_PER_EPOCH
## Arbitrary
MaxColumnless = SLOTS_PER_EPOCH
## Arbitrary
MaxUnviables = 16 * 1024
## About a day of blocks - most likely not needed but it's quite cheap..

@@ -58,6 +60,12 @@ type
## block as well. A blobless block inserted into this table must
## have a resolved parent (i.e., it is not an orphan).

columnless*: OrderedTable[Eth2Digest, ForkedSignedBeaconBlock]
## Blocks that we don't have columns for. When we have received
## all columns for this block, we can proceed to resolving the
## block as well. A columnless block inserted into this table must
## have a resolved parent (i.e., it is not an orphan).

unviable*: OrderedTable[Eth2Digest, tuple[]]
## Unviable blocks are those that come from a history that does not
## include the finalized checkpoint we're currently following, and can
@@ -132,6 +140,10 @@ func removeBlobless*(
quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
quarantine.blobless.del(signedBlock.root)

func removeColumnless*(
quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
quarantine.columnless.del(signedBlock.root)

func isViable(
finalizedSlot: Slot, slot: Slot): bool =
# The orphan must be newer than the finalization point so that its parent
@@ -236,6 +248,18 @@ func cleanupBlobless(quarantine: var Quarantine, finalizedSlot: Slot) =
quarantine.addUnviable k
quarantine.blobless.del k

func cleanupColumnless(quarantine: var Quarantine, finalizedSlot: Slot) =
var toDel: seq[Eth2Digest]

for k, v in quarantine.columnless:
withBlck(v):
if not isViable(finalizedSlot, forkyBlck.message.slot):
toDel.add k

for k in toDel:
quarantine.addUnviable k
quarantine.columnless.del k

func clearAfterReorg*(quarantine: var Quarantine) =
## Clear missing and orphans to start with a fresh slate in case of a reorg
## Unviables remain unviable and are not cleared.
@@ -325,6 +349,29 @@ proc addBlobless*(
quarantine.missing.del(signedBlock.root)
true

proc addColumnless*(
quarantine: var Quarantine, finalizedSlot: Slot,
signedBlock: fulu.SignedBeaconBlock): bool =

if not isViable(finalizedSlot, signedBlock.message.slot):
quarantine.addUnviable(signedBlock.root)
return false

quarantine.cleanupColumnless(finalizedSlot)

if quarantine.columnless.lenu64 >= MaxColumnless:
var oldest_columnless_key: Eth2Digest
for k in quarantine.columnless.keys:
oldest_columnless_key = k
break
    quarantine.columnless.del oldest_columnless_key

debug "block quarantine: Adding columnless", blck = shortLog(signedBlock)
quarantine.columnless[signedBlock.root] =
ForkedSignedBeaconBlock.init(signedBlock)
quarantine.missing.del(signedBlock.root)
true

func popBlobless*(
quarantine: var Quarantine,
root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
@@ -334,6 +381,19 @@
else:
Opt.none(ForkedSignedBeaconBlock)

func popColumnless*(
quarantine: var Quarantine,
root: Eth2Digest): Opt[ForkedSignedBeaconBlock] =
var blck: ForkedSignedBeaconBlock
if quarantine.columnless.pop(root, blck):
Opt.some(blck)
else:
Opt.none(ForkedSignedBeaconBlock)

iterator peekBlobless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
for k, v in quarantine.blobless.mpairs():
yield v

iterator peekColumnless*(quarantine: var Quarantine): ForkedSignedBeaconBlock =
for k, v in quarantine.columnless.mpairs():
yield v
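A note on the eviction in `addColumnless` above: `OrderedTable` iterates keys in insertion order, so the first key yielded is the oldest entry. A stripped-down sketch of that pattern with stand-in types (not the quarantine's real ones):

import std/tables

const MaxEntries = 4

var columnless = initOrderedTable[string, int]()

proc addEntry(root: string, blck: int) =
  if columnless.len >= MaxEntries:
    var oldest: string
    for k in columnless.keys:   # the first key yielded is the oldest
      oldest = k
      break
    columnless.del oldest       # evict from the same table we insert into
  columnless[root] = blck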
10 changes: 5 additions & 5 deletions beacon_chain/consensus_object_pools/data_column_quarantine.nim
@@ -78,7 +78,7 @@ func hasDataColumn*(
false

func peekColumnIndices*(quarantine: DataColumnQuarantine,
-                      blck: electra.SignedBeaconBlock):
+                      blck: fulu.SignedBeaconBlock):
seq[ColumnIndex] =
# Peeks into the currently received column indices
# from quarantine, necessary data availability checks
@@ -110,7 +110,7 @@ func gatherDataColumns*(quarantine: DataColumnQuarantine,

func popDataColumns*(
quarantine: var DataColumnQuarantine, digest: Eth2Digest,
-    blck: electra.SignedBeaconBlock):
+    blck: fulu.SignedBeaconBlock):
seq[ref DataColumnSidecar] =
var r: DataColumnSidecars
for idx in quarantine.custody_columns:
@@ -123,7 +123,7 @@ func hasMissingDataColumns*(quarantine: DataColumnQuarantine,
r

func hasMissingDataColumns*(quarantine: DataColumnQuarantine,
-                            blck: electra.SignedBeaconBlock): bool =
+                            blck: fulu.SignedBeaconBlock): bool =
  # `hasMissingDataColumns` checks for data columns that
  # have been missed over gossip; in the case of a supernode,
  # the method would return missing columns when the supernode
@@ -149,7 +149,7 @@ func hasMissingDataColumns*(quarantine: DataColumnQuarantine,
return true

func hasEnoughDataColumns*(quarantine: DataColumnQuarantine,
-                           blck: electra.SignedBeaconBlock): bool =
+                           blck: fulu.SignedBeaconBlock): bool =
  # `hasEnoughDataColumns` dictates whether there are enough
  # data columns for a block to be enqueued, ideally for a supernode
  # if it receives at least 50% of columns over gossip and RPC
@@ -175,7 +175,7 @@ func hasEnoughDataColumns*(quarantine: DataColumnQuarantine,
return true

func dataColumnFetchRecord*(quarantine: DataColumnQuarantine,
-                            blck: electra.SignedBeaconBlock):
+                            blck: fulu.SignedBeaconBlock):
DataColumnFetchRecord =
var indices: seq[ColumnIndex]
for i in quarantine.custody_columns:
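The procs in this file now take `fulu.SignedBeaconBlock`, since Fulu is the fork that carries data column sidecars. The fetch-record logic they feed reduces to a set difference between custody columns and received columns; a hedged sketch with placeholder types:

import std/sets

type ColumnIndex = uint64

func missingColumns(custody: HashSet[ColumnIndex],
                    received: HashSet[ColumnIndex]): seq[ColumnIndex] =
  # Columns we custody but have not yet seen, i.e. what a
  # fetch record would ask peers for.
  for idx in custody:
    if idx notin received:
      result.add idx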
50 changes: 50 additions & 0 deletions beacon_chain/networking/eth2_network.nim
@@ -1647,6 +1647,15 @@ proc runDiscoveryLoop(node: Eth2Node) {.async: (raises: [CancelledError]).} =
# Also, give some time to dial the discovered nodes and update stats etc
await sleepAsync(5.seconds)

proc fetchNodeIdFromPeerId*(peer: Peer): NodeId =
# Convert peer id to node id by extracting the peer's public key
let nodeId =
block:
var key: PublicKey
discard peer.peerId.extractPublicKey(key)
keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId()
nodeId

proc resolvePeer(peer: Peer) =
# Resolve task which performs searching of peer's public key and recovery of
# ENR using discovery5. We only resolve ENR for peers we know about to avoid
@@ -2418,6 +2427,33 @@ func announcedENR*(node: Eth2Node): enr.Record =
doAssert node.discovery != nil, "The Eth2Node must be initialized"
node.discovery.localNode.record

proc lookupCscFromPeer*(peer: Peer): uint64 =
# Fetches the custody column count from a remote peer.
# If the peer advertises their custody column count via the `csc` ENR field,
# that value is returned. Otherwise, the default value `CUSTODY_REQUIREMENT`
# is assumed.

let metadata = peer.metadata
if metadata.isOk:
return metadata.get.custody_subnet_count

# Try getting the custody count from ENR if metadata fetch fails.
debug "Could not get csc from metadata, trying from ENR",
peer_id = peer.peerId
let enrOpt = peer.enr
if not enrOpt.isNone:
let enr = enrOpt.get
let enrFieldOpt = enr.get(enrCustodySubnetCountField, seq[byte])
if enrFieldOpt.isOk:
try:
let csc = SSZ.decode(enrFieldOpt.get, uint8)
return csc.uint64
except SszError, SerializationError:
discard # Ignore decoding errors and fallback to default

# Return default value if no valid custody subnet count is found.
return CUSTODY_REQUIREMENT.uint64

func shortForm*(id: NetKeyPair): string =
$PeerId.init(id.pubkey)

@@ -2579,6 +2615,20 @@ proc updateStabilitySubnetMetadata*(node: Eth2Node, attnets: AttnetBits) =
else:
debug "Stability subnets changed; updated ENR attnets", attnets

proc loadCscnetMetadataAndEnr*(node: Eth2Node, cscnets: CscCount) =
node.metadata.custody_subnet_count = cscnets.uint64
let res =
node.discovery.updateRecord({
enrCustodySubnetCountField: SSZ.encode(cscnets)
})

if res.isErr:
# This should not occur in this scenario as the private key would always
# be the correct one and the ENR will not increase in size
warn "Failed to update the ENR csc field", error = res.error
else:
debug "Updated ENR csc", cscnets

proc updateSyncnetsMetadata*(node: Eth2Node, syncnets: SyncnetBits) =
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.9/specs/altair/validator.md#sync-committee-subnet-stability
if node.metadata.syncnets == syncnets:
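`lookupCscFromPeer` above falls back in order: peer METADATA first, then the ENR `csc` field, then `CUSTODY_REQUIREMENT`. Since the SSZ encoding of a `uint8` is a single byte, the chain can be sketched as follows (simplified stand-ins, not the Nimbus types; the constant value is an assumption):

import std/options

const CUSTODY_REQUIREMENT = 4'u64   # assumed spec default

proc custodyCount(metadataCsc: Option[uint64],
                  enrCscField: Option[seq[byte]]): uint64 =
  if metadataCsc.isSome:
    return metadataCsc.get            # METADATA wins when available
  if enrCscField.isSome and enrCscField.get.len == 1:
    return enrCscField.get[0].uint64  # SSZ uint8 is one raw byte
  CUSTODY_REQUIREMENT                 # fall back to the default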
26 changes: 21 additions & 5 deletions beacon_chain/nimbus_beacon_node.nim
@@ -422,6 +422,9 @@ proc initFullNode(
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
else:
CUSTODY_REQUIREMENT.uint64
custody_columns_set =
node.network.nodeId.get_custody_columns_set(max(SAMPLES_PER_SLOT.uint64,
localCustodySubnets))
consensusManager = ConsensusManager.new(
dag, attestationPool, quarantine, node.elManager,
ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets),
@@ -478,6 +481,13 @@ proc initFullNode(
Opt.some blob_sidecar
else:
Opt.none(ref BlobSidecar)
rmanDataColumnLoader = proc(
columnId: DataColumnIdentifier): Opt[ref DataColumnSidecar] =
var data_column_sidecar = DataColumnSidecar.new()
if dag.db.getDataColumnSidecar(columnId.block_root, columnId.index, data_column_sidecar[]):
Opt.some data_column_sidecar
else:
Opt.none(ref DataColumnSidecar)

processor = Eth2Processor.new(
config.doppelgangerDetection,
@@ -525,10 +535,10 @@ proc initFullNode(
processor: processor,
network: node.network)
requestManager = RequestManager.init(
-    node.network, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime,
-    (proc(): bool = syncManager.inProgress),
-    quarantine, blobQuarantine, rmanBlockVerifier,
-    rmanBlockLoader, rmanBlobLoader)
+    node.network, supernode, custody_columns_set, dag.cfg.DENEB_FORK_EPOCH,
+    getBeaconTime, (proc(): bool = syncManager.inProgress),
+    quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier,
+    rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader)

# As per EIP 7594, the BN is now categorised into a
# `Fullnode` and a `Supernode`; the fullnode custodies a
@@ -552,7 +562,13 @@ proc initFullNode(
dataColumnQuarantine[].supernode = supernode
dataColumnQuarantine[].custody_columns =
node.network.nodeId.get_custody_columns(max(SAMPLES_PER_SLOT.uint64,
-      localCustodySubnets))
+        localCustodySubnets))

if node.config.subscribeAllSubnets:
node.network.loadCscnetMetadataAndEnr(DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint8)
else:
node.network.loadCscnetMetadataAndEnr(CUSTODY_REQUIREMENT.uint8)

if node.config.lightClientDataServe:
proc scheduleSendingLightClientUpdates(slot: Slot) =
if node.lightClientPool[].broadcastGossipFut != nil:
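The wiring above derives the custody subnet count from the node mode: a supernode (`subscribeAllSubnets`) uses `DATA_COLUMN_SIDECAR_SUBNET_COUNT`, a fullnode uses `CUSTODY_REQUIREMENT`, and the effective count is floored at `SAMPLES_PER_SLOT`. A sketch of that selection; the constant values are assumed PeerDAS parameters, not taken from this diff:

const
  DATA_COLUMN_SIDECAR_SUBNET_COUNT = 32'u64  # assumed
  CUSTODY_REQUIREMENT = 4'u64                # assumed
  SAMPLES_PER_SLOT = 8'u64                   # assumed

func effectiveCustodySubnets(supernode: bool): uint64 =
  let base =
    if supernode: DATA_COLUMN_SIDECAR_SUBNET_COUNT
    else: CUSTODY_REQUIREMENT
  # custody at least as many subnets as we sample each slot
  max(SAMPLES_PER_SLOT, base)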
2 changes: 1 addition & 1 deletion beacon_chain/spec/datatypes/fulu.nim
@@ -117,7 +117,7 @@ type
seq_number*: uint64
attnets*: AttnetBits
syncnets*: SyncnetBits
-    custody_subnet_count*: CscCount
+    custody_subnet_count*: uint64

# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/deneb/beacon-chain.md#executionpayload
ExecutionPayload* = object
14 changes: 14 additions & 0 deletions beacon_chain/spec/eip7594_helpers.nim
@@ -93,6 +93,20 @@ func get_custody_columns*(node_id: NodeId,

sortedColumnIndices(ColumnIndex(columns_per_subnet), subnet_ids)

func get_custody_columns_set*(node_id: NodeId,
custody_subnet_count: uint64):
HashSet[ColumnIndex] =
  # This method returns a HashSet of column indices;
  # it is specifically relevant for peer filtering
let
subnet_ids =
get_custody_column_subnets(node_id, custody_subnet_count)
const
columns_per_subnet =
NUMBER_OF_COLUMNS div DATA_COLUMN_SIDECAR_SUBNET_COUNT

sortedColumnIndices(ColumnIndex(columns_per_subnet), subnet_ids).toHashSet()

func get_custody_column_list*(node_id: NodeId,
custody_subnet_count: uint64):
List[ColumnIndex, NUMBER_OF_COLUMNS] =
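`get_custody_columns_set` returns the same indices as `get_custody_columns`, but as a `HashSet` so peer filtering gets O(1) membership tests. A self-contained sketch of the expansion from subnet ids to column indices, assuming the spec mapping column = subnet + i * SUBNET_COUNT:

import std/sets

const
  NUMBER_OF_COLUMNS = 128'u64                # assumed
  DATA_COLUMN_SIDECAR_SUBNET_COUNT = 32'u64  # assumed

func custodyColumnsSet(subnetIds: openArray[uint64]): HashSet[uint64] =
  const columnsPerSubnet =
    NUMBER_OF_COLUMNS div DATA_COLUMN_SIDECAR_SUBNET_COUNT
  for subnet in subnetIds:
    for i in 0'u64 ..< columnsPerSubnet:
      result.incl(DATA_COLUMN_SIDECAR_SUBNET_COUNT * i + subnet)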
1 change: 1 addition & 0 deletions beacon_chain/spec/network.nim
@@ -47,6 +47,7 @@

enrAttestationSubnetsField* = "attnets"
enrSyncSubnetsField* = "syncnets"
enrCustodySubnetCountField* = "csc"
enrForkIdField* = "eth2"

template eth2Prefix(forkDigest: ForkDigest): string =