diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim index 080dfdb634..ebcd9b4991 100644 --- a/beacon_chain/gossip_processing/gossip_validation.nim +++ b/beacon_chain/gossip_processing/gossip_validation.nim @@ -16,7 +16,7 @@ import # Internals ../spec/[ beaconstate, state_transition_block, forks, - helpers, network, signatures, eip7594_helpers], + helpers, network, signatures, peerdas_helpers], ../consensus_object_pools/[ attestation_pool, blockchain_dag, blob_quarantine, block_quarantine, data_column_quarantine, spec_cache, light_client_pool, sync_committee_msg_pool, @@ -496,7 +496,7 @@ proc validateBlobSidecar*( # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.8/specs/_features/eip7594/p2p-interface.md#data_column_sidecar_subnet_id proc validateDataColumnSidecar*( dag: ChainDAGRef, quarantine: ref Quarantine, - dataColumnQuarantine: ref DataColumnQuarantine, + dataColumnQuarantine: ref DataColumnQuarantine, data_column_sidecar: DataColumnSidecar, wallTime: BeaconTime, subnet_id: uint64): Result[void, ValidationError] = @@ -508,14 +508,14 @@ proc validateDataColumnSidecar*( if not (data_column_sidecar.index < NUMBER_OF_COLUMNS): return dag.checkedReject("DataColumnSidecar: The sidecar's index should be consistent with NUMBER_OF_COLUMNS") - # [REJECT] The sidecar is for the correct subnet + # [REJECT] The sidecar is for the correct subnet # -- i.e. `compute_subnet_for_data_column_sidecar(blob_sidecar.index) == subnet_id`. if not (compute_subnet_for_data_column_sidecar(data_column_sidecar.index) == subnet_id): return dag.checkedReject("DataColumnSidecar: The sidecar is not for the correct subnet") - # [IGNORE] The sidecar is not from a future slot - # (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance) -- i.e. validate that - # `block_header.slot <= current_slot`(a client MAY queue future sidecars for + # [IGNORE] The sidecar is not from a future slot + # (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance) -- i.e. validate that + # `block_header.slot <= current_slot`(a client MAY queue future sidecars for # processing at the appropriate slot). if not (block_header.slot <= (wallTime + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero): @@ -608,7 +608,7 @@ proc validateDataColumnSidecar*( data_column_sidecar.signed_block_header.signature): return dag.checkedReject("DataColumnSidecar: Invalid proposer signature") - # [REJECT] The sidecar's column data is valid as + # [REJECT] The sidecar's column data is valid as # verified by `verify_data_column_kzg_proofs(sidecar)` block: let r = check_data_column_sidecar_kzg_proofs(data_column_sidecar) diff --git a/beacon_chain/networking/eth2_discovery.nim b/beacon_chain/networking/eth2_discovery.nim index 8bb648dbab..e146d707f5 100644 --- a/beacon_chain/networking/eth2_discovery.nim +++ b/beacon_chain/networking/eth2_discovery.nim @@ -11,7 +11,7 @@ import std/[algorithm, sequtils], chronos, chronicles, eth/p2p/discoveryv5/[enr, protocol, node, random2], - ../spec/datatypes/altair, + ../spec/datatypes/[altair, fulu], ../spec/eth2_ssz_serialization, ".."/[conf, conf_light_client] @@ -127,6 +127,7 @@ proc queryRandom*( forkId: ENRForkID, wantedAttnets: AttnetBits, wantedSyncnets: SyncnetBits, + wantedCscnets: CscBits, minScore: int): Future[seq[Node]] {.async: (raises: [CancelledError]).} = ## Perform a discovery query for a random target ## (forkId) and matching at least one of the attestation subnets. @@ -151,13 +152,26 @@ proc queryRandom*( if not forkId.isCompatibleForkId(peerForkId): continue + let cscCountBytes = n.record.get(enrCustodySubnetCountField, seq[byte]) + if cscCountBytes.isOk(): + let cscCountNode = + try: + SSZ.decode(cscCountBytes.get(), uint8) + except SerializationError as e: + debug "Could not decode the csc ENR field of peer", + peer = n.record.toURI(), exception = e.name, msg = e.msg + continue + + if wantedCscnets.countOnes().uint8 == cscCountNode: + score += 1 + let attnetsBytes = n.record.get(enrAttestationSubnetsField, seq[byte]) if attnetsBytes.isOk(): let attnetsNode = try: SSZ.decode(attnetsBytes.get(), AttnetBits) except SerializationError as e: - debug "Could not decode the attnets ERN bitfield of peer", + debug "Could not decode the attnets ENR bitfield of peer", peer = n.record.toURI(), exception = e.name, msg = e.msg continue diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 6248a5f6fa..dd8f1f01ad 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -1505,7 +1505,8 @@ proc trimConnections(node: Eth2Node, count: int) = inc(nbc_cycling_kicked_peers) if toKick <= 0: return -proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) = +proc getLowSubnets(node: Eth2Node, epoch: Epoch): + (AttnetBits, SyncnetBits, CscBits) = # Returns the subnets required to have a healthy mesh # The subnets are computed, to, in order: # - Have 0 subnet with < `dLow` peers from topic subscription @@ -1570,7 +1571,11 @@ proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) = if epoch + 1 >= node.cfg.ALTAIR_FORK_EPOCH: findLowSubnets(getSyncCommitteeTopic, SyncSubcommitteeIndex, SYNC_COMMITTEE_SUBNET_COUNT) else: - default(SyncnetBits) + default(SyncnetBits), + if epoch >= node.cfg.FULU_FORK_EPOCH: + findLowSubnets(getDataColumnSidecarTopic, uint64, (DATA_COLUMN_SIDECAR_SUBNET_COUNT).int) + else: + default(CscBits) ) proc runDiscoveryLoop(node: Eth2Node) {.async: (raises: [CancelledError]).} = @@ -1579,23 +1584,29 @@ proc runDiscoveryLoop(node: Eth2Node) {.async: (raises: [CancelledError]).} = while true: let currentEpoch = node.getBeaconTime().slotOrZero.epoch - (wantedAttnets, wantedSyncnets) = node.getLowSubnets(currentEpoch) + (wantedAttnets, wantedSyncnets, wantedCscnets) = node.getLowSubnets(currentEpoch) wantedAttnetsCount = wantedAttnets.countOnes() wantedSyncnetsCount = wantedSyncnets.countOnes() + wantedCscnetsCount = wantedCscnets.countOnes() outgoingPeers = node.peerPool.lenCurrent({PeerType.Outgoing}) targetOutgoingPeers = max(node.wantedPeers div 10, 3) if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or - outgoingPeers < targetOutgoingPeers: + wantedCscnetsCount > 0 or outgoingPeers < targetOutgoingPeers: let minScore = - if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0: + if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or + wantedCscnetsCount > 0: 1 else: 0 discoveredNodes = await node.discovery.queryRandom( - node.discoveryForkId, wantedAttnets, wantedSyncnets, minScore) + node.discoveryForkId, + wantedAttnets, + wantedSyncnets, + wantedCscnets, + minScore) let newPeers = block: var np = newSeq[PeerAddr]() diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index e849ca6a98..a6898dd3aa 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -21,7 +21,7 @@ import ./spec/datatypes/[altair, bellatrix, phase0], ./spec/[ deposit_snapshots, engine_authentication, weak_subjectivity, - eip7594_helpers], + peerdas_helpers], ./sync/[sync_protocol, light_client_protocol, sync_overseer], ./validators/[keystore_management, beacon_validators], "."/[ @@ -535,7 +535,7 @@ proc initFullNode( processor: processor, network: node.network) requestManager = RequestManager.init( - node.network, supernode, custody_columns_set, dag.cfg.DENEB_FORK_EPOCH, + node.network, supernode, custody_columns_set, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime, (proc(): bool = syncManager.inProgress), quarantine, blobQuarantine, dataColumnQuarantine, rmanBlockVerifier, rmanBlockLoader, rmanBlobLoader, rmanDataColumnLoader) diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim index 3dabaa73af..fedce7ac21 100644 --- a/beacon_chain/spec/network.nim +++ b/beacon_chain/spec/network.nim @@ -235,3 +235,13 @@ func getSyncSubnets*( iterator blobSidecarTopics*(forkDigest: ForkDigest): string = for subnet_id in BlobId: yield getBlobSidecarTopic(forkDigest, subnet_id) + +# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/p2p-interface.md#data_column_sidecar_subnet_id +func getDataColumnSidecarTopic*(forkDigest: ForkDigest, + subnet_id: uint64): string = + eth2Prefix(forkDigest) & "data_column_sidecar_" & $subnet_id & "/ssz_snappy" + +iterator dataColumnSidecarTopics*(forkDigest: ForkDigest, + targetSubnetCount: uint64): string = + for subnet_id in 0'u64..