From d8a8e276e0bdf06d82aca3ceb4626374c6ffc2db Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Tue, 20 May 2025 11:59:08 +0200 Subject: [PATCH 1/3] introduce-poolV2 --- .../AggregatingAttestationPool.java | 14 +- .../AggregatingAttestationPoolV2.java | 547 ++++++++++++++++++ .../attestation/AttestationForkChecker.java | 5 + .../AggregatingAttestationPoolProfiler.java | 58 ++ .../AggregatingAttestationPoolV2Test.java | 233 ++++++++ 5 files changed, 854 insertions(+), 3 deletions(-) create mode 100644 ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV2.java create mode 100644 ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AggregatingAttestationPoolProfiler.java create mode 100644 ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV2Test.java diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPool.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPool.java index ce22614d843..3ca1022bc97 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPool.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPool.java @@ -15,6 +15,7 @@ import java.util.List; import java.util.Optional; +import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes32; @@ -79,13 +80,19 @@ public abstract void onAttestationsIncludedInBlock( * otherwise. */ protected boolean ensureCommitteesSizeInAttestation(final ValidatableAttestation attestation) { + return ensureCommitteesSizeInAttestation( + attestation, () -> retrieveStateForAttestation(attestation.getData())); + } + + protected boolean ensureCommitteesSizeInAttestation( + final ValidatableAttestation attestation, + final Supplier> stateSupplier) { if (attestation.getCommitteesSize().isPresent() || !attestation.getAttestation().requiresCommitteeBits()) { return true; } - final Optional maybeState = - retrieveStateForAttestation(attestation.getAttestation().getData()); + final Optional maybeState = stateSupplier.get(); if (maybeState.isEmpty()) { return false; } @@ -95,7 +102,8 @@ protected boolean ensureCommitteesSizeInAttestation(final ValidatableAttestation return true; } - private Optional retrieveStateForAttestation(final AttestationData attestationData) { + protected Optional retrieveStateForAttestation( + final AttestationData attestationData) { // we can use the first state of the epoch to get committees for an attestation final MiscHelpers miscHelpers = spec.atSlot(attestationData.getSlot()).miscHelpers(); final Optional maybeEpoch = recentChainData.getCurrentEpoch(); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV2.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV2.java new file mode 100644 index 00000000000..8a88bc946ee --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV2.java @@ -0,0 +1,547 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.attestation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Suppliers; +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.LongSupplier; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes32; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import tech.pegasys.teku.infrastructure.metrics.SettableGauge; +import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; +import tech.pegasys.teku.infrastructure.ssz.SszList; +import tech.pegasys.teku.infrastructure.ssz.schema.SszListSchema; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; +import tech.pegasys.teku.spec.datastructures.operations.Attestation; +import tech.pegasys.teku.spec.datastructures.operations.AttestationData; +import tech.pegasys.teku.spec.datastructures.operations.AttestationSchema; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.spec.schemas.SchemaDefinitions; +import tech.pegasys.teku.statetransition.attestation.utils.AggregatingAttestationPoolProfiler; +import tech.pegasys.teku.statetransition.attestation.utils.RewardBasedAttestationSorter; +import tech.pegasys.teku.statetransition.attestation.utils.RewardBasedAttestationSorter.PooledAttestationWithRewardInfo; +import tech.pegasys.teku.statetransition.attestation.utils.RewardBasedAttestationSorter.RewardBasedAttestationSorterFactory; +import tech.pegasys.teku.storage.client.RecentChainData; + +/** + * Maintains a pool of attestations. Attestations can be retrieved either for inclusion in a block + * or as an aggregate to publish as part of the naive attestation aggregation algorithm. In both + * cases the returned attestations are aggregated to maximize the number of validators that can be + * included. + * + *

This V2 implementation uses concurrent collections to reduce contention. + */ +public class AggregatingAttestationPoolV2 extends AggregatingAttestationPool { + private static final Logger LOG = LogManager.getLogger(); + + private final ConcurrentMap attestationGroupByDataHash = + new ConcurrentHashMap<>(); + + private final ConcurrentNavigableMap> dataHashBySlot = + new ConcurrentSkipListMap<>(); + + private final SettableGauge sizeGauge; + private final int maximumAttestationCount; + private final AggregatingAttestationPoolProfiler aggregatingAttestationPoolProfiler; + + private final long maxBlockAggregationTimeNanos; + private final long maxTotalBlockAggregationTimeMillis; + private final boolean earlyDropSingleAttestations; + private final boolean parallel; + + private final LongSupplier nanosSupplier; + + private final AtomicInteger size = new AtomicInteger(0); + + private final RewardBasedAttestationSorterFactory rewardBasedAttestationSorterFactory; + + public AggregatingAttestationPoolV2( + final Spec spec, + final RecentChainData recentChainData, + final MetricsSystem metricsSystem, + final int maximumAttestationCount, + final AggregatingAttestationPoolProfiler aggregatingAttestationPoolProfiler, + final int maxBlockAggregationTimeMillis, + final int maxTotalBlockAggregationTimeMillis, + final boolean earlyDropSingleAttestations, + final boolean parallel) { + super(spec, recentChainData); + this.sizeGauge = + SettableGauge.create( + metricsSystem, + TekuMetricCategory.BEACON, + "attestation_pool_size", + "The number of attestations available to be included in proposed blocks"); + this.maximumAttestationCount = maximumAttestationCount; + this.aggregatingAttestationPoolProfiler = aggregatingAttestationPoolProfiler; + this.maxBlockAggregationTimeNanos = maxBlockAggregationTimeMillis * 1_000_000L; + this.maxTotalBlockAggregationTimeMillis = maxTotalBlockAggregationTimeMillis * 1_000_000L; + this.earlyDropSingleAttestations = earlyDropSingleAttestations; + this.parallel = parallel; + this.nanosSupplier = System::nanoTime; + this.rewardBasedAttestationSorterFactory = RewardBasedAttestationSorterFactory.DEFAULT; + } + + @VisibleForTesting + public AggregatingAttestationPoolV2( + final Spec spec, + final RecentChainData recentChainData, + final MetricsSystem metricsSystem, + final int maximumAttestationCount, + final LongSupplier nanosSupplier, + final RewardBasedAttestationSorterFactory rewardBasedAttestationSorterFactory, + final int maxBlockAggregationTimeMillis, + final int maxTotalBlockAggregationTimeMillis) { + super(spec, recentChainData); + this.sizeGauge = + SettableGauge.create( + metricsSystem, + TekuMetricCategory.BEACON, + "attestation_pool_size", + "The number of attestations available to be included in proposed blocks"); + this.maximumAttestationCount = maximumAttestationCount; + this.aggregatingAttestationPoolProfiler = AggregatingAttestationPoolProfiler.NOOP; + this.maxBlockAggregationTimeNanos = + maxBlockAggregationTimeMillis * 1_000_000L; // Integer.MAX_VALUE * 1_000_000L + this.maxTotalBlockAggregationTimeMillis = + maxTotalBlockAggregationTimeMillis * 1_000_000L; // Integer.MAX_VALUE * 1_000_000L + this.earlyDropSingleAttestations = false; + this.parallel = false; + this.nanosSupplier = nanosSupplier; + this.rewardBasedAttestationSorterFactory = rewardBasedAttestationSorterFactory; + } + + // No longer synchronized + @Override + public void add(final ValidatableAttestation attestation) { + final Supplier> cachingStateSupplier = + Suppliers.memoize(() -> retrieveStateForAttestation(attestation.getData())); + + if (!ensureCommitteesSizeInAttestation(attestation, cachingStateSupplier)) { + LOG.debug( + "Committees size couldn't be retrieved for attestation at slot {}, block root {} and target root {}. Will NOT add this attestation to the pool.", + attestation.getData().getSlot(), + attestation.getData().getBeaconBlockRoot(), + attestation.getData().getTarget().getRoot()); + return; + } + + final Optional> validatorIndices = + getValidatorIndices(attestation, cachingStateSupplier); + + if (validatorIndices.isEmpty()) { + LOG.debug( + "Validator indices couldn't be retrieved for attestation at slot {}, block root {} and target root {}. Will NOT add this attestation to the pool.", + attestation.getData().getSlot(), + attestation.getData().getBeaconBlockRoot(), + attestation.getData().getTarget().getRoot()); + return; + } + + getOrCreateAttestationGroup(attestation.getData(), attestation.getCommitteesSize()) + .ifPresent( + attestationGroup -> + attestationGroup.add( + PooledAttestation.fromValidatableAttestation( + attestation, validatorIndices.get()), + attestation.getCommitteeShufflingSeed())); + } + + private Optional> getValidatorIndices( + final ValidatableAttestation attestation, + final Supplier> stateSupplier) { + return attestation + .getIndexedAttestation() + .map(indexedAttestation -> indexedAttestation.getAttestingIndices().asListUnboxed()) + .or( + () -> + stateSupplier + .get() + .map( + state -> + spec.atSlot(attestation.getData().getSlot()) + .getAttestationUtil() + .getAttestingIndices(state, attestation.getAttestation()) + .intStream() + .mapToObj(UInt64::valueOf) + .toList())); + } + + /** + * @param committeesSize Required for aggregating attestations as per EIP-7549 + */ + private Optional getOrCreateAttestationGroup( + final AttestationData attestationData, final Optional committeesSize) { + + final Bytes dataHash = attestationData.hashTreeRoot(); + + dataHashBySlot + .computeIfAbsent(attestationData.getSlot(), __ -> ConcurrentHashMap.newKeySet()) + .add(dataHash); + + final MatchingDataAttestationGroupV2 attestationGroup = + attestationGroupByDataHash.computeIfAbsent( + dataHash, + __ -> + new MatchingDataAttestationGroupV2( + spec, + nanosSupplier, + attestationData, + committeesSize, + earlyDropSingleAttestations)); // Pass spec, data, committeesSize + + return Optional.of(attestationGroup); + } + + @Override + public void onSlot(final UInt64 slot) { + final int currentActualSize = + attestationGroupByDataHash.values().stream() + .mapToInt(MatchingDataAttestationGroupV2::size) + .sum(); + + size.set(currentActualSize); + sizeGauge.set(currentActualSize); + + LOG.trace("Attestation pool size recalculated to {}", currentActualSize); + + if (slot.isGreaterThan(ATTESTATION_RETENTION_SLOTS)) { + final UInt64 firstValidAttestationSlot = slot.minus(ATTESTATION_RETENTION_SLOTS); + removeAttestationsPriorToSlot(firstValidAttestationSlot); + } + + int sizeForPruningCheck = currentActualSize; // Use the size calculated at the start of onSlot + while (dataHashBySlot.size() > 1 && sizeForPruningCheck > maximumAttestationCount) { + LOG.trace( + "V2 Attestation cache at {} (pre-prune estimate) exceeds {}. Pruning...", + sizeForPruningCheck, + maximumAttestationCount); + final UInt64 oldestSlot = dataHashBySlot.firstKey(); + if (oldestSlot == null) { + break; + } + + // Estimate the size reduction (since removeAttestationsPriorToSlot no longer updates 'size') + // This is tricky because group.size() is approximate. + // We might need to actually get the groups to be removed and sum their sizes *before* + // removal. + int estimatedRemovalCount = 0; + final Set hashesToRemove = + dataHashBySlot.getOrDefault(oldestSlot.plus(1), Set.of()); // Check slot *after* oldest + for (final Bytes hash : hashesToRemove) { + MatchingDataAttestationGroupV2 group = attestationGroupByDataHash.get(hash); + if (group != null) { + estimatedRemovalCount += group.size(); + } + } + + removeAttestationsPriorToSlot(oldestSlot.plus(1)); // Remove the items + + if (estimatedRemovalCount == 0) { + // If we estimated 0 removed, or failed to find the slot, break to avoid potential infinite + // loop + LOG.warn( + "Failed to prune oldest slot {} or estimated 0 removals. Skipping further pruning this cycle.", + oldestSlot); + break; + } + sizeForPruningCheck -= estimatedRemovalCount; + } + + aggregatingAttestationPoolProfiler.execute(spec, slot, recentChainData, this); + } + + private void removeAttestationsPriorToSlot(final UInt64 firstValidAttestationSlot) { + final NavigableMap> headMap = + dataHashBySlot.headMap(firstValidAttestationSlot, false); + final List slotsToRemove = List.copyOf(headMap.keySet()); + + if (slotsToRemove.isEmpty()) { + return; + } + + LOG.trace( + "V2 Pruning attestations before slot {}. Slots to remove: {}", + firstValidAttestationSlot, + slotsToRemove.size()); + + for (final UInt64 slot : slotsToRemove) { + final Set dataHashes = dataHashBySlot.remove(slot); + if (dataHashes != null) { + dataHashes.forEach(attestationGroupByDataHash::remove); + } + } + } + + @Override + public void onAttestationsIncludedInBlock( + final UInt64 slot, final Iterable attestations) { + attestations.forEach(attestation -> onAttestationIncludedInBlock(slot, attestation)); + } + + private void onAttestationIncludedInBlock(final UInt64 slot, final Attestation attestation) { + final ValidatableAttestation validatableAttestation = + ValidatableAttestation.from(spec, attestation); + if (!ensureCommitteesSizeInAttestation(validatableAttestation)) { + LOG.debug( + "Attestation at slot {}, block root {} and target root {} has no committee size. Unable to call onAttestationIncludedInBlock.", + attestation.getData().getSlot(), + attestation.getData().getBeaconBlockRoot(), + attestation.getData().getTarget().getRoot()); + return; + } + getOrCreateAttestationGroup(attestation.getData(), validatableAttestation.getCommitteesSize()) + .ifPresent( + attestationGroup -> { + // MatchingDataAttestationGroupV2 must handle concurrency internally + final int numRemoved = + attestationGroup.onAttestationIncludedInBlock(slot, attestation); + if (numRemoved > 0) { + updateSize(-numRemoved); + } + }); + } + + private void updateSize(final int delta) { + if (delta != 0) { + final int currentSize = size.addAndGet(delta); + sizeGauge.set(currentSize); + } + } + + @Override + public int getSize() { + return size.get(); + } + + public static Predicate distinctByDataRoot() { + final Map seen = new ConcurrentHashMap<>(); + return t -> seen.putIfAbsent(t.getAttestation().data().hashTreeRoot(), Boolean.TRUE) == null; + } + + @Override + public SszList getAttestationsForBlock( + final BeaconState stateAtBlockSlot, final AttestationForkChecker forkChecker) { + final UInt64 currentEpoch = spec.getCurrentEpoch(stateAtBlockSlot); + final int previousEpochLimit = spec.getPreviousEpochAttestationCapacity(stateAtBlockSlot); + + final RewardBasedAttestationSorter rewardBasedAttestationSorter = + rewardBasedAttestationSorterFactory.create(spec, stateAtBlockSlot); + final SchemaDefinitions schemaDefinitions = + spec.atSlot(stateAtBlockSlot.getSlot()).getSchemaDefinitions(); + + final SszListSchema attestationsSchema = + schemaDefinitions.getBeaconBlockBodySchema().getAttestationsSchema(); + + final AttestationSchema attestationSchema = + schemaDefinitions.getAttestationSchema(); + + final boolean blockRequiresAttestationsWithCommitteeBits = + attestationSchema.requiresCommitteeBits(); + + final AtomicInteger prevEpochCount = new AtomicInteger(0); + + final long nowNanos = nanosSupplier.getAsLong(); + final long totalTimeLimitNanos = nowNanos + maxTotalBlockAggregationTimeMillis; + final long aggregationTimeLimit = nowNanos + maxBlockAggregationTimeNanos; + + /* -- Aggregation phase -- */ + + var dataHashes = + dataHashBySlot + // We can immediately skip any attestations from the block slot or later + .headMap(stateAtBlockSlot.getSlot(), false) + .descendingMap() // Safe view + .values(); + + var aggregates = + (parallel ? dataHashes.parallelStream() : dataHashes.stream()) + .flatMap( + dataHashSetForSlot -> + streamAggregatesForDataHashesBySlot( + dataHashSetForSlot, // dataHashSetForSlot is expected to be a Concurrent Set + stateAtBlockSlot, + forkChecker, + blockRequiresAttestationsWithCommitteeBits, + aggregationTimeLimit)) + .filter( + attestation -> { + if (spec.computeEpochAtSlot(attestation.data().getSlot()) + .isLessThan(currentEpoch)) { + final int currentCount = prevEpochCount.getAndIncrement(); + return currentCount < previousEpochLimit; + } + return true; + }) + .toList(); + + LOG.info( + "Aggregation phase took {} ms. Produced {} aggregations.", + (nanosSupplier.getAsLong() - nowNanos) / 1_000_000, + aggregates.size()); + + /* -- Sorting phase -- */ + + final List sortedAggregates = + rewardBasedAttestationSorter.sort( + aggregates, Math.toIntExact(attestationsSchema.getMaxLength())); + + /* -- FillUp phase -- */ + + var distintPredicate = distinctByDataRoot(); + final Stream> toBeFilledUpAggregates = + sortedAggregates.stream() + .map( + aggregate -> + distintPredicate.test(aggregate) ? Optional.of(aggregate) : Optional.empty()); + + final List> filledUpAggregates = + (parallel ? toBeFilledUpAggregates.parallel() : toBeFilledUpAggregates) + .peek( + maybeAttestation -> + maybeAttestation.ifPresent( + attestation -> + aggregatingAttestationPoolProfiler.onPreFillUp( + stateAtBlockSlot, attestation))) + .map( + maybeAttestation -> + maybeAttestation.map( + attestation -> fillUpAttestation(attestation, totalTimeLimitNanos))) + .peek( + maybeAttestation -> + maybeAttestation.ifPresent( + attestation -> + aggregatingAttestationPoolProfiler.onPostFillUp( + stateAtBlockSlot, attestation))) + .toList(); + + /* -- Final conversion phase -- */ + + return IntStream.range(0, sortedAggregates.size()) + .mapToObj(i -> filledUpAggregates.get(i).orElse(sortedAggregates.get(i))) + .map(a -> a.getAttestation().toAttestation(attestationSchema)) + .collect(attestationsSchema.collector()); + } + + private PooledAttestationWithRewardInfo fillUpAttestation( + final PooledAttestationWithRewardInfo attestationWithRewards, final long timeLimitNanos) { + if (nanosSupplier.getAsLong() > timeLimitNanos) { + LOG.info("Time limit reached, skipping fillUpAttestation"); + return attestationWithRewards; + } + + var attestation = attestationWithRewards.getAttestation(); + return Optional.ofNullable(attestationGroupByDataHash.get(attestation.data().hashTreeRoot())) + .map( + group -> + attestationWithRewards.withAttestation( + group.fillUpAggregation(attestation, timeLimitNanos))) + .orElse(attestationWithRewards); + } + + private Stream streamAggregatesForDataHashesBySlot( + final Set dataHashSetForSlot, + final BeaconState stateAtBlockSlot, + final AttestationForkChecker forkChecker, + final boolean blockRequiresAttestationsWithCommitteeBits, + final long baseAggregationTimeLimitNanos) { + + return dataHashSetForSlot.stream() + .map(attestationGroupByDataHash::get) + .filter(Objects::nonNull) + .filter(group -> group.isValid(stateAtBlockSlot, spec)) + .filter(forkChecker::areAttestationsFromCorrectForkV2) + .flatMap(group -> group.streamForBlockProduction(baseAggregationTimeLimitNanos)) + .filter( + attestation -> + attestation.pooledAttestation().bits().requiresCommitteeBits() + == blockRequiresAttestationsWithCommitteeBits); + } + + @Override + public List getAttestations( + final Optional maybeSlot, final Optional maybeCommitteeIndex) { + + final Predicate>> filterForSlot = + (entry) -> maybeSlot.map(slot -> entry.getKey().equals(slot)).orElse(true); + + final UInt64 slot = maybeSlot.orElse(recentChainData.getCurrentSlot().orElse(UInt64.ZERO)); + final SchemaDefinitions schemaDefinitions = spec.atSlot(slot).getSchemaDefinitions(); + final AttestationSchema attestationSchema = + schemaDefinitions.getAttestationSchema(); + final boolean requiresCommitteeBits = attestationSchema.requiresCommitteeBits(); + + return dataHashBySlot.descendingMap().entrySet().stream() + .filter(filterForSlot) + .map(Map.Entry::getValue) + .flatMap(Collection::stream) + .map(attestationGroupByDataHash::get) + .filter(Objects::nonNull) + .flatMap( + matchingDataAttestationGroup -> + matchingDataAttestationGroup.streamForApiRequest( + maybeCommitteeIndex, requiresCommitteeBits)) + .map(pooledAttestation -> pooledAttestation.toAttestation(attestationSchema)) + .toList(); + } + + @Override + public Optional createAggregateFor( + final Bytes32 attestationHashTreeRoot, final Optional committeeIndex) { + + final MatchingDataAttestationGroupV2 group = + attestationGroupByDataHash.get(attestationHashTreeRoot); + if (group == null) { + return Optional.empty(); + } + + final SchemaDefinitions schemaDefinitions = + spec.atSlot(group.getAttestationData().getSlot()).getSchemaDefinitions(); + final AttestationSchema attestationSchema = + schemaDefinitions.getAttestationSchema(); + + return group + .streamForAggregationProduction(committeeIndex, Long.MAX_VALUE) + .findFirst() + .map(pooledAttestation -> pooledAttestation.toAttestation(attestationSchema)); + } + + @Override + public void onReorg(final UInt64 commonAncestorSlot) { + attestationGroupByDataHash.values().forEach(group -> group.onReorg(commonAncestorSlot)); + } +} diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AttestationForkChecker.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AttestationForkChecker.java index c650c8644b3..ae98cdd3c40 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AttestationForkChecker.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AttestationForkChecker.java @@ -42,4 +42,9 @@ public boolean areAttestationsFromCorrectFork( final MatchingDataAttestationGroup attestationGroup) { return attestationGroup.matchesCommitteeShufflingSeed(validCommitteeShufflingSeeds); } + + public boolean areAttestationsFromCorrectForkV2( + final MatchingDataAttestationGroupV2 attestationGroup) { + return attestationGroup.matchesCommitteeShufflingSeed(validCommitteeShufflingSeeds); + } } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AggregatingAttestationPoolProfiler.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AggregatingAttestationPoolProfiler.java new file mode 100644 index 00000000000..c5fc6329aaf --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AggregatingAttestationPoolProfiler.java @@ -0,0 +1,58 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.attestation.utils; + +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; +import tech.pegasys.teku.statetransition.attestation.utils.RewardBasedAttestationSorter.PooledAttestationWithRewardInfo; +import tech.pegasys.teku.storage.client.RecentChainData; + +public interface AggregatingAttestationPoolProfiler { + + AggregatingAttestationPoolProfiler NOOP = + new AggregatingAttestationPoolProfiler() { + @Override + public void execute( + final Spec spec, + final UInt64 slot, + final RecentChainData recentChainData, + final AggregatingAttestationPool aggregatingAttestationPool) { + // No-op + } + + @Override + public void onPreFillUp( + final BeaconState stateAtBlockSlot, final PooledAttestationWithRewardInfo attestation) { + // No-op + } + + @Override + public void onPostFillUp( + final BeaconState stateAtBlockSlot, final PooledAttestationWithRewardInfo attestation) { + // No-op + } + }; + + void execute( + Spec spec, + UInt64 slot, + RecentChainData recentChainData, + AggregatingAttestationPool aggregatingAttestationPool); + + void onPreFillUp(BeaconState stateAtBlockSlot, PooledAttestationWithRewardInfo attestation); + + void onPostFillUp(BeaconState stateAtBlockSlot, PooledAttestationWithRewardInfo attestation); +} diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV2Test.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV2Test.java new file mode 100644 index 00000000000..dabed0dddd9 --- /dev/null +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV2Test.java @@ -0,0 +1,233 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.attestation; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO; +import static tech.pegasys.teku.spec.SpecMilestone.ELECTRA; +import static tech.pegasys.teku.spec.SpecMilestone.PHASE0; +import static tech.pegasys.teku.statetransition.attestation.AggregatorUtil.aggregateAttestations; + +import java.util.List; +import java.util.Optional; +import java.util.function.LongSupplier; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; +import org.junit.jupiter.api.TestTemplate; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecContext; +import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; +import tech.pegasys.teku.spec.datastructures.operations.Attestation; +import tech.pegasys.teku.spec.datastructures.operations.AttestationData; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.statetransition.attestation.utils.RewardBasedAttestationSorter; +import tech.pegasys.teku.statetransition.attestation.utils.RewardBasedAttestationSorter.PooledAttestationWithRewardInfo; +import tech.pegasys.teku.statetransition.attestation.utils.RewardBasedAttestationSorter.RewardBasedAttestationSorterFactory; +import tech.pegasys.teku.storage.client.RecentChainData; + +@TestSpecContext(milestone = {PHASE0, ELECTRA}) +public class AggregatingAttestationPoolV2Test extends AggregatingAttestationPoolTest { + + @Override + AggregatingAttestationPool instantiatePool( + final Spec spec, final RecentChainData recentChainData, final int maxAttestations) { + return instantiatePool( + spec, + recentChainData, + maxAttestations, + () -> 0L, + RewardBasedAttestationSorterFactory.NOOP, + Integer.MAX_VALUE, + Integer.MAX_VALUE); + } + + AggregatingAttestationPool instantiatePool( + final LongSupplier nanosSupplier, + final int maxBlockAggregationTimeMillis, + final int maxTotalBlockAggregationTimeMillis) { + return instantiatePool( + mockSpec, + mockRecentChainData, + 10, + nanosSupplier, + RewardBasedAttestationSorterFactory.NOOP, + maxBlockAggregationTimeMillis, + maxTotalBlockAggregationTimeMillis); + } + + AggregatingAttestationPool instantiatePool( + final Spec spec, + final RecentChainData recentChainData, + final int maxAttestations, + final LongSupplier nanosSupplier, + final RewardBasedAttestationSorterFactory sorterFactory, + final int maxBlockAggregationTimeMillis, + final int maxTotalBlockAggregationTimeMillis) { + return new AggregatingAttestationPoolV2( + spec, + recentChainData, + new NoOpMetricsSystem(), + maxAttestations, + nanosSupplier, + sorterFactory, + maxBlockAggregationTimeMillis, + maxTotalBlockAggregationTimeMillis); + } + + @TestTemplate + @Override + public void createAggregateFor_shouldAggregateAttestationsWithMatchingData() { + final AttestationData attestationData = createAttestationData(); + final Attestation attestation1 = addAttestationFromValidators(attestationData, 1); + final Attestation attestation2 = addAttestationFromValidators(attestationData, 2); + + final Optional result = + aggregatingPool.createAggregateFor(attestationData.hashTreeRoot(), committeeIndex); + assertThat(result).contains(aggregateAttestations(committeeSizes, attestation1, attestation2)); + } + + @TestTemplate + @Override + public void createAggregateFor_shouldReturnBestAggregateForMatchingDataWhenSomeOverlap() { + // this does not apply since we only deal with single attestation, which cannot partially + // overlap + } + + @TestTemplate + public void getAttestationsForBlock_shouldFillupOnlyFirstAggregateFromSameMatchingData() { + assumeThat(specMilestone).isGreaterThanOrEqualTo(ELECTRA); + + final AttestationData attestationData = createAttestationData(ZERO); + + final Attestation attestationBestAggregate = + addAttestationFromValidators(attestationData, 1, 2, 3, 4); + final Attestation attestationAggregate = addAttestationFromValidators(attestationData, 1, 2, 5); + + final Attestation singleAttestation = addAttestationFromValidators(attestationData, 6); + + final BeaconState stateAtBlockSlot = dataStructureUtil.randomBeaconState(); + + assertThat(aggregatingPool.getAttestationsForBlock(stateAtBlockSlot, forkChecker)) + .containsExactlyInAnyOrder( + aggregateAttestations(committeeSizes, attestationBestAggregate, singleAttestation), + attestationAggregate); + } + + @TestTemplate + public void getAttestationsForBlock_shouldNotTryToFillupIfTimeLimitIsExceeded() { + assumeThat(specMilestone).isGreaterThanOrEqualTo(ELECTRA); + + // by passing 0 as maxTotalBlockAggregationTimeMillis we give no time to fillup + aggregatingPool = instantiatePool(System::nanoTime, Integer.MAX_VALUE, 0); + + final AttestationData attestationData = createAttestationData(ZERO); + + final Attestation attestationBestAggregate = + addAttestationFromValidators(attestationData, 1, 2, 3, 4); + addAttestationFromValidators(attestationData, 6); + + final BeaconState stateAtBlockSlot = dataStructureUtil.randomBeaconState(); + + assertThat(aggregatingPool.getAttestationsForBlock(stateAtBlockSlot, forkChecker)) + .containsExactlyInAnyOrder(attestationBestAggregate); + } + + @TestTemplate + public void getAttestationsForBlock_shouldNotContinueAggregatingIfTimeLimitIsExceeded() { + assumeThat(specMilestone).isGreaterThanOrEqualTo(ELECTRA); + + final LongSupplier nanosSupplier = mock(LongSupplier.class); + when(nanosSupplier.getAsLong()) + .thenReturn( + 0L, // first call to get now + 1_000_000L, // 1 ms, first aggregation time check + 3_000_000L // 3 ms, second aggregation time + ); + + final int maxBlockAggregationTimeMillis = 2; // less than 3 ms + + // by passing 0 as maxBlockAggregationTimeMillis we limit the time to aggregate + aggregatingPool = + instantiatePool(nanosSupplier, maxBlockAggregationTimeMillis, Integer.MAX_VALUE); + + final AttestationData attestationData = createAttestationData(ZERO); + + final Attestation attestationBestAggregate = + addAttestationFromValidators(attestationData, 1, 2, 3, 4); + + // let's add another attestation which overlaps, so that it should be added to another aggregate + // in the result, but we will not have time to get it + addAttestationFromValidators(attestationData, 1, 2, 5); + + // the fillup will anyway happen since the total time limit won't be reached + final Attestation singleAttestation = addAttestationFromValidators(attestationData, 6); + + final BeaconState stateAtBlockSlot = dataStructureUtil.randomBeaconState(); + + assertThat(aggregatingPool.getAttestationsForBlock(stateAtBlockSlot, forkChecker)) + .containsExactlyInAnyOrder( + aggregateAttestations(committeeSizes, attestationBestAggregate, singleAttestation)); + } + + @TestTemplate + public void getAttestationsForBlock_shouldRespectSorter() { + var attestations = + List.of( + createAttestation(createAttestationData(), 1, 2, 3, 4), + createAttestation(createAttestationData(), 4, 5, 6)); + + var sorterResult = + attestations.stream().map(this::convertToPooledAttestationWithRewardInfo).toList(); + + var localSorter = + new RewardBasedAttestationSorter(null, null, null, null) { + @Override + public List sort( + final List attestations, final int maxAttestations) { + return sorterResult; + } + }; + + var sorterFactory = mock(RewardBasedAttestationSorterFactory.class); + when(sorterFactory.create(any(), any())).thenReturn(localSorter); + + aggregatingPool = + instantiatePool( + mockSpec, + mockRecentChainData, + 10, + () -> 0L, + sorterFactory, + Integer.MAX_VALUE, + Integer.MAX_VALUE); + + final BeaconState stateAtBlockSlot = dataStructureUtil.randomBeaconState(); + + assertThat(aggregatingPool.getAttestationsForBlock(stateAtBlockSlot, forkChecker)) + .containsExactlyElementsOf(attestations); + } + + private PooledAttestationWithRewardInfo convertToPooledAttestationWithRewardInfo( + final Attestation attestation) { + final ValidatableAttestation validatableAttestation = + createValidatableAttestationFromAttestation(attestation, true, true); + return PooledAttestationWithRewardInfo.empty( + new PooledAttestationWithData( + validatableAttestation.getData(), + PooledAttestation.fromValidatableAttestation(validatableAttestation))); + } +} From c93927ea057adc9b5c82b89c33e3470f218eb7e6 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Tue, 20 May 2025 12:37:33 +0200 Subject: [PATCH 2/3] fix tests --- .../teku/spec/util/DataStructureUtil.java | 5 + .../AggregatingAttestationPoolV2.java | 1 - .../AggregatingAttestationPoolTest.java | 324 +++++++++++------- 3 files changed, 208 insertions(+), 122 deletions(-) diff --git a/ethereum/spec/src/testFixtures/java/tech/pegasys/teku/spec/util/DataStructureUtil.java b/ethereum/spec/src/testFixtures/java/tech/pegasys/teku/spec/util/DataStructureUtil.java index f07a05bccc1..7106e9bb19c 100644 --- a/ethereum/spec/src/testFixtures/java/tech/pegasys/teku/spec/util/DataStructureUtil.java +++ b/ethereum/spec/src/testFixtures/java/tech/pegasys/teku/spec/util/DataStructureUtil.java @@ -811,6 +811,11 @@ public AttestationData randomAttestationData(final UInt64 slot) { slot, randomUInt64(), randomBytes32(), randomCheckpoint(), randomCheckpoint()); } + public AttestationData randomAttestationData(final UInt64 slot, final UInt64 committeeIndex) { + return new AttestationData( + slot, committeeIndex, randomBytes32(), randomCheckpoint(), randomCheckpoint()); + } + public AttestationData randomAttestationData(final UInt64 slot, final Bytes32 blockRoot) { return new AttestationData( slot, diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV2.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV2.java index 8a88bc946ee..d34842cc9e2 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV2.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV2.java @@ -144,7 +144,6 @@ public AggregatingAttestationPoolV2( this.rewardBasedAttestationSorterFactory = rewardBasedAttestationSorterFactory; } - // No longer synchronized @Override public void add(final ValidatableAttestation attestation) { final Supplier> cachingStateSupplier = diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolTest.java index 41c7e3b37e8..51752ffd2bf 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolTest.java @@ -38,6 +38,7 @@ import java.util.Optional; import java.util.function.Supplier; import java.util.stream.IntStream; +import org.assertj.core.api.AbstractIntegerAssert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.mockito.ArgumentMatchers; @@ -67,19 +68,19 @@ abstract class AggregatingAttestationPoolTest { public static final UInt64 SLOT = UInt64.valueOf(1234); private static final int COMMITTEE_SIZE = 130; - private Spec spec; - private SpecMilestone specMilestone; - private DataStructureUtil dataStructureUtil; - private Optional committeeIndex; - private final Spec mockSpec = mock(Spec.class); - private final RecentChainData mockRecentChainData = mock(RecentChainData.class); + protected Spec spec; + protected SpecMilestone specMilestone; + protected DataStructureUtil dataStructureUtil; + protected Optional committeeIndex; + protected final Spec mockSpec = mock(Spec.class); + protected final RecentChainData mockRecentChainData = mock(RecentChainData.class); - private AggregatingAttestationPool aggregatingPool; + protected AggregatingAttestationPool aggregatingPool; - private final AttestationForkChecker forkChecker = mock(AttestationForkChecker.class); + protected final AttestationForkChecker forkChecker = mock(AttestationForkChecker.class); - private BeaconState state; - private Int2IntMap committeeSizes; + protected BeaconState state; + protected Int2IntMap committeeSizes; abstract AggregatingAttestationPool instantiatePool( final Spec spec, final RecentChainData recentChainData, final int maxAttestations); @@ -95,6 +96,16 @@ public void setUp(final SpecContext specContext) { IntStream.range(0, spec.getGenesisSpec().getConfig().getMaxCommitteesPerSlot()) .forEach(index -> committeeSizes.put(index, COMMITTEE_SIZE)); + final UpdatableStore mockStore = mock(UpdatableStore.class); + state = dataStructureUtil.randomBeaconState(); + when(mockRecentChainData.getCurrentEpoch()).thenReturn(Optional.of(ZERO)); + when(mockRecentChainData.getStore()).thenReturn(mockStore); + when(mockRecentChainData.getBestState()) + .thenReturn(Optional.of(SafeFuture.completedFuture(state))); + when(mockRecentChainData.retrieveStateInEffectAtSlot(any())) + .thenReturn(SafeFuture.completedFuture(Optional.of(state))); + when(mockSpec.getBeaconCommitteesSize(any(), any())).thenReturn(committeeSizes); + if (specMilestone.equals(PHASE0)) { committeeIndex = Optional.empty(); } else { @@ -102,19 +113,10 @@ public void setUp(final SpecContext specContext) { Optional.of( dataStructureUtil.randomUInt64( spec.getGenesisSpec().getConfig().getMaxCommitteesPerSlot())); - - state = dataStructureUtil.randomBeaconState(); - final UpdatableStore mockStore = mock(UpdatableStore.class); - when(mockRecentChainData.getCurrentEpoch()).thenReturn(Optional.of(ZERO)); - when(mockRecentChainData.getStore()).thenReturn(mockStore); - when(mockRecentChainData.getBestState()) - .thenReturn(Optional.of(SafeFuture.completedFuture(state))); - when(mockRecentChainData.retrieveStateInEffectAtSlot(any())) - .thenReturn(SafeFuture.completedFuture(Optional.of(state))); - when(mockSpec.getBeaconCommitteesSize(any(), any())).thenReturn(committeeSizes); } when(forkChecker.areAttestationsFromCorrectFork(any())).thenReturn(true); + when(forkChecker.areAttestationsFromCorrectForkV2(any())).thenReturn(true); when(mockSpec.getPreviousEpochAttestationCapacity(any())).thenReturn(Integer.MAX_VALUE); // Fwd some calls to the real spec @@ -125,16 +127,23 @@ public void setUp(final SpecContext specContext) { .thenAnswer(i -> spec.getCurrentEpoch(i.getArgument(0))); when(mockSpec.atSlot(any())).thenAnswer(invocation -> spec.atSlot(invocation.getArgument(0))); when(mockSpec.getGenesisSchemaDefinitions()).thenReturn(spec.getGenesisSchemaDefinitions()); + when(mockSpec.getSeed(any(), any(), any())) + .thenAnswer( + invocation -> + spec.getSeed( + invocation.getArgument(0), + invocation.getArgument(1), + invocation.getArgument(2))); } @TestTemplate public void add_shouldRetrieveCommitteeSizesFromStateWhenMissing() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); final Attestation attestation = createAttestation(attestationData, spec, 1); final ValidatableAttestation validatableAttestation = - ValidatableAttestation.from(mockSpec, attestation); + createValidatableAttestationFromAttestation(attestation, false, true); assertThat(validatableAttestation.getCommitteesSize()).isEmpty(); @@ -145,25 +154,29 @@ public void add_shouldRetrieveCommitteeSizesFromStateWhenMissing() { verify(mockSpec, times(expectedCalls)) .getBeaconCommitteesSize(eq(state), eq(attestationData.getSlot())); - assertThat(aggregatingPool.getSize()).isEqualTo(1); + assertSize().isEqualTo(1); } @TestTemplate public void add_shouldNotRetrieveCommitteeSizesWhenNotNeeded() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); final Attestation attestation = createAttestation(attestationData, spec, 1); final ValidatableAttestation validatableAttestation = - ValidatableAttestation.from(mockSpec, attestation, committeeSizes); + createValidatableAttestationFromAttestation(attestation, true, true); - assertThat(validatableAttestation.getCommitteesSize()).isNotEmpty(); + if (specMilestone.isLessThan(ELECTRA)) { + assertThat(validatableAttestation.getCommitteesSize()).isEmpty(); + } else { + assertThat(validatableAttestation.getCommitteesSize()).isNotEmpty(); + } aggregatingPool.add(validatableAttestation); verify(mockSpec, never()).getBeaconCommitteesSize(eq(state), eq(attestationData.getSlot())); - assertThat(aggregatingPool.getSize()).isEqualTo(1); + assertSize().isEqualTo(1); } @TestTemplate @@ -172,35 +185,34 @@ public void add_shouldNotAddIfFailsRetrievingCommitteesSize() { when(mockRecentChainData.retrieveStateInEffectAtSlot(any())) .thenReturn(SafeFuture.completedFuture(Optional.empty())); - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); final Attestation attestation = createAttestation(attestationData, spec, 1); final ValidatableAttestation validatableAttestation = - ValidatableAttestation.from(mockSpec, attestation); + createValidatableAttestationFromAttestation(attestation, false, true); assertThat(validatableAttestation.getCommitteesSize()).isEmpty(); aggregatingPool.add(validatableAttestation); if (specMilestone.isGreaterThanOrEqualTo(ELECTRA)) { - assertThat(aggregatingPool.getSize()).isZero(); + assertSize().isZero(); } else { - assertThat(aggregatingPool.getSize()).isEqualTo(1); + assertSize().isEqualTo(1); } } @TestTemplate public void createAggregateFor_shouldReturnEmptyWhenNoAttestationsMatchGivenData() { final Optional result = - aggregatingPool.createAggregateFor( - dataStructureUtil.randomAttestationData().hashTreeRoot(), committeeIndex); + aggregatingPool.createAggregateFor(createAttestationData().hashTreeRoot(), committeeIndex); assertThat(result).isEmpty(); } @TestTemplate public void createAggregateFor_shouldAggregateAttestationsWithMatchingData() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(); + final AttestationData attestationData = createAttestationData(); final Attestation attestation1 = addAttestationFromValidators(attestationData, 1, 3, 5); final Attestation attestation2 = addAttestationFromValidators(attestationData, 2, 4, 6); @@ -211,7 +223,7 @@ public void createAggregateFor_shouldAggregateAttestationsWithMatchingData() { @TestTemplate public void createAggregateFor_shouldReturnBestAggregateForMatchingDataWhenSomeOverlap() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(); + final AttestationData attestationData = createAttestationData(); final Attestation attestation1 = addAttestationFromValidators(attestationData, 1, 3, 5, 7); final Attestation attestation2 = addAttestationFromValidators(attestationData, 2, 4, 6, 8); addAttestationFromValidators(attestationData, 2, 3, 9); @@ -232,9 +244,9 @@ public void getAttestationsForBlock_shouldReturnEmptyListWhenNoAttestationsAvail @TestTemplate public void getAttestationsForBlock_shouldNotIncludeAttestationsWhereDataDoesNotValidate() { - addAttestationFromValidators(dataStructureUtil.randomAttestationData(), 1); - addAttestationFromValidators(dataStructureUtil.randomAttestationData(), 2); - addAttestationFromValidators(dataStructureUtil.randomAttestationData(), 3); + addAttestationFromValidators(createAttestationData(), 1); + addAttestationFromValidators(createAttestationData(), 2); + addAttestationFromValidators(createAttestationData(), 3); when(mockSpec.validateAttestation(any(), any())) .thenReturn(Optional.of(AttestationInvalidReason.SLOT_NOT_IN_EPOCH)); @@ -246,13 +258,12 @@ public void getAttestationsForBlock_shouldNotIncludeAttestationsWhereDataDoesNot @TestTemplate void getAttestationsForBlock_shouldNotThrowExceptionWhenShufflingSeedIsUnknown() { - final Attestation attestation = - createAttestation(dataStructureUtil.randomAttestationData(ONE), 1, 2, 3, 4); + final Attestation attestation = createAttestation(createAttestationData(ONE), 1, 2, 3, 4); // Receive the attestation from a block, prior to receiving it via gossip aggregatingPool.onAttestationsIncludedInBlock(ONE, List.of(attestation)); // Attestation isn't added because it's already redundant - aggregatingPool.add(ValidatableAttestation.fromValidator(mockSpec, attestation)); - assertThat(aggregatingPool.getSize()).isZero(); + aggregatingPool.add(createValidatableAttestationFromAttestation(attestation, true, true)); + assertSize().isZero(); // But we now have a MatchingDataAttestationGroup with unknown shuffling seed present // It was previously assumed that wasn't possible so it threw an IllegalStateException @@ -266,11 +277,11 @@ void getAttestationsForBlock_shouldNotThrowExceptionWhenShufflingSeedIsUnknown() @TestTemplate public void getAttestationsForBlock_shouldIncludeAttestationsThatPassValidation() { final Attestation attestation1 = - addAttestationFromValidators(dataStructureUtil.randomAttestationData(ZERO), 1, 2); + addAttestationFromValidators(createAttestationData(ZERO), 1, 2); final Attestation attestation2 = - addAttestationFromValidators(dataStructureUtil.randomAttestationData(ZERO), 2, 3); + addAttestationFromValidators(createAttestationData(ZERO), 2, 3); final Attestation attestation3 = - addAttestationFromValidators(dataStructureUtil.randomAttestationData(ZERO), 3, 4); + addAttestationFromValidators(createAttestationData(ZERO), 3, 4); final BeaconState state = dataStructureUtil.randomBeaconState(ONE); when(mockSpec.validateAttestation(state, attestation1.getData())) @@ -284,7 +295,7 @@ public void getAttestationsForBlock_shouldIncludeAttestationsThatPassValidation( @TestTemplate public void getAttestationsForBlock_shouldAggregateAttestationsWhenPossible() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(SLOT); + final AttestationData attestationData = createAttestationData(SLOT); final Attestation attestation1 = addAttestationFromValidators(attestationData, 1, 2); final Attestation attestation2 = addAttestationFromValidators(attestationData, 3, 4); @@ -296,11 +307,11 @@ public void getAttestationsForBlock_shouldAggregateAttestationsWhenPossible() { @TestTemplate public void getAttestationsForBlock_shouldIncludeAttestationsWithDifferentData() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); final Attestation attestation1 = addAttestationFromValidators(attestationData, 1, 2); final Attestation attestation2 = addAttestationFromValidators(attestationData, 3, 4); final Attestation attestation3 = - addAttestationFromValidators(dataStructureUtil.randomAttestationData(ZERO), 3, 4); + addAttestationFromValidators(createAttestationData(ZERO), 3, 4); final BeaconState stateAtBlockSlot = dataStructureUtil.randomBeaconState(ONE); @@ -311,12 +322,9 @@ public void getAttestationsForBlock_shouldIncludeAttestationsWithDifferentData() @TestTemplate void getAttestationsForBlock_shouldIncludeMoreRecentAttestationsFirst() { - final AttestationData attestationData1 = - dataStructureUtil.randomAttestationData(UInt64.valueOf(5)); - final AttestationData attestationData2 = - dataStructureUtil.randomAttestationData(UInt64.valueOf(6)); - final AttestationData attestationData3 = - dataStructureUtil.randomAttestationData(UInt64.valueOf(7)); + final AttestationData attestationData1 = createAttestationData(UInt64.valueOf(5)); + final AttestationData attestationData2 = createAttestationData(UInt64.valueOf(6)); + final AttestationData attestationData3 = createAttestationData(UInt64.valueOf(7)); final Attestation attestation1 = addAttestationFromValidators(attestationData1, 1, 2); final Attestation attestation2 = addAttestationFromValidators(attestationData2, 3, 4); final Attestation attestation3 = addAttestationFromValidators(attestationData3, 5, 6); @@ -339,7 +347,7 @@ public void getAttestationsForBlock_shouldNotAddMoreAttestationsThanAllowedInBlo final int validatorCount = allowed + 1; final BeaconState state = dataStructureUtil.randomBeaconState(validatorCount, 100, ONE); - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); final int lastValidatorIndex = validatorCount - 1; @@ -366,8 +374,8 @@ public void getAttestationsForBlock_shouldGivePriorityToBestAggregationForEachSl final BeaconState state = dataStructureUtil.randomBeaconState(ONE); // let's prepare 2 different attestationData for the same slot - final AttestationData attestationData0 = dataStructureUtil.randomAttestationData(ZERO); - final AttestationData attestationData1 = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData0 = createAttestationData(ZERO); + final AttestationData attestationData1 = createAttestationData(ZERO); // let's fill up the pool with non-aggregatable attestationsData0 addAttestationFromValidators(attestationData0, 1, 2); @@ -438,79 +446,78 @@ void testPrevEpochLimits(final int prevEpochCapacity) { @TestTemplate public void onSlot_shouldPruneAttestationsMoreThanTwoEpochsBehindCurrentSlot() { - final AttestationData pruneAttestationData = dataStructureUtil.randomAttestationData(SLOT); - final AttestationData preserveAttestationData = - dataStructureUtil.randomAttestationData(SLOT.plus(ONE)); + final AttestationData pruneAttestationData = createAttestationData(SLOT); + final AttestationData preserveAttestationData = createAttestationData(SLOT.plus(ONE)); addAttestationFromValidators(pruneAttestationData, 1, 2); final Attestation preserveAttestation = addAttestationFromValidators(preserveAttestationData, 2, 3); final BeaconState stateAtBlockSlot = dataStructureUtil.randomBeaconState(); - assertThat(aggregatingPool.getSize()).isEqualTo(2); + assertSize().isEqualTo(2); aggregatingPool.onSlot( pruneAttestationData.getSlot().plus(ATTESTATION_RETENTION_SLOTS).plus(ONE)); assertThat(aggregatingPool.getAttestationsForBlock(stateAtBlockSlot, forkChecker)) .containsOnly(preserveAttestation); - assertThat(aggregatingPool.getSize()).isEqualTo(1); + assertSize().isEqualTo(1); } @TestTemplate public void getSize_shouldIncludeAttestationsAdded() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(); + final AttestationData attestationData = createAttestationData(); addAttestationFromValidators(attestationData, 1, 2, 3, 4); addAttestationFromValidators(attestationData, 2, 5); - assertThat(aggregatingPool.getSize()).isEqualTo(2); + assertSize().isEqualTo(2); } @TestTemplate public void getSize_shouldDecreaseWhenAttestationsRemoved() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); addAttestationFromValidators(attestationData, 1, 2, 3, 4); final Attestation attestationToRemove = addAttestationFromValidators(attestationData, 2, 5); aggregatingPool.onAttestationsIncludedInBlock(ZERO, List.of(attestationToRemove)); - assertThat(aggregatingPool.getSize()).isEqualTo(1); + assertSize().isEqualTo(1); } @TestTemplate public void getSize_shouldNotIncrementWhenAttestationAlreadyExists() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(); + final AttestationData attestationData = createAttestationData(); final Attestation attestation = addAttestationFromValidators(attestationData, 1, 2, 3, 4); - aggregatingPool.add(ValidatableAttestation.from(mockSpec, attestation)); - assertThat(aggregatingPool.getSize()).isEqualTo(1); + aggregatingPool.add(createValidatableAttestationFromAttestation(attestation, true, true)); + assertSize().isEqualTo(1); } @TestTemplate public void getSize_shouldDecrementForAllRemovedAttestations() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); addAttestationFromValidators(attestationData, 1, 2, 3); addAttestationFromValidators(attestationData, 4, 5); - assertThat(aggregatingPool.getSize()).isEqualTo(2); + assertSize().isEqualTo(2); final Attestation attestationToRemove = addAttestationFromValidators(attestationData, 1, 2, 3, 4, 5); aggregatingPool.onAttestationsIncludedInBlock(ZERO, List.of(attestationToRemove)); - assertThat(aggregatingPool.getSize()).isEqualTo(0); + assertSize().isEqualTo(0); } @TestTemplate public void getSize_shouldAddTheRightData() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(); + final AttestationData attestationData = createAttestationData(); addAttestationFromValidators(attestationData, 1, 2, 3, 4, 5); addAttestationFromValidators(attestationData, 1, 2, 3); addAttestationFromValidators(attestationData, 4, 5); addAttestationFromValidators(attestationData, 6); addAttestationFromValidators(attestationData, 7, 8); - assertThat(aggregatingPool.getSize()).isEqualTo(5); + assertSize().isEqualTo(5); } @TestTemplate public void getSize_shouldDecrementForAllRemovedAttestationsWhileKeepingOthers() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); addAttestationFromValidators(attestationData, 1, 2, 3); addAttestationFromValidators(attestationData, 4, 5); @@ -519,68 +526,78 @@ public void getSize_shouldDecrementForAllRemovedAttestationsWhileKeepingOthers() final Attestation attestationToRemove = addAttestationFromValidators(attestationData, 1, 2, 3, 4, 5); - assertThat(aggregatingPool.getSize()).isEqualTo(5); + assertSize().isEqualTo(5); aggregatingPool.onAttestationsIncludedInBlock(ZERO, List.of(attestationToRemove)); - assertThat(aggregatingPool.getSize()).isEqualTo(2); + assertSize().isEqualTo(2); } @TestTemplate void shouldRemoveOldSlotsWhenMaximumNumberOfAttestationsReached() { aggregatingPool = instantiatePool(mockSpec, mockRecentChainData, 5); - final AttestationData attestationData0 = dataStructureUtil.randomAttestationData(ZERO); - final AttestationData attestationData1 = dataStructureUtil.randomAttestationData(ONE); - final AttestationData attestationData2 = - dataStructureUtil.randomAttestationData(UInt64.valueOf(2)); + final AttestationData attestationData0 = createAttestationData(ZERO); + final AttestationData attestationData1 = createAttestationData(ONE); + final AttestationData attestationData2 = createAttestationData(UInt64.valueOf(2)); addAttestationFromValidators(attestationData0, 1, 2); addAttestationFromValidators(attestationData0, 2, 3); addAttestationFromValidators(attestationData1, 3, 4); addAttestationFromValidators(attestationData1, 4, 5); addAttestationFromValidators(attestationData2, 5, 6); - assertThat(aggregatingPool.getSize()).isEqualTo(5); + assertSize().isEqualTo(5); final BeaconState slot1State = dataStructureUtil.randomBeaconState(ONE); assertThat(aggregatingPool.getAttestationsForBlock(slot1State, forkChecker)).isNotEmpty(); addAttestationFromValidators(attestationData2, 6, 7); // Should drop the slot 0 attestations - assertThat(aggregatingPool.getSize()).isEqualTo(4); + if (aggregatingPool instanceof AggregatingAttestationPoolV2) { + // v2 don't immediately drop attestations on add + aggregatingPool.onSlot(UInt64.valueOf(3)); + } + assertSize().isEqualTo(4); assertThat(aggregatingPool.getAttestationsForBlock(slot1State, forkChecker)).isEmpty(); } @TestTemplate void shouldNotRemoveLastSlotEvenWhenMaximumNumberOfAttestationsReached() { aggregatingPool = instantiatePool(mockSpec, mockRecentChainData, 5); - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); addAttestationFromValidators(attestationData, 1, 2); addAttestationFromValidators(attestationData, 2, 3); addAttestationFromValidators(attestationData, 3, 4); addAttestationFromValidators(attestationData, 4, 5); addAttestationFromValidators(attestationData, 5, 6); - assertThat(aggregatingPool.getSize()).isEqualTo(5); + assertSize().isEqualTo(5); final BeaconState slot1State = dataStructureUtil.randomBeaconState(ONE); assertThat(aggregatingPool.getAttestationsForBlock(slot1State, forkChecker)).isNotEmpty(); addAttestationFromValidators(attestationData, 6, 7); // Can't drop anything as we only have one slot. - assertThat(aggregatingPool.getSize()).isEqualTo(6); + assertSize().isEqualTo(6); } @TestTemplate public void getAttestationsForBlock_shouldNotAddAttestationsFromWrongFork() { - final AttestationData attestationData1 = dataStructureUtil.randomAttestationData(ZERO); - final AttestationData attestationData2 = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData1 = createAttestationData(ZERO); + final AttestationData attestationData2 = createAttestationData(ZERO); addAttestationFromValidators(attestationData1, 1, 2, 3); Attestation attestation2 = addAttestationFromValidators(attestationData2, 4, 5); - when(forkChecker.areAttestationsFromCorrectFork(any())).thenReturn(false); - when(forkChecker.areAttestationsFromCorrectFork( - ArgumentMatchers.argThat(arg -> arg.getAttestationData().equals(attestationData2)))) - .thenReturn(true); + if (aggregatingPool instanceof AggregatingAttestationPoolV2) { + when(forkChecker.areAttestationsFromCorrectForkV2(any())).thenReturn(false); + when(forkChecker.areAttestationsFromCorrectForkV2( + ArgumentMatchers.argThat(arg -> arg.getAttestationData().equals(attestationData2)))) + .thenReturn(true); + } else { + when(forkChecker.areAttestationsFromCorrectFork(any())).thenReturn(false); + when(forkChecker.areAttestationsFromCorrectFork( + ArgumentMatchers.argThat(arg -> arg.getAttestationData().equals(attestationData2)))) + .thenReturn(true); + } final BeaconState state = dataStructureUtil.randomBeaconState(ONE); assertThat(aggregatingPool.getAttestationsForBlock(state, forkChecker)) @@ -589,10 +606,10 @@ public void getAttestationsForBlock_shouldNotAddAttestationsFromWrongFork() { @TestTemplate public void getAttestations_shouldReturnAllAttestations() { - final AttestationData firstAttestationData = dataStructureUtil.randomAttestationData(); + final AttestationData firstAttestationData = createAttestationData(); final Attestation firstAttestation = addAttestationFromValidators(firstAttestationData, 1, 2, 3); - final AttestationData secondAttestationData = dataStructureUtil.randomAttestationData(); + final AttestationData secondAttestationData = createAttestationData(); final Attestation secondAttestation = addAttestationFromValidators(secondAttestationData, 3, 4, 5); assertThat(aggregatingPool.getAttestations(Optional.empty(), Optional.empty())) @@ -620,7 +637,7 @@ public void getAttestations_shouldReturnAllAttestations() { // Electra activates from SLOT when(mockedSpec.atSlot(argThat(slot -> slot.isGreaterThanOrEqualTo(SLOT)))) .thenReturn(electraSpec.getGenesisSpec()); - final AttestationData electraAttestationData = dataStructureUtil.randomAttestationData(SLOT); + final AttestationData electraAttestationData = createAttestationData(SLOT); committeeIndex = Optional.of( dataStructureUtil.randomUInt64( @@ -639,7 +656,7 @@ public void getAttestations_shouldReturnAttestationsForGivenCommitteeIndexOnly_P assumeThat(specMilestone).isLessThan(ELECTRA); // Pre Electra the committee index filter is applied to the index set at the attestation data // level - final AttestationData attestationData1 = dataStructureUtil.randomAttestationData(); + final AttestationData attestationData1 = createAttestationData(); final AttestationData attestationData2 = new AttestationData( attestationData1.getSlot(), @@ -659,11 +676,18 @@ public void getAttestations_shouldReturnAttestationsForGivenCommitteeIndexOnly_P public void getAttestations_shouldReturnAttestationsForGivenCommitteeIndexOnly_PostElectra() { assumeThat(specMilestone).isGreaterThanOrEqualTo(ELECTRA); // Post Electra the committee index filter is applied to the committee bits - final AttestationData attestationData1 = dataStructureUtil.randomAttestationData(); - final AttestationData attestationData2 = dataStructureUtil.randomAttestationData(); + final AttestationData attestationData1 = createAttestationData(); + final AttestationData attestationData2 = createAttestationData(); final Attestation attestation1 = addAttestationFromValidators(attestationData1, 1, 2, 3); final Optional committeeIndexFilter = committeeIndex; - committeeIndex = Optional.of(committeeIndex.get().plus(1)); + + // set a different committee index + if (committeeIndex.get().isZero()) { + committeeIndex = Optional.of(committeeIndex.get().plus(1)); + } else { + committeeIndex = Optional.of(committeeIndex.get().minus(1)); + } + addAttestationFromValidators(attestationData2, 4, 5, 6); assertThat(aggregatingPool.getAttestations(Optional.empty(), committeeIndexFilter)) .containsExactly(attestation1); @@ -671,7 +695,7 @@ public void getAttestations_shouldReturnAttestationsForGivenCommitteeIndexOnly_P @TestTemplate public void getAttestations_shouldReturnAttestationsForGivenSlotOnly() { - final AttestationData attestationData1 = dataStructureUtil.randomAttestationData(); + final AttestationData attestationData1 = createAttestationData(); final AttestationData attestationData2 = new AttestationData( attestationData1.getSlot().plus(1), @@ -689,30 +713,30 @@ public void getAttestations_shouldReturnAttestationsForGivenSlotOnly() { @TestTemplate void onAttestationsIncludedInBlock_shouldNotAddAttestationsAlreadySeenInABlock() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); // Included in block before we see any attestations with this data aggregatingPool.onAttestationsIncludedInBlock( ONE, List.of(createAttestation(attestationData, 1, 2, 3, 4))); // But still shouldn't be able to add a redundant attestation later addAttestationFromValidators(attestationData, 2, 3); - assertThat(aggregatingPool.getSize()).isZero(); + assertSize().isZero(); } @TestTemplate void onAttestationsIncludedInBlock_shouldRemoveAttestationsWhenSeenInABlock() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); addAttestationFromValidators(attestationData, 2, 3); aggregatingPool.onAttestationsIncludedInBlock( ONE, List.of(createAttestation(attestationData, 1, 2, 3, 4))); - assertThat(aggregatingPool.getSize()).isZero(); + assertSize().isZero(); } @TestTemplate public void onAttestationsIncludedInBlock_shouldRetrieveCommitteeSizesFromStateWhenMissing() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); final Attestation attestation = createAttestation(attestationData, spec, 1); @@ -726,7 +750,7 @@ public void onAttestationsIncludedInBlock_shouldRetrieveCommitteeSizesFromStateW @TestTemplate public void onAttestationsIncludedInBlock_shouldNotAddIfFailsRetrievingCommitteesSize() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); addAttestationFromValidators(attestationData, 2, 3); when(mockRecentChainData.getBestState()).thenReturn(Optional.empty()); @@ -738,15 +762,15 @@ public void onAttestationsIncludedInBlock_shouldNotAddIfFailsRetrievingCommittee if (specMilestone.isGreaterThanOrEqualTo(ELECTRA)) { // we can't process onAttestationsIncludedInBlock wihthout committees size - assertThat(aggregatingPool.getSize()).isEqualTo(1); + assertSize().isEqualTo(1); } else { - assertThat(aggregatingPool.getSize()).isZero(); + assertSize().isZero(); } } @TestTemplate void onReorg_shouldBeAbleToReadAttestations() { - final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); + final AttestationData attestationData = createAttestationData(ZERO); // Included in block before we see any attestations with this data aggregatingPool.onAttestationsIncludedInBlock( ONE, List.of(createAttestation(attestationData, 1, 2, 3, 4))); @@ -755,24 +779,24 @@ void onReorg_shouldBeAbleToReadAttestations() { // Should now be able to add attestations that were redundant addAttestationFromValidators(attestationData, 2, 3); - assertThat(aggregatingPool.getSize()).isEqualTo(1); + assertSize().isEqualTo(1); } - private Attestation addAttestationFromValidators(final UInt64 slot, final int... validators) { - return addAttestationFromValidators(dataStructureUtil.randomAttestationData(slot), validators); + protected Attestation addAttestationFromValidators(final UInt64 slot, final int... validators) { + return addAttestationFromValidators(createAttestationData(slot), validators); } - private Attestation addAttestationFromValidators( + protected Attestation addAttestationFromValidators( final AttestationData data, final int... validators) { return addAttestationFromValidators(data, spec, validators); } - private Attestation addAttestationFromValidators( + protected Attestation addAttestationFromValidators( final AttestationData data, final Spec spec, final int... validators) { return addAttestationFromValidators(aggregatingPool, data, spec, validators); } - private Attestation addAttestationFromValidators( + protected Attestation addAttestationFromValidators( final AggregatingAttestationPool aggregatingAttestationPool, final AttestationData data, final Spec spec, @@ -786,7 +810,7 @@ private Attestation addAttestationFromValidators( } final ValidatableAttestation validatableAttestation = - ValidatableAttestation.from(spec, attestationFromValidators, committeeSizes); + createValidatableAttestationFromAttestation(attestationFromValidators, true, true); if (attestationFromValidators.isSingleAttestation()) { attestation = createAttestation(data, spec, validators); @@ -795,17 +819,67 @@ private Attestation addAttestationFromValidators( attestation = attestationFromValidators; } - validatableAttestation.saveCommitteeShufflingSeedAndCommitteesSize( - dataStructureUtil.randomBeaconState(100, 15, data.getSlot())); aggregatingAttestationPool.add(validatableAttestation); return attestation; } - private Attestation createAttestation(final AttestationData data, final int... validators) { + protected ValidatableAttestation createValidatableAttestationFromAttestation( + final Attestation attestation, + final boolean addShufflingAndCommitteeSizes, + final boolean addIndexedAttestation) { + final ValidatableAttestation validatableAttestation = + ValidatableAttestation.from(mockSpec, attestation); + + final Attestation finalAttestation; + + if (attestation.isSingleAttestation()) { + finalAttestation = + createAttestation( + attestation.getData(), spec, attestation.getValidatorIndexRequired().intValue()); + validatableAttestation.convertToAggregatedFormatFromSingleAttestation(attestation); + } else { + finalAttestation = attestation; + } + + if (addShufflingAndCommitteeSizes) { + validatableAttestation.saveCommitteeShufflingSeedAndCommitteesSize( + dataStructureUtil.randomBeaconState(100, 15, finalAttestation.getData().getSlot())); + } + if (addIndexedAttestation) { + validatableAttestation.setIndexedAttestation( + dataStructureUtil.randomIndexedAttestation( + finalAttestation.getData(), + finalAttestation + .getAggregationBits() + .streamAllSetBits() + .mapToObj(this::validatorBitToValidatorIndex) + .toArray(UInt64[]::new))); + } + + return validatableAttestation; + } + + protected AttestationData createAttestationData() { + return createAttestationData(dataStructureUtil.randomUInt64()); + } + + protected AttestationData createAttestationData(final UInt64 slot) { + if (specMilestone.isLessThan(ELECTRA)) { + return dataStructureUtil.randomAttestationData( + slot, committeeIndex.orElse(UInt64.valueOf(dataStructureUtil.randomPositiveInt()))); + } + return dataStructureUtil.randomAttestationData(slot, ZERO); + } + + private UInt64 validatorBitToValidatorIndex(final int validatorBit) { + return UInt64.valueOf(validatorBit + 100); + } + + protected Attestation createAttestation(final AttestationData data, final int... validators) { return createAttestation(data, spec, validators); } - private SingleAttestation createSingleAttestation( + protected SingleAttestation createSingleAttestation( final AttestationData data, final int validatorIndex) { final SingleAttestationSchema attestationSchema = spec.getGenesisSchemaDefinitions() @@ -820,7 +894,7 @@ private SingleAttestation createSingleAttestation( dataStructureUtil.randomSignature()); } - private Attestation createAttestation( + protected Attestation createAttestation( final AttestationData data, final Spec spec, final int... validators) { final AttestationSchema attestationSchema = spec.getGenesisSchemaDefinitions().getAttestationSchema(); @@ -845,4 +919,12 @@ private Attestation createAttestation( return attestationSchema.create( bitlist, data, dataStructureUtil.randomSignature(), committeeBits); } + + protected AbstractIntegerAssert assertSize() { + if (aggregatingPool instanceof AggregatingAttestationPoolV2) { + // V2 prunes at onSlot, so we have to call it before checking the size + aggregatingPool.onSlot(ONE); + } + return assertThat(aggregatingPool.getSize()); + } } From 3afaf0d3a7fd07cc2e889373f842ba21ed32d09e Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Tue, 20 May 2025 18:07:58 +0200 Subject: [PATCH 3/3] profiler --- .../AttestationManagerIntegrationTest.java | 7 +- .../AggregatingAttestationPoolBenchmark.java | 13 +- .../AggregatingAttestationPoolV1.java | 7 + ...AggregatingAttestationPoolProfilerCSV.java | 387 ++++++++++++++++++ ...AggregatingAttestationPoolProfilerLog.java | 147 +++++++ .../AggregatingAttestationPoolV1Test.java | 7 +- .../beaconchain/BeaconChainController.java | 7 +- 7 files changed, 570 insertions(+), 5 deletions(-) create mode 100644 ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AggregatingAttestationPoolProfilerCSV.java create mode 100644 ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AggregatingAttestationPoolProfilerLog.java diff --git a/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java b/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java index 72cb320e41c..5f46d388c0f 100644 --- a/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java +++ b/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java @@ -41,6 +41,7 @@ import tech.pegasys.teku.spec.datastructures.state.Fork; import tech.pegasys.teku.spec.datastructures.state.ForkInfo; import tech.pegasys.teku.spec.generator.AggregateGenerator; +import tech.pegasys.teku.statetransition.attestation.utils.AggregatingAttestationPoolProfiler; import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager; import tech.pegasys.teku.statetransition.forkchoice.ForkChoice; import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator; @@ -74,7 +75,11 @@ class AttestationManagerIntegrationTest { private final AggregatingAttestationPool attestationPool = new AggregatingAttestationPoolV1( - spec, recentChainData, new NoOpMetricsSystem(), DEFAULT_MAXIMUM_ATTESTATION_COUNT); + spec, + recentChainData, + new NoOpMetricsSystem(), + AggregatingAttestationPoolProfiler.NOOP, + DEFAULT_MAXIMUM_ATTESTATION_COUNT); private final MergeTransitionBlockValidator transitionBlockValidator = new MergeTransitionBlockValidator(spec, recentChainData); private final ForkChoice forkChoice = diff --git a/ethereum/statetransition/src/jmh/java/tech/pegasys/teku/statetransition.validation.signatures/AggregatingAttestationPoolBenchmark.java b/ethereum/statetransition/src/jmh/java/tech/pegasys/teku/statetransition.validation.signatures/AggregatingAttestationPoolBenchmark.java index 6ca5611a955..71ce395897c 100644 --- a/ethereum/statetransition/src/jmh/java/tech/pegasys/teku/statetransition.validation.signatures/AggregatingAttestationPoolBenchmark.java +++ b/ethereum/statetransition/src/jmh/java/tech/pegasys/teku/statetransition.validation.signatures/AggregatingAttestationPoolBenchmark.java @@ -61,6 +61,7 @@ import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPoolV1; import tech.pegasys.teku.statetransition.attestation.AttestationForkChecker; +import tech.pegasys.teku.statetransition.attestation.utils.AggregatingAttestationPoolProfiler; import tech.pegasys.teku.storage.client.RecentChainData; @Warmup(iterations = 5, time = 2000, timeUnit = TimeUnit.MILLISECONDS) @@ -109,7 +110,11 @@ public void init() throws Exception { this.pool = new AggregatingAttestationPoolV1( - SPEC, recentChainData, new NoOpMetricsSystem(), DEFAULT_MAXIMUM_ATTESTATION_COUNT); + SPEC, + recentChainData, + new NoOpMetricsSystem(), + AggregatingAttestationPoolProfiler.NOOP, + DEFAULT_MAXIMUM_ATTESTATION_COUNT); this.recentChainData = mock(RecentChainData.class); try (final FileInputStream fileInputStream = new FileInputStream(STATE_PATH)) { @@ -213,7 +218,11 @@ public void getAttestationsForBlock(final Blackhole bh) { public void add(final Blackhole bh) { var emptyPool = new AggregatingAttestationPoolV1( - SPEC, recentChainData, new NoOpMetricsSystem(), DEFAULT_MAXIMUM_ATTESTATION_COUNT); + SPEC, + recentChainData, + new NoOpMetricsSystem(), + AggregatingAttestationPoolProfiler.NOOP, + DEFAULT_MAXIMUM_ATTESTATION_COUNT); attestations.forEach(emptyPool::add); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1.java index 31c0ee02526..aad38fe4d11 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1.java @@ -45,6 +45,7 @@ import tech.pegasys.teku.spec.datastructures.operations.AttestationSchema; import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; import tech.pegasys.teku.spec.schemas.SchemaDefinitions; +import tech.pegasys.teku.statetransition.attestation.utils.AggregatingAttestationPoolProfiler; import tech.pegasys.teku.storage.client.RecentChainData; /** @@ -68,12 +69,15 @@ public class AggregatingAttestationPoolV1 extends AggregatingAttestationPool { private final SettableGauge sizeGauge; private final int maximumAttestationCount; + private final AggregatingAttestationPoolProfiler aggregatingAttestationPoolProfiler; + private final AtomicInteger size = new AtomicInteger(0); public AggregatingAttestationPoolV1( final Spec spec, final RecentChainData recentChainData, final MetricsSystem metricsSystem, + final AggregatingAttestationPoolProfiler aggregatingAttestationPoolProfiler, final int maximumAttestationCount) { super(spec, recentChainData); this.sizeGauge = @@ -83,6 +87,7 @@ public AggregatingAttestationPoolV1( "attestation_pool_size", "The number of attestations available to be included in proposed blocks"); this.maximumAttestationCount = maximumAttestationCount; + this.aggregatingAttestationPoolProfiler = aggregatingAttestationPoolProfiler; } @Override @@ -140,6 +145,8 @@ public synchronized void onSlot(final UInt64 slot) { } final UInt64 firstValidAttestationSlot = slot.minus(ATTESTATION_RETENTION_SLOTS); removeAttestationsPriorToSlot(firstValidAttestationSlot); + + aggregatingAttestationPoolProfiler.execute(spec, slot, recentChainData, this); } private void removeAttestationsPriorToSlot(final UInt64 firstValidAttestationSlot) { diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AggregatingAttestationPoolProfilerCSV.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AggregatingAttestationPoolProfilerCSV.java new file mode 100644 index 00000000000..8580327e70b --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AggregatingAttestationPoolProfilerCSV.java @@ -0,0 +1,387 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.attestation.utils; + +import static tech.pegasys.teku.infrastructure.logging.Converter.gweiToEth; +import static tech.pegasys.teku.spec.constants.IncentivizationWeights.PROPOSER_WEIGHT; +import static tech.pegasys.teku.spec.constants.IncentivizationWeights.WEIGHT_DENOMINATOR; +import static tech.pegasys.teku.spec.logic.versions.altair.helpers.MiscHelpersAltair.PARTICIPATION_FLAG_WEIGHTS; + +import it.unimi.dsi.fastutil.ints.IntList; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.IntStream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import tech.pegasys.teku.bls.BLSSignatureVerifier; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.ssz.SszList; +import tech.pegasys.teku.infrastructure.ssz.primitive.SszByte; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.SpecVersion; +import tech.pegasys.teku.spec.cache.IndexedAttestationCache; +import tech.pegasys.teku.spec.datastructures.operations.Attestation; +import tech.pegasys.teku.spec.datastructures.operations.AttestationData; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.versions.altair.BeaconStateAltair; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.versions.altair.MutableBeaconStateAltair; +import tech.pegasys.teku.spec.logic.common.block.AbstractBlockProcessor; +import tech.pegasys.teku.spec.logic.versions.altair.block.BlockProcessorAltair; +import tech.pegasys.teku.spec.logic.versions.altair.helpers.BeaconStateAccessorsAltair; +import tech.pegasys.teku.spec.logic.versions.altair.helpers.MiscHelpersAltair; +import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; +import tech.pegasys.teku.statetransition.attestation.AttestationForkChecker; +import tech.pegasys.teku.statetransition.attestation.utils.RewardBasedAttestationSorter.PooledAttestationWithRewardInfo; +import tech.pegasys.teku.storage.client.RecentChainData; + +public class AggregatingAttestationPoolProfilerCSV implements AggregatingAttestationPoolProfiler { + private static final String[] PACKING_SUMMARY_HEADERS = { + "slot", "total_pool_size", "packed_attestations", "packing_time_millis", "total_rewards_eth" + }; + + private static final String[] ATTESTATION_DETAILS_HEADERS = { + "slot", + "index_in_block", + "distance", + "root", + "source", + "target", + "bits_count", + "committee_bits_count", + "final_reward", + "inblock_reward", + "attestation_data_root", + }; + + private static final String[] ATTESTATION_IMPROVEMENT_HEADERS = { + "slot", "attestation_bits_count", "filled_up", "sorting_reward", "attestation_data_root" + }; + + private final FileWriter packingSummaryCsvWriter; + private final FileWriter attestationDetailsCsvWriter; + private final FileWriter attestationImprovementsCsvWriter; + + private static final long PROPOSER_REWARD_DENOMINATOR = + WEIGHT_DENOMINATOR + .minus(PROPOSER_WEIGHT) + .times(WEIGHT_DENOMINATOR) + .dividedBy(PROPOSER_WEIGHT) + .longValue(); + + public AggregatingAttestationPoolProfilerCSV(final Path outputDir) { + + try { + createDirectory(outputDir); + + File packingSummaryFile = outputDir.resolve("packing_summary.csv").toFile(); + + if (packingSummaryFile.exists()) { + packingSummaryCsvWriter = new FileWriter(packingSummaryFile, StandardCharsets.UTF_8, true); + } else { + packingSummaryCsvWriter = new FileWriter(packingSummaryFile, StandardCharsets.UTF_8, false); + packingSummaryCsvWriter.write(String.join(",", PACKING_SUMMARY_HEADERS) + "\n"); + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + + try { + File attestationsDetailsFile = outputDir.resolve("attestations_details.csv").toFile(); + if (attestationsDetailsFile.exists()) { + attestationDetailsCsvWriter = + new FileWriter(attestationsDetailsFile, StandardCharsets.UTF_8, true); + } else { + attestationDetailsCsvWriter = + new FileWriter(attestationsDetailsFile, StandardCharsets.UTF_8, false); + attestationDetailsCsvWriter.write(String.join(",", ATTESTATION_DETAILS_HEADERS) + "\n"); + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + + try { + File packingSummaryFile = outputDir.resolve("fill_up_details.csv").toFile(); + + if (packingSummaryFile.exists()) { + attestationImprovementsCsvWriter = + new FileWriter(packingSummaryFile, StandardCharsets.UTF_8, true); + } else { + attestationImprovementsCsvWriter = + new FileWriter(packingSummaryFile, StandardCharsets.UTF_8, false); + attestationImprovementsCsvWriter.write( + String.join(",", ATTESTATION_IMPROVEMENT_HEADERS) + "\n"); + } + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + private static final Logger LOG = LogManager.getLogger(); + + @Override + public void execute( + final Spec spec, + final UInt64 slot, + final RecentChainData recentChainData, + final AggregatingAttestationPool aggregatingAttestationPool) { + final Optional> headState = recentChainData.getBestState(); + if (headState.isEmpty()) { + return; + } + + try { + var preState = spec.processSlots(headState.get().join(), slot); + + final int aggregatingAttestationPoolSize = aggregatingAttestationPool.getSize(); + LOG.info("Pool size: {}", aggregatingAttestationPoolSize); + + var getAttestationsForBlockStart = System.nanoTime(); + var attestationPacking = + aggregatingAttestationPool.getAttestationsForBlock( + preState, new AttestationForkChecker(spec, preState)); + var getAttestationsForBlockEnd = System.nanoTime(); + var packingTotalTimeMillis = + (getAttestationsForBlockEnd - getAttestationsForBlockStart) / 1_000_000; + + spec.atSlot(slot) + .getBlockProcessor() + .processAttestations( + BeaconStateAltair.required(preState).createWritableCopy(), + attestationPacking, + BLSSignatureVerifier.SIMPLE); + + var attestationRewards = + calculateAttestationRewards( + attestationPacking, + BlockProcessorAltair.required(spec.atSlot(slot).getBlockProcessor()), + preState); + + var rewards = gweiToEth(UInt64.valueOf(attestationRewards.stream().reduce(0L, Long::sum))); + + LOG.info( + "getAttestationsForBlock for {} produced {} attestations, rewards: {} ETH, timing: {} milliseconds", + slot, + attestationPacking.size(), + rewards, + packingTotalTimeMillis); + + try { + packingSummaryCsvWriter.write( + String.join( + ",", + slot.toString(), + String.valueOf(aggregatingAttestationPoolSize), + String.valueOf(attestationPacking.size()), + String.valueOf(packingTotalTimeMillis), + rewards) + + "\n"); + } catch (IOException e) { + LOG.warn("Failed to write to CSV", e); + } + + var rewardsCalculator = AttestationRewardCalculator.create(spec, preState); + + IntStream.range(0, attestationPacking.size()) + .forEach( + i -> { + final Attestation attestation = attestationPacking.get(i); + final AttestationData data = attestation.getData(); + + var numerator = rewardsCalculator.getRewardNumeratorForAttestation(attestation); + try { + attestationDetailsCsvWriter.write( + String.join( + ",", + slot.toString(), + String.valueOf(i), + preState.getSlot().minus(data.getSlot()).toString(), + data.getBeaconBlockRoot().toHexString(), + data.getSource().getEpoch().toString(), + data.getTarget().getEpoch().toString(), + String.valueOf(attestation.getAggregationBits().getBitCount()), + attestation + .getCommitteeBits() + .map(sszBits -> String.valueOf(sszBits.getBitCount())) + .orElse("N/A"), + getEthRewardFromNumerator(UInt64.valueOf(numerator)), + gweiToEth(UInt64.valueOf(attestationRewards.get(i))), + data.hashTreeRoot().toString()) + + "\n"); + } catch (IOException e) { + LOG.error("Failed to write to CSV", e); + } + }); + + } catch (final Exception e) { + LOG.error("Error occurred while profiling AggregatingAttestationPool", e); + } finally { + try { + packingSummaryCsvWriter.flush(); + } catch (IOException e) { + LOG.error("Failed to flush CSV printer", e); + } + + try { + attestationDetailsCsvWriter.flush(); + } catch (IOException e) { + LOG.error("Failed to flush CSV printer", e); + } + + try { + attestationImprovementsCsvWriter.flush(); + } catch (IOException e) { + LOG.error("Failed to flush CSV printer", e); + } + } + } + + @Override + public void onPreFillUp( + final BeaconState stateAtBlockSlot, + final PooledAttestationWithRewardInfo validatableAttestationWithSortingReward) { + + var attestation = validatableAttestationWithSortingReward.getAttestation(); + var sortingRewardNumerator = validatableAttestationWithSortingReward.getRewardNumerator(); + + try { + attestationImprovementsCsvWriter.write( + String.join( + ",", + stateAtBlockSlot.getSlot().toString(), + String.valueOf(attestation.pooledAttestation().bits().getBitCount()), + "0", // not filled up + getEthRewardFromNumerator(sortingRewardNumerator), + attestation.data().hashTreeRoot().toString()) + + "\n"); + } catch (final IOException e) { + LOG.error("Error printing CSV record", e); + } + } + + @Override + public void onPostFillUp( + final BeaconState stateAtBlockSlot, + final PooledAttestationWithRewardInfo validatableAttestationWithSortingReward) { + + var attestation = validatableAttestationWithSortingReward.getAttestation(); + var sortingRewardNumerator = validatableAttestationWithSortingReward.getRewardNumerator(); + + try { + attestationImprovementsCsvWriter.write( + String.join( + ",", + stateAtBlockSlot.getSlot().toString(), + String.valueOf(attestation.pooledAttestation().bits().getBitCount()), + "1", // filled up + getEthRewardFromNumerator(sortingRewardNumerator), + attestation.data().hashTreeRoot().toString()) + + "\n"); + } catch (final IOException e) { + LOG.error("Error printing CSV record", e); + } + } + + private String getEthRewardFromNumerator(final UInt64 numerator) { + return gweiToEth(numerator.dividedBy(PROPOSER_REWARD_DENOMINATOR)); + } + + private List calculateAttestationRewards( + final SszList attestations, + final BlockProcessorAltair blockProcessor, + final BeaconState preState) { + final List> rewards = new ArrayList<>(); + final MutableBeaconStateAltair mutableBeaconStateAltair = + BeaconStateAltair.required(preState).createWritableCopy(); + final AbstractBlockProcessor.IndexedAttestationProvider indexedAttestationProvider = + blockProcessor.createIndexedAttestationProvider( + mutableBeaconStateAltair, IndexedAttestationCache.capturing()); + attestations.forEach( + attestation -> + rewards.add( + blockProcessor.processAttestationProposerReward( + mutableBeaconStateAltair, attestation, indexedAttestationProvider))); + + return rewards.stream() + .map(maybeValue -> maybeValue.orElse(UInt64.ZERO)) + .map(UInt64::longValue) + .toList(); + } + + private void createDirectory(final Path path) { + if (!path.toFile().mkdirs()) { + if (!path.toFile().exists()) { + LOG.error("Unable to create directory {}", path); + } + } + } + + private record AttestationRewardCalculator( + Spec spec, + BeaconStateAltair state, + BeaconStateAccessorsAltair beaconStateAccessors, + MiscHelpersAltair miscHelpers) { + + public static AttestationRewardCalculator create(final Spec spec, final BeaconState state) { + final SpecVersion specVersion = spec.atSlot(state.getSlot()); + + return new AttestationRewardCalculator( + spec, + BeaconStateAltair.required(state), + BeaconStateAccessorsAltair.required(specVersion.beaconStateAccessors()), + specVersion.miscHelpers().toVersionAltair().orElseThrow()); + } + + public long getRewardNumeratorForAttestation(final Attestation attestation) { + final AttestationData data = attestation.getData(); + final List participationFlagIndices = + BeaconStateAccessorsAltair.required(beaconStateAccessors) + .getAttestationParticipationFlagIndices( + state, data, state.getSlot().minusMinZero(data.getSlot())); + + final SszList epochParticipation; + if (data.getTarget().getEpoch().equals(spec.getCurrentEpoch(state))) { + epochParticipation = state.getCurrentEpochParticipation(); + } else { + epochParticipation = state.getPreviousEpochParticipation(); + } + + UInt64 proposerRewardNumerator = UInt64.ZERO; + final IntList attestingIndices = spec.getAttestingIndices(state, attestation); + for (final Integer attestingIndex : attestingIndices) { + for (int flagIndex = 0; flagIndex < PARTICIPATION_FLAG_WEIGHTS.size(); flagIndex++) { + if (participationFlagIndices.contains(flagIndex) + && !miscHelpers.hasFlag(epochParticipation.get(attestingIndex).get(), flagIndex)) { + + final UInt64 weight = PARTICIPATION_FLAG_WEIGHTS.get(flagIndex); + + final UInt64 reward = + BeaconStateAccessorsAltair.required(beaconStateAccessors) + .getBaseReward(state, attestingIndex); + + proposerRewardNumerator = proposerRewardNumerator.plus(reward.times(weight)); + } + } + } + + return proposerRewardNumerator.longValue(); + } + } +} diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AggregatingAttestationPoolProfilerLog.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AggregatingAttestationPoolProfilerLog.java new file mode 100644 index 00000000000..77ada53ee79 --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AggregatingAttestationPoolProfilerLog.java @@ -0,0 +1,147 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.attestation.utils; + +import static tech.pegasys.teku.infrastructure.logging.Converter.gweiToEth; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.IntStream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import tech.pegasys.teku.bls.BLSSignatureVerifier; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.ssz.SszList; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.cache.IndexedAttestationCache; +import tech.pegasys.teku.spec.datastructures.operations.Attestation; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.versions.altair.BeaconStateAltair; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.versions.altair.MutableBeaconStateAltair; +import tech.pegasys.teku.spec.logic.common.block.AbstractBlockProcessor; +import tech.pegasys.teku.spec.logic.versions.altair.block.BlockProcessorAltair; +import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; +import tech.pegasys.teku.statetransition.attestation.AttestationForkChecker; +import tech.pegasys.teku.statetransition.attestation.utils.RewardBasedAttestationSorter.PooledAttestationWithRewardInfo; +import tech.pegasys.teku.storage.client.RecentChainData; + +public class AggregatingAttestationPoolProfilerLog implements AggregatingAttestationPoolProfiler { + private static final Logger LOG = LogManager.getLogger(); + + @Override + public void execute( + final Spec spec, + final UInt64 slot, + final RecentChainData recentChainData, + final AggregatingAttestationPool aggregatingAttestationPool) { + final Optional> headState = recentChainData.getBestState(); + if (headState.isEmpty()) { + return; + } + + try { + var preState = spec.processSlots(headState.get().join(), slot); + + LOG.info("Pool size: {}", aggregatingAttestationPool.getSize()); + var getAttestationsForBlockStart = System.nanoTime(); + var attestationPacking = + aggregatingAttestationPool.getAttestationsForBlock( + preState, new AttestationForkChecker(spec, preState)); + var getAttestationsForBlockEnd = System.nanoTime(); + + spec.atSlot(slot) + .getBlockProcessor() + .processAttestations( + BeaconStateAltair.required(preState).createWritableCopy(), + attestationPacking, + BLSSignatureVerifier.SIMPLE); + + var rewards = + gweiToEth( + UInt64.valueOf( + calculateAttestationRewards( + attestationPacking, + BlockProcessorAltair.required(spec.atSlot(slot).getBlockProcessor()), + preState))); + + LOG.info( + "getAttestationsForBlock for {} produced {} attestations, rewards: {}ETH, timing: {} milliseconds", + slot, + attestationPacking.size(), + rewards, + (getAttestationsForBlockEnd - getAttestationsForBlockStart) / 1_000_000); + + IntStream.range(0, attestationPacking.size()) + .forEach( + i -> { + final Attestation attestation = attestationPacking.get(i); + LOG.info( + "attestation {}: bits: {}, committee bits: {}, {}", + i, + attestation.getAggregationBits().getBitCount(), + attestation + .getCommitteeBits() + .map(sszBits -> String.valueOf(sszBits.getBitCount())) + .orElse("N/A"), + attestation.getData()); + }); + + } catch (final Exception e) { + LOG.error("Error occurred while profiling AggregatingAttestationPool", e); + } + } + + @Override + public void onPreFillUp( + final BeaconState stateAtBlockSlot, final PooledAttestationWithRewardInfo attestation) { + LOG.info( + "Pre-fill up: attestationDataHash: {}, bits: {}", + attestation.getAttestation().data().hashTreeRoot(), + attestation.getAttestation().pooledAttestation().bits().getBitCount()); + } + + @Override + public void onPostFillUp( + final BeaconState stateAtBlockSlot, final PooledAttestationWithRewardInfo attestation) { + LOG.info( + "Post-fill up: attestationDataHash: {}, bits: {}", + attestation.getAttestation().data().hashTreeRoot(), + attestation.getAttestation().pooledAttestation().bits().getBitCount()); + } + + private long calculateAttestationRewards( + final SszList attestations, + final BlockProcessorAltair blockProcessor, + final BeaconState preState) { + final List> rewards = new ArrayList<>(); + final MutableBeaconStateAltair mutableBeaconStateAltair = + BeaconStateAltair.required(preState).createWritableCopy(); + final AbstractBlockProcessor.IndexedAttestationProvider indexedAttestationProvider = + blockProcessor.createIndexedAttestationProvider( + mutableBeaconStateAltair, IndexedAttestationCache.capturing()); + attestations.forEach( + attestation -> + rewards.add( + blockProcessor.processAttestationProposerReward( + mutableBeaconStateAltair, attestation, indexedAttestationProvider))); + + return rewards.stream() + .filter(Optional::isPresent) + .map(Optional::get) + .map(UInt64::longValue) + .reduce(0L, Long::sum); + } +} diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1Test.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1Test.java index f150c93b65d..7073d61d0e5 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1Test.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1Test.java @@ -19,6 +19,7 @@ import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecContext; +import tech.pegasys.teku.statetransition.attestation.utils.AggregatingAttestationPoolProfiler; import tech.pegasys.teku.storage.client.RecentChainData; @TestSpecContext(milestone = {PHASE0, ELECTRA}) @@ -28,6 +29,10 @@ public class AggregatingAttestationPoolV1Test extends AggregatingAttestationPool AggregatingAttestationPool instantiatePool( final Spec spec, final RecentChainData recentChainData, final int maxAttestations) { return new AggregatingAttestationPoolV1( - spec, recentChainData, new NoOpMetricsSystem(), maxAttestations); + spec, + recentChainData, + new NoOpMetricsSystem(), + AggregatingAttestationPoolProfiler.NOOP, + maxAttestations); } } diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index d89c96d0699..d29f3a64e0a 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -126,6 +126,7 @@ import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPoolV1; import tech.pegasys.teku.statetransition.attestation.AttestationManager; +import tech.pegasys.teku.statetransition.attestation.utils.AggregatingAttestationPoolProfiler; import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager; import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin; import tech.pegasys.teku.statetransition.blobs.BlobSidecarManagerImpl; @@ -1211,7 +1212,11 @@ public void initAttestationPool() { LOG.debug("BeaconChainController.initAttestationPool()"); attestationPool = new AggregatingAttestationPoolV1( - spec, recentChainData, metricsSystem, DEFAULT_MAXIMUM_ATTESTATION_COUNT); + spec, + recentChainData, + metricsSystem, + AggregatingAttestationPoolProfiler.NOOP, + DEFAULT_MAXIMUM_ATTESTATION_COUNT); eventChannels.subscribe(SlotEventsChannel.class, attestationPool); blockImporter.subscribeToVerifiedBlockAttestations( attestationPool::onAttestationsIncludedInBlock);