Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 61 additions & 17 deletions beacon_chain/consensus_object_pools/attestation_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ const
type
OnPhase0AttestationCallback =
proc(data: phase0.Attestation) {.gcsafe, raises: [].}
OnElectraAttestationCallback =
proc(data: electra.Attestation) {.gcsafe, raises: [].}
OnSingleAttestationCallback =
proc(data: SingleAttestation) {.gcsafe, raises: [].}

Validation[CVBType] = object
## Validations collect a set of signatures for a distict attestation - in
Expand Down Expand Up @@ -96,7 +96,7 @@ type
## sequence based on validator indices

onPhase0AttestationAdded: OnPhase0AttestationCallback
onElectraAttestationAdded: OnElectraAttestationCallback
onSingleAttestationAdded: OnSingleAttestationCallback

logScope: topics = "attpool"

Expand All @@ -106,7 +106,7 @@ declareGauge attestation_pool_block_attestation_packing_time,
proc init*(T: type AttestationPool, dag: ChainDAGRef,
quarantine: ref Quarantine,
onPhase0Attestation: OnPhase0AttestationCallback = nil,
onElectraAttestation: OnElectraAttestationCallback = nil): T =
onSingleAttestation: OnSingleAttestationCallback = nil): T =
## Initialize an AttestationPool from the dag `headState`
## The `finalized_root` works around the finalized_checkpoint of the genesis block
## holding a zero_root.
Expand Down Expand Up @@ -182,7 +182,7 @@ proc init*(T: type AttestationPool, dag: ChainDAGRef,
quarantine: quarantine,
forkChoice: forkChoice,
onPhase0AttestationAdded: onPhase0Attestation,
onElectraAttestationAdded: onElectraAttestation
onSingleAttestationAdded: onSingleAttestation
)

proc addForkChoiceVotes(
Expand Down Expand Up @@ -351,13 +351,12 @@ func covers(

proc addAttestation(
entry: var AttestationEntry,
attestation: phase0.Attestation | electra.Attestation,
attestation: phase0.Attestation | electra.Attestation, _: int,
signature: CookedSig): bool =
logScope:
attestation = shortLog(attestation)

let
singleIndex = oneIndex(attestation.aggregation_bits)
let singleIndex = oneIndex(attestation.aggregation_bits)

if singleIndex.isSome():
if singleIndex.get() in entry.singles:
Expand Down Expand Up @@ -392,6 +391,28 @@ proc addAttestation(

true

proc addAttestation(
entry: var AttestationEntry, attestation: SingleAttestation,
index_in_committee: int,
signature: CookedSig): bool =
logScope:
attestation = shortLog(attestation)

if index_in_committee in entry.singles:
trace "SingleAttestation already seen",
singles = entry.singles.len(),
aggregates = entry.aggregates.len()

return false

debug "SingleAttestation resolved",
singles = entry.singles.len(),
aggregates = entry.aggregates.len()

entry.singles[index_in_committee] = signature

true

func getAttestationCandidateKey(
data: AttestationData,
committee_index: Opt[CommitteeIndex]): Eth2Digest =
Expand All @@ -403,7 +424,8 @@ func getAttestationCandidateKey(
# i.e. no committees selected, so it can't be an actual Electra attestation
hash_tree_root(data)
else:
hash_tree_root([hash_tree_root(data), hash_tree_root(committee_index.get.uint64)])
hash_tree_root([hash_tree_root(data),
hash_tree_root(committee_index.get.uint64)])

func getAttestationCandidateKey(
attestationDataRoot: Eth2Digest, committee_index: CommitteeIndex):
Expand All @@ -412,9 +434,9 @@ func getAttestationCandidateKey(

proc addAttestation*(
pool: var AttestationPool,
attestation: phase0.Attestation | electra.Attestation,
attesting_indices: openArray[ValidatorIndex],
signature: CookedSig, wallTime: BeaconTime) =
attestation: phase0.Attestation | electra.Attestation | SingleAttestation,
attesting_indices: openArray[ValidatorIndex], beacon_committee_len: int,
index_in_committee: int, signature: CookedSig, wallTime: BeaconTime) =
## Add an attestation to the pool, assuming it's been validated already.
##
## Assuming the votes in the attestation have not already been seen, the
Expand Down Expand Up @@ -445,12 +467,12 @@ proc addAttestation*(
let attestation_data_root = getAttestationCandidateKey(entry.data, committee_index)

attCandidates[candidateIdx.get()].withValue(attestation_data_root, entry) do:
if not addAttestation(entry[], attestation, signature):
if not addAttestation(entry[], attestation, index_in_committee, signature):
return
do:
if not addAttestation(
attCandidates[candidateIdx.get()].mgetOrPut(attestation_data_root, entry),
attestation, signature):
attestation, index_in_committee, signature):
# Returns from overall function, not only template
return

Expand All @@ -469,7 +491,7 @@ proc addAttestation*(
template addAttToPool(_: electra.Attestation) {.used.} =
let
committee_index = get_committee_index_one(attestation.committee_bits).expect("TODO")
data = AttestationData(
data = AttestationData(
slot: attestation.data.slot,
index: uint64 committee_index,
beacon_block_root: attestation.data.beacon_block_root,
Expand All @@ -483,9 +505,31 @@ proc addAttestation*(
attestation.data.slot, attesting_indices,
attestation.data.beacon_block_root, wallTime)

# There does not seem to be an SSE stream event corresponding to this,
# because both attestation and single_attestation specifically specify
# the `beacon_attestation_{subnet_id}` topic and that in not possible,
# for this type, in Electra because this case is always an aggregate.

template addAttToPool(_: SingleAttestation) {.used.} =
let
data = AttestationData(
slot: attestation.data.slot,
index: uint64 attestation.committee_index,
beacon_block_root: attestation.data.beacon_block_root,
source: attestation.data.source,
target: attestation.data.target)
newAttEntry = ElectraAttestationEntry(
data: data, committee_len: beacon_committee_len)
addAttToPool(
pool.electraCandidates, newAttEntry,
Opt.some attestation.committee_index.CommitteeIndex)
pool.addForkChoiceVotes(
attestation.data.slot, attesting_indices,
attestation.data.beacon_block_root, wallTime)

# Send notification about new attestation via callback.
if not(isNil(pool.onElectraAttestationAdded)):
pool.onElectraAttestationAdded(attestation)
if not(isNil(pool.onSingleAttestationAdded)):
pool.onSingleAttestationAdded(attestation)

addAttToPool(attestation)

Expand Down
9 changes: 5 additions & 4 deletions beacon_chain/consensus_object_pools/spec_cache.nim
Original file line number Diff line number Diff line change
Expand Up @@ -173,21 +173,22 @@ iterator get_attesting_indices*(
yield validator

iterator get_attesting_indices*(
dag: ChainDAGRef, attestation: electra.TrustedAttestation,
dag: ChainDAGRef,
attestation: electra.Attestation | electra.TrustedAttestation,
on_chain: static bool): ValidatorIndex =
block gaiBlock: # `return` is not allowed in an inline iterator
let
slot =
check_attestation_slot_target(attestation.data).valueOr:
warn "Invalid attestation slot in trusted attestation",
warn "Invalid attestation slot in attestation",
attestation = shortLog(attestation)
doAssert strictVerification notin dag.updateFlags
break gaiBlock
blck =
dag.getBlockRef(attestation.data.beacon_block_root).valueOr:
# Attestation block unknown - this is fairly common because we
# discard alternative histories on restart
debug "Pruned block in trusted attestation",
debug "Pruned block in attestation",
attestation = shortLog(attestation)
break gaiBlock
target =
Expand All @@ -196,7 +197,7 @@ iterator get_attesting_indices*(
# leading to the case where the attestation block root is the
# finalized head (exists as BlockRef) but its target vote has
# already been pruned
notice "Pruned target in trusted attestation",
notice "Pruned target in attestation",
blck = shortLog(blck),
attestation = shortLog(attestation)
doAssert strictVerification notin dag.updateFlags
Expand Down
22 changes: 13 additions & 9 deletions beacon_chain/gossip_processing/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func clearDoppelgangerProtection*(self: var Eth2Processor) =

proc checkForPotentialDoppelganger(
self: var Eth2Processor,
attestation: phase0.Attestation | electra.Attestation,
attestation: phase0.Attestation | electra.Attestation | SingleAttestation,
attesterIndices: openArray[ValidatorIndex]) =
# Only check for attestations after node launch. There might be one slot of
# overlap in quick intra-slot restarts so trade off a few true negatives in
Expand All @@ -360,8 +360,8 @@ proc checkForPotentialDoppelganger(

proc processAttestation*(
self: ref Eth2Processor, src: MsgSource,
attestation: phase0.Attestation | electra.Attestation, subnet_id: SubnetId,
checkSignature, checkValidator: bool
attestation: phase0.Attestation | SingleAttestation,
subnet_id: SubnetId, checkSignature, checkValidator: bool
): Future[ValidationRes] {.async: (raises: [CancelledError]).} =
var wallTime = self.getCurrentBeaconTime()
let (afterGenesis, wallSlot) = wallTime.toSlot()
Expand All @@ -380,14 +380,14 @@ proc processAttestation*(
debug "Attestation received", delay

# Now proceed to validation
let v =
await self.attestationPool.validateAttestation(
self.batchCrypto, attestation, wallTime, subnet_id, checkSignature)
let v = await self.attestationPool.validateAttestation(
self.batchCrypto, attestation, wallTime, subnet_id, checkSignature)
return if v.isOk():
# Due to async validation the wallTime here might have changed
wallTime = self.getCurrentBeaconTime()

let (attester_index, sig) = v.get()
let (attester_index, beacon_committee_len, index_in_committee, sig) =
v.get()

if checkValidator and (attester_index in self.validatorPool[]):
warn "A validator client has attempted to send an attestation from " &
Expand All @@ -400,7 +400,8 @@ proc processAttestation*(

trace "Attestation validated"
self.attestationPool[].addAttestation(
attestation, [attester_index], sig, wallTime)
attestation, [attester_index], beacon_committee_len,
index_in_committee, sig, wallTime)

self.validatorMonitor[].registerAttestation(
src, wallTime, attestation, attester_index)
Expand Down Expand Up @@ -456,8 +457,11 @@ proc processSignedAggregateAndProof*(

trace "Aggregate validated"

# -1 here is the notional index in committee for which the attestation pool
# only requires external input regarding SingleAttestation messages.
self.attestationPool[].addAttestation(
signedAggregateAndProof.message.aggregate, attesting_indices, sig,
signedAggregateAndProof.message.aggregate, attesting_indices,
signedAggregateAndProof.message.aggregate.aggregation_bits.len, -1, sig,
wallTime)

self.validatorMonitor[].registerAggregate(
Expand Down
Loading