diff --git a/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java b/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java index 69d5a7f0d86..72cb320e41c 100644 --- a/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java +++ b/ethereum/statetransition/src/integration-test/java/tech/pegasys/teku/statetransition/attestation/AttestationManagerIntegrationTest.java @@ -73,7 +73,7 @@ class AttestationManagerIntegrationTest { new AggregateGenerator(spec, storageSystem.chainBuilder().getValidatorKeys()); private final AggregatingAttestationPool attestationPool = - new AggregatingAttestationPool( + new AggregatingAttestationPoolV1( spec, recentChainData, new NoOpMetricsSystem(), DEFAULT_MAXIMUM_ATTESTATION_COUNT); private final MergeTransitionBlockValidator transitionBlockValidator = new MergeTransitionBlockValidator(spec, recentChainData); diff --git a/ethereum/statetransition/src/jmh/java/tech/pegasys/teku/statetransition.validation.signatures/AggregatingAttestationPoolBenchmark.java b/ethereum/statetransition/src/jmh/java/tech/pegasys/teku/statetransition.validation.signatures/AggregatingAttestationPoolBenchmark.java index c5d59c0a68e..6ca5611a955 100644 --- a/ethereum/statetransition/src/jmh/java/tech/pegasys/teku/statetransition.validation.signatures/AggregatingAttestationPoolBenchmark.java +++ b/ethereum/statetransition/src/jmh/java/tech/pegasys/teku/statetransition.validation.signatures/AggregatingAttestationPoolBenchmark.java @@ -59,6 +59,7 @@ import tech.pegasys.teku.spec.schemas.SchemaDefinitionsElectra; import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; +import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPoolV1; import tech.pegasys.teku.statetransition.attestation.AttestationForkChecker; import tech.pegasys.teku.storage.client.RecentChainData; @@ -107,7 +108,7 @@ public void init() throws Exception { new HashMap<>(); this.pool = - new AggregatingAttestationPool( + new AggregatingAttestationPoolV1( SPEC, recentChainData, new NoOpMetricsSystem(), DEFAULT_MAXIMUM_ATTESTATION_COUNT); this.recentChainData = mock(RecentChainData.class); @@ -211,7 +212,7 @@ public void getAttestationsForBlock(final Blackhole bh) { @BenchmarkMode(Mode.AverageTime) public void add(final Blackhole bh) { var emptyPool = - new AggregatingAttestationPool( + new AggregatingAttestationPoolV1( SPEC, recentChainData, new NoOpMetricsSystem(), DEFAULT_MAXIMUM_ATTESTATION_COUNT); attestations.forEach(emptyPool::add); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPool.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPool.java index e44b2b71db6..0471f5fde83 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPool.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPool.java @@ -13,59 +13,17 @@ package tech.pegasys.teku.statetransition.attestation; -import it.unimi.dsi.fastutil.ints.Int2IntMap; -import java.util.Collection; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Objects; import java.util.Optional; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; -import java.util.stream.Stream; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; -import org.hyperledger.besu.plugin.services.MetricsSystem; import tech.pegasys.teku.ethereum.events.SlotEventsChannel; -import tech.pegasys.teku.infrastructure.metrics.SettableGauge; -import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; import tech.pegasys.teku.infrastructure.ssz.SszList; -import tech.pegasys.teku.infrastructure.ssz.schema.SszListSchema; import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; import tech.pegasys.teku.spec.datastructures.operations.Attestation; -import tech.pegasys.teku.spec.datastructures.operations.AttestationData; -import tech.pegasys.teku.spec.datastructures.operations.AttestationSchema; import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; -import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers; -import tech.pegasys.teku.spec.schemas.SchemaDefinitions; -import tech.pegasys.teku.storage.client.RecentChainData; - -/** - * Maintains a pool of attestations. Attestations can be retrieved either for inclusion in a block - * or as an aggregate to publish as part of the naive attestation aggregation algorithm. In both - * cases the returned attestations are aggregated to maximise the number of validators that can be - * included. - */ -public class AggregatingAttestationPool implements SlotEventsChannel { - private static final Logger LOG = LogManager.getLogger(); - - /** The valid attestation retention period is 64 slots in deneb */ - static final long ATTESTATION_RETENTION_SLOTS = 64; - - static final Comparator ATTESTATION_INCLUSION_COMPARATOR = - Comparator.comparingInt( - attestation -> attestation.pooledAttestation().bits().getBitCount()) - .reversed(); +public interface AggregatingAttestationPool extends SlotEventsChannel { /** * Default maximum number of attestations to store in the pool. * @@ -75,311 +33,25 @@ public class AggregatingAttestationPool implements SlotEventsChannel { *

Strictly to cache all attestations for a full 2 epochs is significantly larger than this * cache. */ - public static final int DEFAULT_MAXIMUM_ATTESTATION_COUNT = 187_500; - - private final Map attestationGroupByDataHash = - new HashMap<>(); - private final NavigableMap> dataHashBySlot = new TreeMap<>(); - - private final Spec spec; - private final RecentChainData recentChainData; - private final SettableGauge sizeGauge; - private final int maximumAttestationCount; - - private final AtomicInteger size = new AtomicInteger(0); - - public AggregatingAttestationPool( - final Spec spec, - final RecentChainData recentChainData, - final MetricsSystem metricsSystem, - final int maximumAttestationCount) { - this.spec = spec; - this.recentChainData = recentChainData; - this.sizeGauge = - SettableGauge.create( - metricsSystem, - TekuMetricCategory.BEACON, - "attestation_pool_size", - "The number of attestations available to be included in proposed blocks"); - this.maximumAttestationCount = maximumAttestationCount; - } - - public synchronized void add(final ValidatableAttestation attestation) { - final Optional committeesSize = - attestation.getCommitteesSize().or(() -> getCommitteesSize(attestation.getAttestation())); - getOrCreateAttestationGroup(attestation.getAttestation(), committeesSize) - .ifPresent( - attestationGroup -> { - final boolean added = - attestationGroup.add( - PooledAttestation.fromValidatableAttestation(attestation), - attestation.getCommitteeShufflingSeed()); - if (added) { - updateSize(1); - } - }); - // Always keep the latest slot attestations, so we don't discard everything - int currentSize = getSize(); - while (dataHashBySlot.size() > 1 && currentSize > maximumAttestationCount) { - LOG.trace("Attestation cache at {} exceeds {}, ", currentSize, maximumAttestationCount); - final UInt64 firstSlotToKeep = dataHashBySlot.firstKey().plus(1); - removeAttestationsPriorToSlot(firstSlotToKeep); - currentSize = getSize(); - } - } - - private Optional getCommitteesSize(final Attestation attestation) { - if (attestation.requiresCommitteeBits()) { - return getCommitteesSizeUsingTheState(attestation.getData()); - } - return Optional.empty(); - } - - /** - * @param committeesSize Required for aggregating attestations as per EIP-7549 - */ - private Optional getOrCreateAttestationGroup( - final Attestation attestation, final Optional committeesSize) { - final AttestationData attestationData = attestation.getData(); - // if an attestation has committee bits, committees size should have been computed. If this is - // not the case, we should ignore this attestation and not add it to the pool - if (attestation.requiresCommitteeBits() && committeesSize.isEmpty()) { - LOG.debug( - "Committees size couldn't be retrieved for attestation at slot {}, block root {} and target root {}. Will NOT add this attestation to the pool.", - attestationData.getSlot(), - attestationData.getBeaconBlockRoot(), - attestationData.getTarget().getRoot()); - return Optional.empty(); - } - dataHashBySlot - .computeIfAbsent(attestationData.getSlot(), slot -> new HashSet<>()) - .add(attestationData.hashTreeRoot()); - final MatchingDataAttestationGroup attestationGroup = - attestationGroupByDataHash.computeIfAbsent( - attestationData.hashTreeRoot(), - key -> new MatchingDataAttestationGroup(spec, attestationData, committeesSize)); - return Optional.of(attestationGroup); - } - - private Optional getCommitteesSizeUsingTheState( - final AttestationData attestationData) { - // we can use the first state of the epoch to get committees for an attestation - final MiscHelpers miscHelpers = spec.atSlot(attestationData.getSlot()).miscHelpers(); - final Optional maybeEpoch = recentChainData.getCurrentEpoch(); - // the only reason this can happen is we don't have a store yet. - if (maybeEpoch.isEmpty()) { - return Optional.empty(); - } - final UInt64 currentEpoch = maybeEpoch.get(); - final UInt64 attestationEpoch = miscHelpers.computeEpochAtSlot(attestationData.getSlot()); - - LOG.debug("currentEpoch {}, attestationEpoch {}", currentEpoch, attestationEpoch); - if (attestationEpoch.equals(currentEpoch) - || attestationEpoch.equals(currentEpoch.minusMinZero(1))) { - - return recentChainData - .getBestState() - .flatMap( - state -> { - try { - return Optional.of( - spec.getBeaconCommitteesSize( - state.getImmediately(), attestationData.getSlot())); - } catch (IllegalStateException e) { - LOG.debug( - "Couldn't retrieve state for committee calculation of slot {}", - attestationData.getSlot()); - return Optional.empty(); - } - }); - } + int DEFAULT_MAXIMUM_ATTESTATION_COUNT = 187_500; - // attestation is not from the current or previous epoch - // this is really an edge case because the current or previous epoch is at least 31 slots - // and the attestation is only valid for 64 slots, so it may be epoch-2 but not beyond. - final UInt64 attestationEpochStartSlot = miscHelpers.computeStartSlotAtEpoch(attestationEpoch); - LOG.debug("State at slot {} needed", attestationEpochStartSlot); - try { - return recentChainData - .retrieveStateInEffectAtSlot(attestationEpochStartSlot) - .getImmediately() - .map(state -> spec.getBeaconCommitteesSize(state, attestationData.getSlot())); - } catch (final IllegalStateException e) { - LOG.debug( - "Couldn't retrieve state in effect at slot {} for committee calculation of slot {}", - attestationEpochStartSlot, - attestationData.getSlot()); - return Optional.empty(); - } - } - - @Override - public synchronized void onSlot(final UInt64 slot) { - if (slot.compareTo(ATTESTATION_RETENTION_SLOTS) <= 0) { - return; - } - final UInt64 firstValidAttestationSlot = slot.minus(ATTESTATION_RETENTION_SLOTS); - removeAttestationsPriorToSlot(firstValidAttestationSlot); - } - - private void removeAttestationsPriorToSlot(final UInt64 firstValidAttestationSlot) { - final Collection> dataHashesToRemove = - dataHashBySlot.headMap(firstValidAttestationSlot, false).values(); - dataHashesToRemove.stream() - .flatMap(Set::stream) - .forEach( - key -> { - final int removed = attestationGroupByDataHash.get(key).size(); - attestationGroupByDataHash.remove(key); - updateSize(-removed); - }); - if (!dataHashesToRemove.isEmpty()) { - LOG.trace( - "firstValidAttestationSlot: {}, removing: {}", - () -> firstValidAttestationSlot, - dataHashesToRemove::size); - } - dataHashesToRemove.clear(); - } - - public synchronized void onAttestationsIncludedInBlock( - final UInt64 slot, final Iterable attestations) { - attestations.forEach(attestation -> onAttestationIncludedInBlock(slot, attestation)); - } - - private void onAttestationIncludedInBlock(final UInt64 slot, final Attestation attestation) { - getOrCreateAttestationGroup(attestation, getCommitteesSize(attestation)) - .ifPresent( - attestationGroup -> { - final int numRemoved = - attestationGroup.onAttestationIncludedInBlock(slot, attestation); - updateSize(-numRemoved); - }); - } - - private void updateSize(final int delta) { - final int currentSize = size.addAndGet(delta); - sizeGauge.set(currentSize); - } - - public synchronized int getSize() { - return size.get(); - } - - public synchronized SszList getAttestationsForBlock( - final BeaconState stateAtBlockSlot, final AttestationForkChecker forkChecker) { - final UInt64 currentEpoch = spec.getCurrentEpoch(stateAtBlockSlot); - final int previousEpochLimit = spec.getPreviousEpochAttestationCapacity(stateAtBlockSlot); - - final SchemaDefinitions schemaDefinitions = - spec.atSlot(stateAtBlockSlot.getSlot()).getSchemaDefinitions(); - final AttestationSchema attestationSchema = - schemaDefinitions.getAttestationSchema(); - final SszListSchema attestationsSchema = - schemaDefinitions.getBeaconBlockBodySchema().getAttestationsSchema(); - - final boolean blockRequiresAttestationsWithCommitteeBits = - attestationSchema.requiresCommitteeBits(); - - final AtomicInteger prevEpochCount = new AtomicInteger(0); - - return dataHashBySlot - // We can immediately skip any attestations from the block slot or later - .headMap(stateAtBlockSlot.getSlot(), false) - .descendingMap() - .values() - .stream() - .flatMap( - dataHashSetForSlot -> - streamAggregatesForDataHashesBySlot( - dataHashSetForSlot, - stateAtBlockSlot, - forkChecker, - blockRequiresAttestationsWithCommitteeBits)) - .limit(attestationsSchema.getMaxLength()) - .filter( - attestation -> { - if (spec.computeEpochAtSlot(attestation.data().getSlot()).isLessThan(currentEpoch)) { - final int currentCount = prevEpochCount.getAndIncrement(); - return currentCount < previousEpochLimit; - } - return true; - }) - .map(pooledAttestation -> pooledAttestation.toAttestation(attestationSchema)) - .collect(attestationsSchema.collector()); - } - - private Stream streamAggregatesForDataHashesBySlot( - final Set dataHashSetForSlot, - final BeaconState stateAtBlockSlot, - final AttestationForkChecker forkChecker, - final boolean blockRequiresAttestationsWithCommitteeBits) { - - return dataHashSetForSlot.stream() - .map(attestationGroupByDataHash::get) - .filter(Objects::nonNull) - .filter(group -> isValid(stateAtBlockSlot, group.getAttestationData())) - .filter(forkChecker::areAttestationsFromCorrectFork) - .flatMap(MatchingDataAttestationGroup::stream) - .filter( - attestation -> - attestation.pooledAttestation().bits().requiresCommitteeBits() - == blockRequiresAttestationsWithCommitteeBits) - .sorted(ATTESTATION_INCLUSION_COMPARATOR); - } - - public synchronized List getAttestations( - final Optional maybeSlot, final Optional maybeCommitteeIndex) { - - final Predicate>> filterForSlot = - (entry) -> maybeSlot.map(slot -> entry.getKey().equals(slot)).orElse(true); - - final UInt64 slot = maybeSlot.orElse(recentChainData.getCurrentSlot().orElse(UInt64.ZERO)); - final SchemaDefinitions schemaDefinitions = spec.atSlot(slot).getSchemaDefinitions(); - final AttestationSchema attestationSchema = - schemaDefinitions.getAttestationSchema(); + /** The valid attestation retention period is 64 slots in deneb */ + long ATTESTATION_RETENTION_SLOTS = 64; - final boolean requiresCommitteeBits = - schemaDefinitions.getAttestationSchema().requiresCommitteeBits(); + int getSize(); - return dataHashBySlot.descendingMap().entrySet().stream() - .filter(filterForSlot) - .map(Map.Entry::getValue) - .flatMap(Collection::stream) - .map(attestationGroupByDataHash::get) - .filter(Objects::nonNull) - .flatMap( - matchingDataAttestationGroup -> - matchingDataAttestationGroup.stream(maybeCommitteeIndex, requiresCommitteeBits)) - .map(pooledAttestation -> pooledAttestation.toAttestation(attestationSchema)) - .toList(); - } + void add(ValidatableAttestation attestation); - private boolean isValid( - final BeaconState stateAtBlockSlot, final AttestationData attestationData) { - return spec.validateAttestation(stateAtBlockSlot, attestationData).isEmpty(); - } + SszList getAttestationsForBlock( + BeaconState stateAtBlockSlot, AttestationForkChecker forkChecker); - public synchronized Optional createAggregateFor( - final Bytes32 attestationHashTreeRoot, final Optional committeeIndex) { - final MatchingDataAttestationGroup group = - attestationGroupByDataHash.get(attestationHashTreeRoot); - if (group == null) { - return Optional.empty(); - } + Optional createAggregateFor( + Bytes32 attestationHashTreeRoot, Optional committeeIndex); - final AttestationSchema attestationSchema = - spec.atSlot(group.getAttestationData().getSlot()) - .getSchemaDefinitions() - .getAttestationSchema(); + List getAttestations( + Optional maybeSlot, Optional maybeCommitteeIndex); - return group.stream(committeeIndex) - .findFirst() - .map(pooledAttestation -> pooledAttestation.toAttestation(attestationSchema)); - } + void onAttestationsIncludedInBlock(UInt64 slot, Iterable attestations); - public synchronized void onReorg(final UInt64 commonAncestorSlot) { - attestationGroupByDataHash.values().forEach(group -> group.onReorg(commonAncestorSlot)); - } + void onReorg(UInt64 commonAncestorSlot); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1.java new file mode 100644 index 00000000000..d7caca9ed2f --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1.java @@ -0,0 +1,377 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.attestation; + +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes32; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import tech.pegasys.teku.infrastructure.metrics.SettableGauge; +import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; +import tech.pegasys.teku.infrastructure.ssz.SszList; +import tech.pegasys.teku.infrastructure.ssz.schema.SszListSchema; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; +import tech.pegasys.teku.spec.datastructures.operations.Attestation; +import tech.pegasys.teku.spec.datastructures.operations.AttestationData; +import tech.pegasys.teku.spec.datastructures.operations.AttestationSchema; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers; +import tech.pegasys.teku.spec.schemas.SchemaDefinitions; +import tech.pegasys.teku.storage.client.RecentChainData; + +/** + * Maintains a pool of attestations. Attestations can be retrieved either for inclusion in a block + * or as an aggregate to publish as part of the naive attestation aggregation algorithm. In both + * cases the returned attestations are aggregated to maximise the number of validators that can be + * included. + */ +public class AggregatingAttestationPoolV1 implements AggregatingAttestationPool { + private static final Logger LOG = LogManager.getLogger(); + + static final Comparator ATTESTATION_INCLUSION_COMPARATOR = + Comparator.comparingInt( + attestation -> attestation.pooledAttestation().bits().getBitCount()) + .reversed(); + + private final Map attestationGroupByDataHash = + new HashMap<>(); + private final NavigableMap> dataHashBySlot = new TreeMap<>(); + + private final Spec spec; + private final RecentChainData recentChainData; + private final SettableGauge sizeGauge; + private final int maximumAttestationCount; + + private final AtomicInteger size = new AtomicInteger(0); + + public AggregatingAttestationPoolV1( + final Spec spec, + final RecentChainData recentChainData, + final MetricsSystem metricsSystem, + final int maximumAttestationCount) { + this.spec = spec; + this.recentChainData = recentChainData; + this.sizeGauge = + SettableGauge.create( + metricsSystem, + TekuMetricCategory.BEACON, + "attestation_pool_size", + "The number of attestations available to be included in proposed blocks"); + this.maximumAttestationCount = maximumAttestationCount; + } + + @Override + public synchronized void add(final ValidatableAttestation attestation) { + final Optional committeesSize = + attestation.getCommitteesSize().or(() -> getCommitteesSize(attestation.getAttestation())); + getOrCreateAttestationGroup(attestation.getAttestation(), committeesSize) + .ifPresent( + attestationGroup -> { + final boolean added = + attestationGroup.add( + PooledAttestation.fromValidatableAttestation(attestation), + attestation.getCommitteeShufflingSeed()); + if (added) { + updateSize(1); + } + }); + // Always keep the latest slot attestations, so we don't discard everything + int currentSize = getSize(); + while (dataHashBySlot.size() > 1 && currentSize > maximumAttestationCount) { + LOG.trace("Attestation cache at {} exceeds {}, ", currentSize, maximumAttestationCount); + final UInt64 firstSlotToKeep = dataHashBySlot.firstKey().plus(1); + removeAttestationsPriorToSlot(firstSlotToKeep); + currentSize = getSize(); + } + } + + private Optional getCommitteesSize(final Attestation attestation) { + if (attestation.requiresCommitteeBits()) { + return getCommitteesSizeUsingTheState(attestation.getData()); + } + return Optional.empty(); + } + + /** + * @param committeesSize Required for aggregating attestations as per EIP-7549 + */ + private Optional getOrCreateAttestationGroup( + final Attestation attestation, final Optional committeesSize) { + final AttestationData attestationData = attestation.getData(); + // if an attestation has committee bits, committees size should have been computed. If this is + // not the case, we should ignore this attestation and not add it to the pool + if (attestation.requiresCommitteeBits() && committeesSize.isEmpty()) { + LOG.debug( + "Committees size couldn't be retrieved for attestation at slot {}, block root {} and target root {}. Will NOT add this attestation to the pool.", + attestationData.getSlot(), + attestationData.getBeaconBlockRoot(), + attestationData.getTarget().getRoot()); + return Optional.empty(); + } + dataHashBySlot + .computeIfAbsent(attestationData.getSlot(), slot -> new HashSet<>()) + .add(attestationData.hashTreeRoot()); + final MatchingDataAttestationGroup attestationGroup = + attestationGroupByDataHash.computeIfAbsent( + attestationData.hashTreeRoot(), + key -> new MatchingDataAttestationGroup(spec, attestationData, committeesSize)); + return Optional.of(attestationGroup); + } + + private Optional getCommitteesSizeUsingTheState( + final AttestationData attestationData) { + // we can use the first state of the epoch to get committees for an attestation + final MiscHelpers miscHelpers = spec.atSlot(attestationData.getSlot()).miscHelpers(); + final Optional maybeEpoch = recentChainData.getCurrentEpoch(); + // the only reason this can happen is we don't have a store yet. + if (maybeEpoch.isEmpty()) { + return Optional.empty(); + } + final UInt64 currentEpoch = maybeEpoch.get(); + final UInt64 attestationEpoch = miscHelpers.computeEpochAtSlot(attestationData.getSlot()); + + LOG.debug("currentEpoch {}, attestationEpoch {}", currentEpoch, attestationEpoch); + if (attestationEpoch.equals(currentEpoch) + || attestationEpoch.equals(currentEpoch.minusMinZero(1))) { + + return recentChainData + .getBestState() + .flatMap( + state -> { + try { + return Optional.of( + spec.getBeaconCommitteesSize( + state.getImmediately(), attestationData.getSlot())); + } catch (IllegalStateException e) { + LOG.debug( + "Couldn't retrieve state for committee calculation of slot {}", + attestationData.getSlot()); + return Optional.empty(); + } + }); + } + + // attestation is not from the current or previous epoch + // this is really an edge case because the current or previous epoch is at least 31 slots + // and the attestation is only valid for 64 slots, so it may be epoch-2 but not beyond. + final UInt64 attestationEpochStartSlot = miscHelpers.computeStartSlotAtEpoch(attestationEpoch); + LOG.debug("State at slot {} needed", attestationEpochStartSlot); + try { + return recentChainData + .retrieveStateInEffectAtSlot(attestationEpochStartSlot) + .getImmediately() + .map(state -> spec.getBeaconCommitteesSize(state, attestationData.getSlot())); + } catch (final IllegalStateException e) { + LOG.debug( + "Couldn't retrieve state in effect at slot {} for committee calculation of slot {}", + attestationEpochStartSlot, + attestationData.getSlot()); + return Optional.empty(); + } + } + + @Override + public synchronized void onSlot(final UInt64 slot) { + if (slot.compareTo(ATTESTATION_RETENTION_SLOTS) <= 0) { + return; + } + final UInt64 firstValidAttestationSlot = slot.minus(ATTESTATION_RETENTION_SLOTS); + removeAttestationsPriorToSlot(firstValidAttestationSlot); + } + + private void removeAttestationsPriorToSlot(final UInt64 firstValidAttestationSlot) { + final Collection> dataHashesToRemove = + dataHashBySlot.headMap(firstValidAttestationSlot, false).values(); + dataHashesToRemove.stream() + .flatMap(Set::stream) + .forEach( + key -> { + final int removed = attestationGroupByDataHash.get(key).size(); + attestationGroupByDataHash.remove(key); + updateSize(-removed); + }); + if (!dataHashesToRemove.isEmpty()) { + LOG.trace( + "firstValidAttestationSlot: {}, removing: {}", + () -> firstValidAttestationSlot, + dataHashesToRemove::size); + } + dataHashesToRemove.clear(); + } + + @Override + public synchronized void onAttestationsIncludedInBlock( + final UInt64 slot, final Iterable attestations) { + attestations.forEach(attestation -> onAttestationIncludedInBlock(slot, attestation)); + } + + private void onAttestationIncludedInBlock(final UInt64 slot, final Attestation attestation) { + getOrCreateAttestationGroup(attestation, getCommitteesSize(attestation)) + .ifPresent( + attestationGroup -> { + final int numRemoved = + attestationGroup.onAttestationIncludedInBlock(slot, attestation); + updateSize(-numRemoved); + }); + } + + private void updateSize(final int delta) { + final int currentSize = size.addAndGet(delta); + sizeGauge.set(currentSize); + } + + @Override + public synchronized int getSize() { + return size.get(); + } + + @Override + public synchronized SszList getAttestationsForBlock( + final BeaconState stateAtBlockSlot, final AttestationForkChecker forkChecker) { + final UInt64 currentEpoch = spec.getCurrentEpoch(stateAtBlockSlot); + final int previousEpochLimit = spec.getPreviousEpochAttestationCapacity(stateAtBlockSlot); + + final SchemaDefinitions schemaDefinitions = + spec.atSlot(stateAtBlockSlot.getSlot()).getSchemaDefinitions(); + final AttestationSchema attestationSchema = + schemaDefinitions.getAttestationSchema(); + final SszListSchema attestationsSchema = + schemaDefinitions.getBeaconBlockBodySchema().getAttestationsSchema(); + + final boolean blockRequiresAttestationsWithCommitteeBits = + attestationSchema.requiresCommitteeBits(); + + final AtomicInteger prevEpochCount = new AtomicInteger(0); + + return dataHashBySlot + // We can immediately skip any attestations from the block slot or later + .headMap(stateAtBlockSlot.getSlot(), false) + .descendingMap() + .values() + .stream() + .flatMap( + dataHashSetForSlot -> + streamAggregatesForDataHashesBySlot( + dataHashSetForSlot, + stateAtBlockSlot, + forkChecker, + blockRequiresAttestationsWithCommitteeBits)) + .limit(attestationsSchema.getMaxLength()) + .filter( + attestation -> { + if (spec.computeEpochAtSlot(attestation.data().getSlot()).isLessThan(currentEpoch)) { + final int currentCount = prevEpochCount.getAndIncrement(); + return currentCount < previousEpochLimit; + } + return true; + }) + .map(pooledAttestation -> pooledAttestation.toAttestation(attestationSchema)) + .collect(attestationsSchema.collector()); + } + + private Stream streamAggregatesForDataHashesBySlot( + final Set dataHashSetForSlot, + final BeaconState stateAtBlockSlot, + final AttestationForkChecker forkChecker, + final boolean blockRequiresAttestationsWithCommitteeBits) { + + return dataHashSetForSlot.stream() + .map(attestationGroupByDataHash::get) + .filter(Objects::nonNull) + .filter(group -> isValid(stateAtBlockSlot, group.getAttestationData())) + .filter(forkChecker::areAttestationsFromCorrectFork) + .flatMap(MatchingDataAttestationGroup::stream) + .filter( + attestation -> + attestation.pooledAttestation().bits().requiresCommitteeBits() + == blockRequiresAttestationsWithCommitteeBits) + .sorted(ATTESTATION_INCLUSION_COMPARATOR); + } + + @Override + public synchronized List getAttestations( + final Optional maybeSlot, final Optional maybeCommitteeIndex) { + + final Predicate>> filterForSlot = + (entry) -> maybeSlot.map(slot -> entry.getKey().equals(slot)).orElse(true); + + final UInt64 slot = maybeSlot.orElse(recentChainData.getCurrentSlot().orElse(UInt64.ZERO)); + final SchemaDefinitions schemaDefinitions = spec.atSlot(slot).getSchemaDefinitions(); + final AttestationSchema attestationSchema = + schemaDefinitions.getAttestationSchema(); + + final boolean requiresCommitteeBits = + schemaDefinitions.getAttestationSchema().requiresCommitteeBits(); + + return dataHashBySlot.descendingMap().entrySet().stream() + .filter(filterForSlot) + .map(Map.Entry::getValue) + .flatMap(Collection::stream) + .map(attestationGroupByDataHash::get) + .filter(Objects::nonNull) + .flatMap( + matchingDataAttestationGroup -> + matchingDataAttestationGroup.stream(maybeCommitteeIndex, requiresCommitteeBits)) + .map(pooledAttestation -> pooledAttestation.toAttestation(attestationSchema)) + .toList(); + } + + private boolean isValid( + final BeaconState stateAtBlockSlot, final AttestationData attestationData) { + return spec.validateAttestation(stateAtBlockSlot, attestationData).isEmpty(); + } + + @Override + public synchronized Optional createAggregateFor( + final Bytes32 attestationHashTreeRoot, final Optional committeeIndex) { + final MatchingDataAttestationGroup group = + attestationGroupByDataHash.get(attestationHashTreeRoot); + if (group == null) { + return Optional.empty(); + } + + final AttestationSchema attestationSchema = + spec.atSlot(group.getAttestationData().getSlot()) + .getSchemaDefinitions() + .getAttestationSchema(); + + return group.stream(committeeIndex) + .findFirst() + .map(pooledAttestation -> pooledAttestation.toAttestation(attestationSchema)); + } + + @Override + public synchronized void onReorg(final UInt64 commonAncestorSlot) { + attestationGroupByDataHash.values().forEach(group -> group.onReorg(commonAncestorSlot)); + } +} diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolTest.java index 6288d870748..33dde53b9ca 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolTest.java @@ -34,7 +34,6 @@ import java.util.Optional; import java.util.function.Supplier; import java.util.stream.IntStream; -import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.mockito.ArgumentMatchers; @@ -45,7 +44,6 @@ import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; -import tech.pegasys.teku.spec.TestSpecContext; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.TestSpecInvocationContextProvider.SpecContext; import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; @@ -60,8 +58,7 @@ import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.storage.store.UpdatableStore; -@TestSpecContext(milestone = {PHASE0, ELECTRA}) -class AggregatingAttestationPoolTest { +abstract class AggregatingAttestationPoolTest { public static final UInt64 SLOT = UInt64.valueOf(1234); private static final int COMMITTEE_SIZE = 130; @@ -73,19 +70,18 @@ class AggregatingAttestationPoolTest { private final Spec mockSpec = mock(Spec.class); private final RecentChainData mockRecentChainData = mock(RecentChainData.class); - private AggregatingAttestationPool aggregatingPool = - new AggregatingAttestationPool( - mockSpec, - mockRecentChainData, - new NoOpMetricsSystem(), - DEFAULT_MAXIMUM_ATTESTATION_COUNT); + private AggregatingAttestationPool aggregatingPool; private final AttestationForkChecker forkChecker = mock(AttestationForkChecker.class); private Int2IntMap committeeSizes; + abstract AggregatingAttestationPool instantiatePool( + final Spec spec, final RecentChainData recentChainData, final int maxAttestations); + @BeforeEach public void setUp(final SpecContext specContext) { + aggregatingPool = instantiatePool(mockSpec, mockRecentChainData, 100); spec = specContext.getSpec(); specMilestone = specContext.getSpecMilestone(); dataStructureUtil = specContext.getDataStructureUtil(); @@ -461,8 +457,7 @@ public void getSize_shouldDecrementForAllRemovedAttestationsWhileKeepingOthers() @TestTemplate void shouldRemoveOldSlotsWhenMaximumNumberOfAttestationsReached() { - aggregatingPool = - new AggregatingAttestationPool(mockSpec, mockRecentChainData, new NoOpMetricsSystem(), 5); + aggregatingPool = instantiatePool(mockSpec, mockRecentChainData, 5); final AttestationData attestationData0 = dataStructureUtil.randomAttestationData(ZERO); final AttestationData attestationData1 = dataStructureUtil.randomAttestationData(ONE); final AttestationData attestationData2 = @@ -486,8 +481,7 @@ void shouldRemoveOldSlotsWhenMaximumNumberOfAttestationsReached() { @TestTemplate void shouldNotRemoveLastSlotEvenWhenMaximumNumberOfAttestationsReached() { - aggregatingPool = - new AggregatingAttestationPool(mockSpec, mockRecentChainData, new NoOpMetricsSystem(), 5); + aggregatingPool = instantiatePool(mockSpec, mockRecentChainData, 5); final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO); addAttestationFromValidators(attestationData, 1, 2); addAttestationFromValidators(attestationData, 2, 3); @@ -542,11 +536,7 @@ public void getAttestations_shouldReturnAllAttestations() { assumeThat(specMilestone).isLessThan(ELECTRA); final Spec mockedSpec = mock(Spec.class); final AggregatingAttestationPool aggregatingPool = - new AggregatingAttestationPool( - mockedSpec, - mockRecentChainData, - new NoOpMetricsSystem(), - DEFAULT_MAXIMUM_ATTESTATION_COUNT); + instantiatePool(mockedSpec, mockRecentChainData, DEFAULT_MAXIMUM_ATTESTATION_COUNT); // Adding a phase0 attestation to the aggregation pool final Spec phase0Spec = TestSpecFactory.createMinimalPhase0(); when(mockedSpec.atSlot(any())).thenReturn(phase0Spec.getGenesisSpec()); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1Test.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1Test.java new file mode 100644 index 00000000000..f150c93b65d --- /dev/null +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolV1Test.java @@ -0,0 +1,33 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.attestation; + +import static tech.pegasys.teku.spec.SpecMilestone.ELECTRA; +import static tech.pegasys.teku.spec.SpecMilestone.PHASE0; + +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecContext; +import tech.pegasys.teku.storage.client.RecentChainData; + +@TestSpecContext(milestone = {PHASE0, ELECTRA}) +public class AggregatingAttestationPoolV1Test extends AggregatingAttestationPoolTest { + + @Override + AggregatingAttestationPool instantiatePool( + final Spec spec, final RecentChainData recentChainData, final int maxAttestations) { + return new AggregatingAttestationPoolV1( + spec, recentChainData, new NoOpMetricsSystem(), maxAttestations); + } +} diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index fbf4dab4c6f..cb8db1ac238 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -123,6 +123,7 @@ import tech.pegasys.teku.statetransition.OperationsReOrgManager; import tech.pegasys.teku.statetransition.SimpleOperationPool; import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; +import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPoolV1; import tech.pegasys.teku.statetransition.attestation.AttestationManager; import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager; import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin; @@ -1200,7 +1201,7 @@ protected void initSlotProcessor() { public void initAttestationPool() { LOG.debug("BeaconChainController.initAttestationPool()"); attestationPool = - new AggregatingAttestationPool( + new AggregatingAttestationPoolV1( spec, recentChainData, metricsSystem, DEFAULT_MAXIMUM_ATTESTATION_COUNT); eventChannels.subscribe(SlotEventsChannel.class, attestationPool); blockImporter.subscribeToVerifiedBlockAttestations(