diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroup.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroup.java index 443984a07a4..6b7a9a03776 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroup.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroup.java @@ -178,8 +178,8 @@ public int size() { } /** - * Updates {@code seenAggregationBits} and removes any attestation from this group whose - * aggregation bits have all been seen. + * Updates includedValidators bits and removes any attestation from this group whose aggregation + * bits have all been seen. * *

This is well suited for removing attestations that have been included in a block. * diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroupV2.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroupV2.java new file mode 100644 index 00000000000..6301caa6472 --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroupV2.java @@ -0,0 +1,506 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.attestation; + +import static com.google.common.base.Preconditions.checkArgument; + +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import java.util.Comparator; +import java.util.Iterator; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.Set; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.LongSupplier; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.operations.Attestation; +import tech.pegasys.teku.spec.datastructures.operations.AttestationData; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.statetransition.attestation.utils.AttestationBits; +import tech.pegasys.teku.statetransition.attestation.utils.TimeLimitingIterator; + +/** + * Maintains an aggregated collection of attestations which all share the same {@link + * AttestationData}. + * + *

So that the added attestations can be aggregated into the smallest number of aggregates, even + * as the contents of the collection change, aggregation is actually done during iteration. + * Aggregation starts with the attestation that already includes the most validators then continues + * adding attestations in order of the number of validators they contain. + * + *

Note that the resulting aggregate will be invalid if attestations with different + * AttestationData are added. + * + *

This V2 implementation uses concurrent collections and a ReadWriteLock for thread-safety. + */ +public class MatchingDataAttestationGroupV2 { + private static final Logger LOG = LogManager.getLogger(); + + // Use Concurrent collections and lock for thread safety + private final ConcurrentNavigableMap> + attestationsByValidatorCount = + new ConcurrentSkipListMap<>( + Comparator.reverseOrder()); // Most validators first, thread-safe map + + private final ConcurrentMap> singleAttestationsByCommitteeIndex = + new ConcurrentHashMap<>(); + + private final Spec spec; + private final AtomicReference> committeeShufflingSeedRef = + new AtomicReference<>(Optional.empty()); + private final AttestationData attestationData; + private final Optional committeesSize; + private final LongSupplier nanosSupplier; + + /** + * Tracks which validators were included in attestations at a given slot on the canonical chain. + * Uses ConcurrentSkipListMap for concurrent access. AttestationBits instances managed within this + * map must be handled carefully under lock if mutable. + * + *

When a reorg occurs we can accurately compute the set of included validators at the common + * ancestor by removing blocks in slots after the ancestor then recalculating {@link + * #includedValidators}. Otherwise, we might remove a validator from the included list because it + * was in a block moved off the canonical chain even though that validator was also included in an + * earlier block which is still on the canonical chain. + * + *

Pruning isn't required for this map because the entire attestation group is dropped by + * {@link AggregatingAttestationPool} once it is too old to be included in blocks. + */ + private final ConcurrentNavigableMap includedValidatorsBySlot = + new ConcurrentSkipListMap<>(); + + /** + * Precalculated combined list of included validators across all blocks. Must be accessed via + * lock. AttestationBits is mutable, so needs protection. + */ + private AttestationBits includedValidators; + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final Lock readLock = lock.readLock(); + private final Lock writeLock = lock.writeLock(); + + private final boolean earlyDropSingleAttestations; + + public MatchingDataAttestationGroupV2( + final Spec spec, + final LongSupplier nanosSupplier, + final AttestationData attestationData, + final Optional committeesSize, + final boolean earlyDropSingleAttestations) { + this.spec = spec; + this.attestationData = attestationData; + this.committeesSize = committeesSize; + this.includedValidators = createEmptyAttestationBits(); + this.earlyDropSingleAttestations = earlyDropSingleAttestations; + this.nanosSupplier = nanosSupplier; + } + + private AttestationBits createEmptyAttestationBits() { + return AttestationBits.fromEmptyFromAttestationSchema( + spec.atSlot(attestationData.getSlot()).getSchemaDefinitions().getAttestationSchema(), + committeesSize); + } + + public AttestationData getAttestationData() { + return attestationData; + } + + public PooledAttestationWithData fillUpAggregation( + final PooledAttestationWithData attestation, final long timeLimitNanos) { + + final AggregateAttestationBuilder builder = new AggregateAttestationBuilder(true); + + builder.aggregate(attestation.pooledAttestation()); + + final Iterator singleAttestationTimeLimitedIterator = + new TimeLimitingIterator<>( + nanosSupplier, + timeLimitNanos, + singleAttestationsByCommitteeIndex.values().stream().flatMap(Set::stream).iterator(), + __ -> LOG.info("Time limit reached, while fillingUp single attestation")); + + while (singleAttestationTimeLimitedIterator.hasNext()) { + builder.aggregate(singleAttestationTimeLimitedIterator.next()); + } + + return new PooledAttestationWithData(attestationData, builder.buildAggregate()); + } + + /** + * Adds an attestation to this group. When possible, the attestation will be aggregated with + * others during iteration. Ignores attestations with no new, unseen aggregation bits. Optimized + * to set the committeeShufflingSeed only once using volatile + double-checked locking. + * + * @param attestation the attestation to add + * @return True if the attestation was added, false otherwise + */ + public boolean add( + final PooledAttestation attestation, final Optional committeeShufflingSeed) { + readLock.lock(); + try { + if (includedValidators.isSuperSetOf(attestation.bits())) { + return false; + } + } finally { + readLock.unlock(); + } + + if (committeeShufflingSeedRef.get().isEmpty()) { + if (committeeShufflingSeed.isPresent()) { + // Attempt to atomically set the value *only if* it's currently Optional.empty() + // This guarantees only one thread succeeds in setting the value. + committeeShufflingSeedRef.compareAndSet(Optional.empty(), committeeShufflingSeed); + // We don't need to check the return value of compareAndSet; + // if it fails, it means another thread already set it, which is fine. + } + } + + if (attestation.isSingleAttestation()) { + final Set singleAttestations = + singleAttestationsByCommitteeIndex.computeIfAbsent( + attestation.bits().getFirstCommitteeIndex(), __ -> ConcurrentHashMap.newKeySet()); + return singleAttestations.add(attestation); + } + + final Set attestations = + attestationsByValidatorCount.computeIfAbsent( + attestation.bits().getBitCount(), __ -> ConcurrentHashMap.newKeySet()); + + // .add() on the ConcurrentHashMap.KeySetView is thread-safe + final boolean added = attestations.add(attestation); + + if (earlyDropSingleAttestations && added && attestation.bits().requiresCommitteeBits()) { + + attestation + .bits() + .streamCommitteeIndices() + .forEach( + committeeIndex -> { + final Set singleAttestations = + singleAttestationsByCommitteeIndex.get(committeeIndex); + if (singleAttestations != null) { + singleAttestations.removeIf(sa -> attestation.bits().isSuperSetOf(sa.bits())); + } + }); + } + + return added; + } + + /** + * Iterates through the aggregation of attestations in this group. The iterator attempts to create + * the minimum number of attestations that include all attestations in the group. + * + *

committeeIndex is an optional parameter that enables aggregation over a specified committee + * (applies to Electra only) + * + *

While it is guaranteed that every validator from an attestation in this group is included in + * an aggregate produced by this iterator, there is no guarantee that the added attestation + * instances themselves will be included. + * + * @return an iterator including attestations for every validator included in this group. + */ + private Iterator createAggregatingIterator( + final long timeLimitNanos, + final Supplier> candidatesStreamSupplier) { + final AttestationBits includedValidatorsCopy; + readLock.lock(); + try { + // Capture a copy of includedValidators under lock for the iterator's isolated use + includedValidatorsCopy = this.includedValidators.copy(); + } finally { + readLock.unlock(); + } + final AggregatingIterator iterator = + new AggregatingIterator(includedValidatorsCopy, candidatesStreamSupplier); + if (timeLimitNanos == Long.MAX_VALUE) { + return iterator; + } + + return new TimeLimitingIterator<>( + nanosSupplier, + timeLimitNanos, + iterator, + __ -> LOG.info("Time limit reached, skipping aggregation")); + } + + public Stream streamForBlockProduction(final long timeLimitNanos) { + return StreamSupport.stream( + spliterator(timeLimitNanos, blockProductionCandidatesStreamSupplier()), false) + .map( + pooledAttestation -> new PooledAttestationWithData(attestationData, pooledAttestation)); + } + + public Stream streamForAggregationProduction( + final Optional committeeIndex, final long timeLimitNanos) { + checkArgument( + committeeIndex.isPresent() || !includedValidators.requiresCommitteeBits(), + "Committee index must be present if committee bits are required"); + return StreamSupport.stream( + spliterator( + timeLimitNanos, aggregationProductionCandidatesStreamSupplier(committeeIndex)), + false) + .map( + pooledAttestation -> new PooledAttestationWithData(attestationData, pooledAttestation)); + } + + public Stream streamForApiRequest( + final Optional committeeIndex, final boolean requiresCommitteeBits) { + if (noMatchingAttestations(committeeIndex, requiresCommitteeBits)) { + return Stream.empty(); + } + return StreamSupport.stream( + spliterator( + Long.MAX_VALUE, + apiRequestCandidatesStreamSupplier(committeeIndex, requiresCommitteeBits)), + false) + .map( + pooledAttestationBitsAndSignature -> + new PooledAttestationWithData(attestationData, pooledAttestationBitsAndSignature)); + } + + private Spliterator spliterator( + final long timeLimitNanos, + final Supplier> candidatesStreamSupplier) { + return Spliterators.spliteratorUnknownSize( + createAggregatingIterator(timeLimitNanos, candidatesStreamSupplier), 0); + } + + /** + * Returns true if there are no attestations in this group. + * + * @return true if this group is empty. + */ + public boolean isEmpty() { + return attestationsByValidatorCount.isEmpty() && singleAttestationsByCommitteeIndex.isEmpty(); + } + + public int size() { + return attestationsByValidatorCount.values().stream().mapToInt(Set::size).sum() + + singleAttestationsByCommitteeIndex.values().stream().mapToInt(Set::size).sum(); + } + + /** + * Updates includedValidators bits and removes any attestation from this group whose aggregation + * bits have all been seen. Needs a write lock as it modifies state. + * + * @param attestation the attestation to logically remove from the pool. + */ + public int onAttestationIncludedInBlock(final UInt64 slot, final Attestation attestation) { + writeLock.lock(); + try { + // Record validators in attestation as seen in this slot + includedValidatorsBySlot.compute( + slot, + (__, includedValidators) -> { + if (includedValidators == null) { + return AttestationBits.of(attestation, committeesSize); + } + // Mutate includedValidators aggregator under write lock + includedValidators.or(attestation); + return includedValidators; + }); + + // Check if already seen before modifying the main includedValidators + if (includedValidators.isSuperSetOf(attestation)) { + // We've already seen and filtered out all of these bits, nothing to do + return 0; + } + // Mutate main includedValidators under write lock + includedValidators.or(attestation); + + // Calculate size *before* removal for accurate delta. + final int sizeBefore = size(); + + attestationsByValidatorCount + .entrySet() + .removeIf(entry -> pruneSupersededPooledAttestations(entry.getValue())); + + singleAttestationsByCommitteeIndex + .entrySet() + .removeIf(entry -> pruneSupersededPooledAttestations(entry.getValue())); + + return sizeBefore - size(); + } finally { + writeLock.unlock(); + } + } + + private boolean pruneSupersededPooledAttestations(final Set candidates) { + candidates.removeIf(candidate -> includedValidators.isSuperSetOf(candidate.bits())); + return candidates.isEmpty(); + } + + public void onReorg(final UInt64 commonAncestorSlot) { + writeLock.lock(); + try { + final NavigableMap removedSlotsView = + includedValidatorsBySlot.tailMap(commonAncestorSlot, false); + if (removedSlotsView.isEmpty()) { + // No relevant attestations in affected slots, so nothing to do. + return; + } + + removedSlotsView.clear(); + + // Recalculate includedValidators as validators may have been seen in multiple blocks + includedValidators = createEmptyAttestationBits(); + includedValidatorsBySlot.values().forEach(includedValidators::or); + } finally { + writeLock.unlock(); + } + } + + public boolean isValid(final BeaconState stateAtBlockSlot, final Spec spec) { + return spec.validateAttestation(stateAtBlockSlot, attestationData).isEmpty(); + } + + public boolean matchesCommitteeShufflingSeed(final Set validSeeds) { + return committeeShufflingSeedRef.get().map(validSeeds::contains).orElse(false); + } + + private boolean noMatchingAttestations( + final Optional committeeIndex, final boolean requiresCommitteeBits) { + readLock.lock(); + try { + return requiresCommitteeBits != includedValidators.requiresCommitteeBits() + || noMatchingPreElectraAttestations(committeeIndex); + } finally { + readLock.unlock(); + } + } + + private boolean noMatchingPreElectraAttestations(final Optional committeeIndex) { + // Assumes called under read lock + return committeeIndex.isPresent() + && !includedValidators.requiresCommitteeBits() + && !attestationData.getIndex().equals(committeeIndex.get()); + } + + private Supplier> blockProductionCandidatesStreamSupplier() { + return () -> attestationsByValidatorCount.values().stream().flatMap(Set::stream); + } + + private Supplier> aggregationProductionCandidatesStreamSupplier( + final Optional maybeCommitteeIndex) { + if (maybeCommitteeIndex.isPresent()) { + // We are in aggregation mode post-Electra. + // Only consider single attestations from the given committee index + return () -> + singleAttestationsByCommitteeIndex + .getOrDefault(maybeCommitteeIndex.get().intValue(), Set.of()) + .stream(); + } + return () -> attestationsByValidatorCount.values().stream().flatMap(Set::stream); + } + + private Supplier> apiRequestCandidatesStreamSupplier( + final Optional maybeCommitteeIndex, final boolean requiresCommitteeBits) { + if (!requiresCommitteeBits) { + // in pre-electra mode this group has been already checked against committee index + // so we can just stream everything (single attestations don't exist) + return () -> attestationsByValidatorCount.values().stream().flatMap(Set::stream); + } + + // post electra we need a committee matcher if the committee index is specified + final Predicate committeeMatcher = + maybeCommitteeIndex + .>map( + committeeIndex -> + (PooledAttestation pooledAttestation) -> + pooledAttestation.bits().isFromCommittee(committeeIndex.intValue())) + .orElse(pooledAttestation -> true); + + return () -> { + final Stream singleAttestationsStream = + maybeCommitteeIndex + .map( + committeeIndex -> + // Stream single attestations by committee index + singleAttestationsByCommitteeIndex + .getOrDefault(committeeIndex.intValue(), Set.of()) + .stream()) + .orElse( + // No committee index filter, stream all single attestations + singleAttestationsByCommitteeIndex.values().stream().flatMap(Set::stream)); + // stream aggregates first and then single attestations + return Stream.concat( + attestationsByValidatorCount.values().stream() + .flatMap(Set::stream) + .filter(committeeMatcher), + singleAttestationsStream); + }; + } + + private static class AggregatingIterator implements Iterator { + private final Supplier> candidatesStreamSupplier; + private final AttestationBits includedValidators; + + private Iterator remainingAttestations; + + private AggregatingIterator( + final AttestationBits includedValidatorsCopy, + final Supplier> candidatesStreamSupplier) { + this.candidatesStreamSupplier = candidatesStreamSupplier; + this.includedValidators = includedValidatorsCopy; + this.remainingAttestations = getRemainingAttestations(); + } + + @Override + public boolean hasNext() { + if (!remainingAttestations.hasNext()) { + remainingAttestations = getRemainingAttestations(); + } + return remainingAttestations.hasNext(); + } + + @Override + public PooledAttestation next() { + final AggregateAttestationBuilder builder = new AggregateAttestationBuilder(true); + + remainingAttestations.forEachRemaining( + candidate -> { + if (builder.aggregate(candidate)) { + includedValidators.or(candidate.bits()); + } + }); + return builder.buildAggregate(); + } + + private Iterator getRemainingAttestations() { + + return candidatesStreamSupplier + .get() + .filter(candidate -> !includedValidators.isSuperSetOf(candidate.bits())) + .iterator(); + } + } +} diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBits.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBits.java index 839d35eaa01..3132c7971d7 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBits.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBits.java @@ -15,6 +15,7 @@ import it.unimi.dsi.fastutil.ints.Int2IntMap; import java.util.Optional; +import java.util.stream.IntStream; import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist; import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector; import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; @@ -80,6 +81,12 @@ static AttestationBits of( boolean isExclusivelyFromCommittee(int committeeIndex); + boolean isFromCommittee(int committeeIndex); + + int getFirstCommitteeIndex(); + + IntStream streamCommitteeIndices(); + /** Creates an independent copy of this instance */ AttestationBits copy(); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBitsElectra.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBitsElectra.java index eea1a1a5867..d57aed91431 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBitsElectra.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBitsElectra.java @@ -22,6 +22,7 @@ import java.util.BitSet; import java.util.List; import java.util.Objects; +import java.util.stream.IntStream; import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist; import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector; import tech.pegasys.teku.infrastructure.ssz.schema.collections.SszBitlistSchema; @@ -329,6 +330,21 @@ public boolean isExclusivelyFromCommittee(final int committeeIndex) { && committeeAggregationBitsMap.containsKey(committeeIndex); } + @Override + public boolean isFromCommittee(final int committeeIndex) { + return committeeAggregationBitsMap.containsKey(committeeIndex); + } + + @Override + public int getFirstCommitteeIndex() { + return committeeBits.nextSetBit(0); + } + + @Override + public IntStream streamCommitteeIndices() { + return committeeBits.stream(); + } + @Override public String toString() { long totalSetBits = 0; diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBitsPhase0.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBitsPhase0.java index fd1a24eac3b..f23ded66f54 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBitsPhase0.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBitsPhase0.java @@ -16,6 +16,7 @@ import com.google.common.base.MoreObjects; import it.unimi.dsi.fastutil.ints.Int2IntMap; import java.util.Objects; +import java.util.stream.IntStream; import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist; import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector; import tech.pegasys.teku.spec.datastructures.operations.Attestation; @@ -99,6 +100,21 @@ public boolean isExclusivelyFromCommittee(final int committeeIndex) { throw new IllegalStateException("Committee bits not available in phase0"); } + @Override + public boolean isFromCommittee(final int committeeIndex) { + throw new IllegalStateException("Committee bits not available in phase0"); + } + + @Override + public int getFirstCommitteeIndex() { + throw new IllegalStateException("Committee bits not available in phase0"); + } + + @Override + public IntStream streamCommitteeIndices() { + throw new IllegalStateException("Committee bits not available in phase0"); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("aggregationBits", aggregationBits).toString(); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/TimeLimitingIterator.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/TimeLimitingIterator.java new file mode 100644 index 00000000000..8959896e2fe --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/attestation/utils/TimeLimitingIterator.java @@ -0,0 +1,39 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.attestation.utils; + +import java.util.Iterator; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; + +public record TimeLimitingIterator( + LongSupplier nanosSupplier, long timeLimitNanos, Iterator delegate, LongConsumer onTimeLimit) + implements Iterator { + + @Override + public boolean hasNext() { + if (nanosSupplier.getAsLong() <= timeLimitNanos) { + return delegate.hasNext(); + } + + onTimeLimit.accept(timeLimitNanos); + + return false; + } + + @Override + public T next() { + return delegate.next(); + } +} diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolTest.java index a6d827ae58d..41c7e3b37e8 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/AggregatingAttestationPoolTest.java @@ -206,7 +206,7 @@ public void createAggregateFor_shouldAggregateAttestationsWithMatchingData() { final Optional result = aggregatingPool.createAggregateFor(attestationData.hashTreeRoot(), committeeIndex); - assertThat(result).contains(aggregateAttestations(attestation1, attestation2)); + assertThat(result).contains(aggregateAttestations(committeeSizes, attestation1, attestation2)); } @TestTemplate @@ -218,7 +218,7 @@ public void createAggregateFor_shouldReturnBestAggregateForMatchingDataWhenSomeO final Optional result = aggregatingPool.createAggregateFor(attestationData.hashTreeRoot(), committeeIndex); - assertThat(result).contains(aggregateAttestations(attestation1, attestation2)); + assertThat(result).contains(aggregateAttestations(committeeSizes, attestation1, attestation2)); } @TestTemplate @@ -291,7 +291,7 @@ public void getAttestationsForBlock_shouldAggregateAttestationsWhenPossible() { final BeaconState stateAtBlockSlot = dataStructureUtil.randomBeaconState(SLOT.increment()); assertThat(aggregatingPool.getAttestationsForBlock(stateAtBlockSlot, forkChecker)) - .containsExactly(aggregateAttestations(attestation1, attestation2)); + .containsExactly(aggregateAttestations(committeeSizes, attestation1, attestation2)); } @TestTemplate @@ -305,7 +305,8 @@ public void getAttestationsForBlock_shouldIncludeAttestationsWithDifferentData() final BeaconState stateAtBlockSlot = dataStructureUtil.randomBeaconState(ONE); assertThat(aggregatingPool.getAttestationsForBlock(stateAtBlockSlot, forkChecker)) - .containsExactlyInAnyOrder(aggregateAttestations(attestation1, attestation2), attestation3); + .containsExactlyInAnyOrder( + aggregateAttestations(committeeSizes, attestation1, attestation2), attestation3); } @TestTemplate diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroupTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroupTest.java index e43d7cec1e7..f0095fc985f 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroupTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroupTest.java @@ -120,7 +120,8 @@ public void remove_shouldRemoveAttestationsThatAreAggregatedIntoRemovedAttestati int numRemoved = group.onAttestationIncludedInBlock( UInt64.ZERO, - aggregateAttestations(toAttestation(attestation1), toAttestation(attestation2))); + aggregateAttestations( + committeeSizes, toAttestation(attestation1), toAttestation(attestation2))); assertThat(group.stream(Optional.of(UInt64.ZERO))) .containsExactly(toPooledAttestationWithData(attestation3)); @@ -155,7 +156,8 @@ public void add_shouldAggregateAttestationsFromSameCommittee(final SpecContext s .containsExactly(toPooledAttestationWithData(attestation1)); final Attestation expected = - aggregateAttestations(toAttestation(attestation2), toAttestation(attestation3)); + aggregateAttestations( + committeeSizes, toAttestation(attestation2), toAttestation(attestation3)); assertThat(group.stream(Optional.of(UInt64.ONE))) .containsExactly( @@ -184,7 +186,8 @@ public void iterator_shouldAggregateAttestationsWhereValidatorsDoNotOverlap() { final PooledAttestation attestation2 = addPooledAttestation(2); final Attestation expected = - aggregateAttestations(toAttestation(attestation1), toAttestation(attestation2)); + aggregateAttestations( + committeeSizes, toAttestation(attestation1), toAttestation(attestation2)); assertThat(group.stream(Optional.of(UInt64.ZERO))) .containsExactlyInAnyOrder( @@ -205,7 +208,9 @@ public void iterator_shouldAggregateAttestationsWithMoreValidatorsFirst() { ValidatableAttestation.from( spec, aggregateAttestations( - toAttestation(bigAttestation), toAttestation(littleAttestation)), + committeeSizes, + toAttestation(bigAttestation), + toAttestation(littleAttestation)), committeeSizes)), mediumAttestation); } @@ -250,7 +255,8 @@ void iterator_shouldOmitAttestationsThatOverlapWithFirstAttestationAndAreRedunda PooledAttestation.fromValidatableAttestation( ValidatableAttestation.from( spec, - aggregateAttestations(toAttestation(useful1), toAttestation(useful2)), + aggregateAttestations( + committeeSizes, toAttestation(useful1), toAttestation(useful2)), committeeSizes))); assertThat(group.stream(Optional.of(UInt64.ZERO))).containsExactly(expected); @@ -363,6 +369,7 @@ public void size() { group.onAttestationIncludedInBlock( UInt64.ZERO, aggregateAttestations( + committeeSizes, attestation1.toAttestation(attestationSchema), attestation2.toAttestation(attestationSchema))); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroupV2Test.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroupV2Test.java new file mode 100644 index 00000000000..b33e6ee4548 --- /dev/null +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/MatchingDataAttestationGroupV2Test.java @@ -0,0 +1,828 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.attestation; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static tech.pegasys.teku.spec.SpecMilestone.ELECTRA; +import static tech.pegasys.teku.spec.SpecMilestone.PHASE0; +import static tech.pegasys.teku.statetransition.attestation.AggregatorUtil.aggregateAttestations; + +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; +import it.unimi.dsi.fastutil.ints.IntList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import java.util.stream.IntStream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist; +import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecContext; +import tech.pegasys.teku.spec.TestSpecInvocationContextProvider.SpecContext; +import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation; +import tech.pegasys.teku.spec.datastructures.operations.Attestation; +import tech.pegasys.teku.spec.datastructures.operations.AttestationData; +import tech.pegasys.teku.spec.datastructures.operations.AttestationSchema; +import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.statetransition.attestation.utils.AttestationBits; + +@TestSpecContext(milestone = {PHASE0, ELECTRA}) +class MatchingDataAttestationGroupV2Test { + private static final UInt64 SLOT = UInt64.valueOf(1234); + + private final LongSupplier nanoSupplier = mock(LongSupplier.class); + + private Spec spec; + private DataStructureUtil dataStructureUtil; + private AttestationSchema attestationSchema; + + private AttestationData attestationData; + + private MatchingDataAttestationGroupV2 group; + private Int2IntMap committeeSizes; + + @BeforeEach + public void setUp(final SpecContext specContext) { + spec = specContext.getSpec(); + attestationSchema = spec.getGenesisSchemaDefinitions().getAttestationSchema(); + dataStructureUtil = specContext.getDataStructureUtil(); + attestationData = dataStructureUtil.randomAttestationData(SLOT); + committeeSizes = new Int2IntOpenHashMap(); + committeeSizes.put(0, 10); + committeeSizes.put(1, 10); + when(nanoSupplier.getAsLong()).thenReturn(0L); + group = + new MatchingDataAttestationGroupV2( + spec, nanoSupplier, attestationData, Optional.of(committeeSizes), false); + } + + @TestTemplate + public void isEmpty_shouldBeEmptyInitially() { + assertThat(group.isEmpty()).isTrue(); + } + + @TestTemplate + public void isEmpty_shouldNotBeEmptyWhenAnAttestationIsAdded() { + addPooledAttestation(1); + assertThat(group.isEmpty()).isFalse(); + } + + @TestTemplate + public void isEmpty_shouldBeEmptyAfterAttestationRemoved() { + final Attestation attestation = toAttestation(addPooledAttestation(1)); + int numRemoved = group.onAttestationIncludedInBlock(UInt64.ZERO, attestation); + + assertThat(group.isEmpty()).isTrue(); + assertThat(numRemoved).isEqualTo(1); + } + + @TestTemplate + public void onAttestationIncludedInBlock_shouldRemoveAttestationEvenWhenInstanceIsDifferent() { + final Attestation attestation = toAttestation(addPooledAttestation(1)); + final Attestation copy = attestationSchema.sszDeserialize(attestation.sszSerialize()); + int numRemoved = group.onAttestationIncludedInBlock(UInt64.ZERO, copy); + + verifyGroupContainsExactly(); // empty + assertThat(group.isEmpty()).isTrue(); + assertThat(numRemoved).isEqualTo(1); + } + + @TestTemplate + public void onAttestationIncludedInBlock_multipleCallsShouldAggregate() { + + // Create attestations that will be removed + final PooledAttestation attestation1 = createPooledAttestation(1); + final PooledAttestation attestation2 = createPooledAttestation(2); + + // Add some attestations + final PooledAttestation attestation3 = addPooledAttestation(3); + addPooledAttestation(1, 2); // This will be an aggregate, not single + + int numRemoved = group.onAttestationIncludedInBlock(UInt64.ZERO, toAttestation(attestation1)); + assertThat(numRemoved).isEqualTo(0); // Attestation (1) is covered by (1,2) which is still there + numRemoved += group.onAttestationIncludedInBlock(UInt64.ZERO, toAttestation(attestation2)); + assertThat(numRemoved).isEqualTo(1); + verifyGroupContainsExactly(toPooledAttestationWithData(attestation3)); + } + + @TestTemplate + public void + onAttestationIncludedInBlock_shouldRemoveAttestationsThatAreAggregatedIntoRemovedAttestation() { + final PooledAttestation attestation1 = addPooledAttestation(1); + final PooledAttestation attestation2 = addPooledAttestation(2); + final PooledAttestation attestation3 = addPooledAttestation(3); + + int numRemoved = + group.onAttestationIncludedInBlock( + UInt64.ZERO, + aggregateAttestations( + committeeSizes, toAttestation(attestation1), toAttestation(attestation2))); + + verifyGroupContainsExactly(toPooledAttestationWithData(attestation3)); + assertThat(numRemoved).isEqualTo(2); + } + + @TestTemplate + public void add_shouldIgnoreAttestationWhoseBitsHaveAllBeenRemoved() { + // Create attestations that will be removed + final PooledAttestation attestation1 = createPooledAttestation(1); + final PooledAttestation attestation2 = createPooledAttestation(2); + + // Create attestation to be added / ignored + final PooledAttestation attestationToIgnore = createPooledAttestation(1, 2); + + int numRemoved = group.onAttestationIncludedInBlock(UInt64.ZERO, toAttestation(attestation1)); + numRemoved += group.onAttestationIncludedInBlock(UInt64.ZERO, toAttestation(attestation2)); + assertThat(numRemoved).isEqualTo(0); + + assertThat(group.add(attestationToIgnore, Optional.empty())).isFalse(); + verifyGroupContainsExactly(); // empty + } + + @TestTemplate + public void add_shouldIgnoreDuplicateAttestations() { + final PooledAttestation attestation = addPooledAttestation(1, 2); + final PooledAttestation copy = + PooledAttestation.fromValidatableAttestation( + ValidatableAttestation.from( + spec, + attestationSchema.sszDeserialize(toAttestation(attestation).sszSerialize()), + committeeSizes), + validatorBitToValidatorIndex(1, 2)); + + assertThat(group.add(copy, Optional.empty())).isFalse(); + + verifyGroupContainsExactly(toPooledAttestationWithData(attestation)); + } + + // --- Tests for streamForApiRequest --- + @TestTemplate + public void streamForApiRequest_shouldAggregateDisjointAttestations( + final SpecContext specContext) { + final PooledAttestation attestation1 = addPooledAttestation(1); + final PooledAttestation attestation2 = addPooledAttestation(2); + + final Attestation expectedAggregate = + aggregateAttestations( + committeeSizes, toAttestation(attestation1), toAttestation(attestation2)); + + verifyStreamForApiRequest( + Optional.empty(), + isElectra(specContext), + toPooledAttestationWithData( + PooledAttestation.fromValidatableAttestation( + ValidatableAttestation.from(spec, expectedAggregate, committeeSizes)))); + } + + @TestTemplate + public void streamForApiRequest_shouldPrioritizeLargerAndAggregateNonOverlapping( + final SpecContext specContext) { + final PooledAttestation bigAttestation = + addPooledAttestation(1, 3, 5, 7); // Aggregate (4 validators) + final PooledAttestation mediumAttestation = + addPooledAttestation(3, 5, 9); // Aggregate (3 validators) + final PooledAttestation littleAttestation = + addPooledAttestation(2, 4); // Aggregate (2 validators) + + final Attestation combinedBigLittle = + aggregateAttestations( + committeeSizes, toAttestation(bigAttestation), toAttestation(littleAttestation)); + + verifyStreamForApiRequest( + Optional.empty(), + isElectra(specContext), + toPooledAttestationWithData( + PooledAttestation.fromValidatableAttestation( + ValidatableAttestation.from( + spec, combinedBigLittle, committeeSizes))), // Aggregate of (1,2,3,4,5,7) + toPooledAttestationWithData( + mediumAttestation) // Separate (3,5,9) because 9 is new, but 3,5 overlap + ); + } + + @TestTemplate + public void streamForApiRequest_shouldReturnOverlappingAttestationsSeparately( + final SpecContext specContext) { + final PooledAttestation attestation1 = addPooledAttestation(1, 2, 5); + final PooledAttestation attestation2 = addPooledAttestation(1, 2, 3); + + // These overlap but neither is a superset of the other. They should be returned separately. + verifyStreamForApiRequest( + Optional.empty(), + isElectra(specContext), + toPooledAttestationWithData(attestation1), + toPooledAttestationWithData(attestation2)); + } + + @TestTemplate + void streamForApiRequest_shouldAggregateLeavingNoRedundantParts(final SpecContext specContext) { + final PooledAttestation useful1 = addPooledAttestation(1, 2, 3); + addPooledAttestation(2, 4); // This is (2,4). Partially overlaps with (1,2,3) and (4) + final PooledAttestation useful2 = addPooledAttestation(4); + + // Expect (1,2,3) and (4) to be aggregated into (1,2,3,4). + // The attestation (2,4) becomes redundant. + final PooledAttestationWithData expected = + toPooledAttestationWithData( + PooledAttestation.fromValidatableAttestation( + ValidatableAttestation.from( + spec, + aggregateAttestations( + committeeSizes, toAttestation(useful1), toAttestation(useful2)), + committeeSizes))); + verifyStreamForApiRequest(Optional.empty(), isElectra(specContext), expected); + } + + @TestTemplate + void streamForApiRequest_electra_withCommitteeIndex_returnsMatchingAggregated( + final SpecContext specContext) { + specContext.assumeElectraActive(); + // C0 attestations + final PooledAttestation singleC0V1 = addPooledAttestation(Optional.of(0), 1); // Single + final PooledAttestation aggC0V23 = addPooledAttestation(Optional.of(0), 2, 3); // Aggregate + // C1 attestations + final PooledAttestation singleC1V4 = addPooledAttestation(Optional.of(1), 4); // Single + + // Request for committee 0 + // Expected: aggregate of singleC0V1 and aggC0V23 + final Attestation expectedForC0 = + aggregateAttestations(committeeSizes, toAttestation(singleC0V1), toAttestation(aggC0V23)); + + verifyStreamForApiRequest( + Optional.of(UInt64.ZERO), + true, + toPooledAttestationWithData( + PooledAttestation.fromValidatableAttestation( + ValidatableAttestation.from(spec, expectedForC0, committeeSizes)))); + + // Request for committee 1 + // Expected: single from singleC1V4 + verifyStreamForApiRequest( + Optional.of(UInt64.ONE), true, toPooledAttestationWithData(singleC1V4)); + } + + // --- Tests for streamForAggregationProduction --- + + @TestTemplate + void streamForAggregationProduction_phase0_noCommitteeIndex_returnsAggregatedFromAggregates( + final SpecContext specContext) { + specContext.assumeIsNotOneOf(ELECTRA); + + final PooledAttestation att1 = + addPooledAttestation(1, 2); // Goes to attestationsByValidatorCount + final PooledAttestation att2 = addPooledAttestation(3); // Goes to attestationsByValidatorCount + final Attestation expected = aggregateAttestations(toAttestation(att1), toAttestation(att2)); + + verifyStreamForAggregationProductionPhase0ContainsExactly( + toPooledAttestationWithData( + PooledAttestation.fromValidatableAttestation( + ValidatableAttestation.from(spec, expected, committeeSizes)))); + } + + @TestTemplate + void streamForAggregationProduction_electra_noCommitteeIndex_throwsException( + final SpecContext specContext) { + specContext.assumeElectraActive(); + + assertThatThrownBy(this::verifyStreamForAggregationProductionPhase0ContainsExactly) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Committee index must be present if committee bits are required"); + } + + @TestTemplate + void streamForAggregationProduction_electra_withCommitteeIndex_aggregatesMatchingSingles( + final SpecContext specContext) { + specContext.assumeElectraActive(); + + final PooledAttestation singleC0V1 = addPooledAttestation(Optional.of(0), 1); + final PooledAttestation singleC0V2 = addPooledAttestation(Optional.of(0), 2); + + // this will be ignored since it is not a single attestation + addPooledAttestation(Optional.of(0), 5, 6); + + // single att on committee 1 + addPooledAttestation(Optional.of(1), 3); + + final Attestation expectedForC0 = + aggregateAttestations(committeeSizes, toAttestation(singleC0V1), toAttestation(singleC0V2)); + verifyStreamForAggregationProductionContainsExactly( + UInt64.ZERO, + toPooledAttestationWithData( + PooledAttestation.fromValidatableAttestation( + ValidatableAttestation.from(spec, expectedForC0, committeeSizes)))); + } + + @TestTemplate + void streamForAggregationProduction_electra_withCommitteeIndex_noMatchingSingles_returnsEmpty( + final SpecContext specContext) { + specContext.assumeElectraActive(); + + addPooledAttestation(Optional.of(0), 1, 2); + addPooledAttestation(Optional.of(1), 3); + + // attestation for C0 is an aggregate, not a single. + verifyStreamForAggregationProductionContainsExactly(UInt64.ZERO); // Expect empty + } + + // --- Tests for streamForBlockProduction --- + @TestTemplate + public void streamForBlockProduction_electra_shouldSkipSinglesAndReturnAggregates( + final SpecContext specContext) { + specContext.assumeElectraActive(); + final PooledAttestation bigAttestation = addPooledAttestation(1, 3, 5, 7); + final PooledAttestation mediumAttestation = addPooledAttestation(3, 5, 9); + addPooledAttestation(Optional.of(0), 2); + + // we don't expect the single attestation to be returned + // in the block production flow they are used during fillUp phase only + + verifyStreamForBlockProductionContainsExactly( + toPooledAttestationWithData(bigAttestation), + toPooledAttestationWithData(mediumAttestation)); + } + + @TestTemplate + public void streamForBlockProduction_shouldOmitRedundantSmallerAttestations() { + final PooledAttestation aggregate = addPooledAttestation(1, 2, 3, 4); + addPooledAttestation(2, 3); + + verifyStreamForBlockProductionContainsExactly(toPooledAttestationWithData(aggregate)); + } + + @TestTemplate + void streamForBlockProduction_aggregatesNonOverlapping() { + final PooledAttestation att1 = addPooledAttestation(1, 2); + final PooledAttestation att2 = addPooledAttestation(3, 4); + final Attestation expected = + aggregateAttestations(committeeSizes, toAttestation(att1), toAttestation(att2)); + verifyStreamForBlockProductionContainsExactly( + toPooledAttestationWithData( + PooledAttestation.fromValidatableAttestation( + ValidatableAttestation.from(spec, expected, committeeSizes)))); + } + + // --- Tests for fillUpAggregation --- + @TestTemplate + void fillUp_noSingleAttestations_returnsOriginal(final SpecContext specContext) { + specContext.assumeElectraActive(); + final PooledAttestation initialAgg = createPooledAttestation(Optional.of(0), 1, 2); + final PooledAttestationWithData initial = toPooledAttestationWithData(initialAgg); + + final PooledAttestationWithData result = group.fillUpAggregation(initial, Long.MAX_VALUE); + assertThat(result).isEqualTo(toPooledAttestationWithData(initialAgg)); + } + + @TestTemplate + void fillUp_withNonOverlappingSingleAttestations_aggregatesThem(final SpecContext specContext) { + specContext.assumeElectraActive(); + final PooledAttestation singleC0V3 = addPooledAttestation(Optional.of(0), 3); + final PooledAttestation singleC1V4 = + addPooledAttestation(Optional.of(1), 4); // This single has committee_bits=1 + + final PooledAttestation initialAgg = + createPooledAttestation(Optional.of(0), 1, 2); // Initial agg is for C0 + final PooledAttestationWithData initial = toPooledAttestationWithData(initialAgg); + + final PooledAttestationWithData result = group.fillUpAggregation(initial, Long.MAX_VALUE); + + final Attestation expectedFullAggregate = + aggregateAttestations( + committeeSizes, + toAttestation(initialAgg), + toAttestation(singleC0V3), + toAttestation(singleC1V4)); + + assertThat(result).isEqualTo(toPooledAttestationWithData(expectedFullAggregate)); + } + + @TestTemplate + void fillUp_withSingleAttestationsPartiallyOverlappingInput_aggregatesNewBits( + final SpecContext specContext) { + specContext.assumeElectraActive(); + addPooledAttestation(Optional.of(0), 2); // Single on C0 for V2 (overlaps with initial) + final PooledAttestation singleC0V3 = + addPooledAttestation(Optional.of(0), 3); // Single on C0 for V3 (new) + final PooledAttestation singleC1V3 = + addPooledAttestation(Optional.of(1), 3); // Single on C1 for V3 (new) + addPooledAttestation(Optional.of(1), 4, 5); // aggregated (to be ignored) + + final PooledAttestation initialAgg = + createPooledAttestation(Optional.of(0), 1, 2); // C0, agg {1,2} + final PooledAttestationWithData initial = toPooledAttestationWithData(initialAgg); + + final PooledAttestationWithData result = group.fillUpAggregation(initial, Long.MAX_VALUE); + + final Attestation expectedAggregate = + aggregateAttestations( + committeeSizes, + toAttestation(initialAgg), + toAttestation(singleC0V3), + toAttestation(singleC1V3)); + + assertThat(result).isEqualTo(toPooledAttestationWithData(expectedAggregate)); + } + + @TestTemplate + void fillUp_withTimeLimit_stopsAggregating(final SpecContext specContext) { + specContext.assumeElectraActive(); + final long timeLimitNanos = 1_000_000_000L; + final PooledAttestation single1 = addPooledAttestation(Optional.of(0), 3); + addPooledAttestation(Optional.of(1), 4); + + final PooledAttestation initialAgg = createPooledAttestation(Optional.of(0), 1, 2); + final PooledAttestationWithData initial = toPooledAttestationWithData(initialAgg); + + when(nanoSupplier.getAsLong()) + .thenReturn(timeLimitNanos - 10) // first Aggregation attempt on time + .thenReturn(timeLimitNanos + 10); // second Aggregation after time limit + + final PooledAttestationWithData result = group.fillUpAggregation(initial, timeLimitNanos); + + final Attestation expectedWithSingle1 = + aggregateAttestations(committeeSizes, toAttestation(initialAgg), toAttestation(single1)); + + assertThat(result).isEqualTo(toPooledAttestationWithData(expectedWithSingle1)); + } + + @TestTemplate + void add_earlyDrop_electra_addingAggregateCoversAndRemovesSingles(final SpecContext specContext) { + specContext.assumeElectraActive(); + // Create a new group with earlyDrop=true for this test + final MatchingDataAttestationGroupV2 groupEarlyDrop = + new MatchingDataAttestationGroupV2( + spec, nanoSupplier, attestationData, Optional.of(committeeSizes), true); + + // Add attestations to this specific group + final PooledAttestation singleC0V1Early = createPooledAttestation(Optional.of(0), 1); + final PooledAttestation singleC0V2Early = createPooledAttestation(Optional.of(0), 2); + groupEarlyDrop.add(singleC0V1Early, Optional.empty()); + groupEarlyDrop.add(singleC0V2Early, Optional.empty()); + assertThat(groupEarlyDrop.size()).isEqualTo(2); + + final PooledAttestation aggregateC0V12 = createPooledAttestation(Optional.of(0), 1, 2); + groupEarlyDrop.add(aggregateC0V12, Optional.empty()); + + assertThat(groupEarlyDrop.size()).isEqualTo(1); + + // Verify using the groupEarlyDrop instance + + final List apiResult = + groupEarlyDrop + .streamForApiRequest(Optional.empty(), true) + .map(this::toPooledAttestationWithDataWithSortedValidatorIndices) + .toList(); + assertThat(apiResult).containsExactly(toPooledAttestationWithData(aggregateC0V12)); + + final List aggProdResult = + groupEarlyDrop + .streamForAggregationProduction(Optional.of(UInt64.ZERO), Long.MAX_VALUE) + .map(this::toPooledAttestationWithDataWithSortedValidatorIndices) + .toList(); + assertThat(aggProdResult).isEmpty(); + } + + @TestTemplate + void onAttestationIncludedInBlock_shouldRemoveAttestationsMadeRedundant() { + final PooledAttestation attestation1 = addPooledAttestation(1, 2, 3, 4); + final PooledAttestation attestation2 = addPooledAttestation(1, 5, 7); + final PooledAttestation attestation3 = addPooledAttestation(1, 6); + + assertThat(group.size()).isEqualTo(3); + assertThat(group.streamForBlockProduction(Long.MAX_VALUE)) + .containsExactlyInAnyOrder( + toPooledAttestationWithData(attestation1), + toPooledAttestationWithData(attestation2), + toPooledAttestationWithData(attestation3)); + + group.onAttestationIncludedInBlock( + UInt64.ZERO, toAttestation(createPooledAttestation(1, 2, 3, 4, 5, 6, 7))); + + assertThat(group.size()).isZero(); + verifyGroupContainsExactly(); + } + + @TestTemplate + void onAttestationIncludedInBlock_shouldNotRemoveAttestationsWithAdditionalValidators() { + final PooledAttestation attestation1 = addPooledAttestation(1, 2, 3, 4); + final PooledAttestation attestation2 = addPooledAttestation(1, 5, 7); + final PooledAttestation attestation3 = addPooledAttestation(1, 6); + + assertThat(group.size()).isEqualTo(3); + assertThat(group.streamForBlockProduction(Long.MAX_VALUE)) + .containsExactlyInAnyOrder( + toPooledAttestationWithData(attestation1), + toPooledAttestationWithData(attestation2), + toPooledAttestationWithData(attestation3)); + + group.onAttestationIncludedInBlock( + UInt64.ZERO, toAttestation(createPooledAttestation(1, 2, 3, 4, 5, 6))); + + // Validator 7 is still relevant (from attestation2) + assertThat(group.size()).isEqualTo(1); + verifyGroupContainsExactly(toPooledAttestationWithData(attestation2)); + } + + @TestTemplate + void onAttestationIncludedInBlock_shouldNotAddAttestationsAlreadySeenInBlocks() { + group.onAttestationIncludedInBlock( + UInt64.valueOf(1), toAttestation(createPooledAttestation(1, 2, 3, 4, 5, 6))); + + assertThat(group.add(createPooledAttestation(1), Optional.empty())).isFalse(); + assertThat(group.add(createPooledAttestation(1, 2, 3, 4, 5, 6), Optional.empty())).isFalse(); + assertThat(group.add(createPooledAttestation(2, 3), Optional.empty())).isFalse(); + } + + @TestTemplate + void onReorg_shouldAllowReadingAttestationsThatAreNoLongerRedundant() { + final PooledAttestation attestation = createPooledAttestation(3, 4); + + group.onAttestationIncludedInBlock( + UInt64.valueOf(1), toAttestation(createPooledAttestation(1, 2, 3, 4, 5, 6))); + + assertThat(group.add(attestation, Optional.empty())).isFalse(); + + group.onReorg(UInt64.ZERO); + + assertThat(group.add(attestation, Optional.empty())).isTrue(); + assertThat(group.size()).isEqualTo(1); + verifyGroupContainsExactly(toPooledAttestationWithData(attestation)); + } + + @TestTemplate + void onReorg_shouldNotAllowReadingAttestationsThatAreStillRedundant() { + final PooledAttestation attestation1 = createPooledAttestation(3, 4); + final PooledAttestation attestation2 = createPooledAttestation(1, 2, 3, 4); + + group.onAttestationIncludedInBlock( + UInt64.valueOf(1), toAttestation(createPooledAttestation(2, 3, 4))); + group.onAttestationIncludedInBlock( + UInt64.valueOf(3), toAttestation(createPooledAttestation(1, 2, 3, 4))); + + assertThat(group.add(attestation1, Optional.empty())).isFalse(); + assertThat(group.add(attestation2, Optional.empty())).isFalse(); + + group.onReorg(UInt64.valueOf(2)); // Block at slot 3 removed, block at slot 1 remains. + + // Attestation from slot 1 (2,3,4) still makes (3,4) redundant. + assertThat(group.add(attestation1, Optional.empty())).isFalse(); + + // Attestation (1,2,3,4) has validator 1, which is not in (2,3,4). So it can be added. + assertThat(group.add(attestation2, Optional.empty())).isTrue(); + assertThat(group.size()).isEqualTo(1); + verifyGroupContainsExactly(toPooledAttestationWithData(attestation2)); + } + + @TestTemplate + public void size() { + assertThat(group.size()).isEqualTo(0); + final PooledAttestationWithData attestation1Data = + toPooledAttestationWithData(addPooledAttestation(1)); + assertThat(group.size()).isEqualTo(1); + final PooledAttestationWithData attestation2Data = + toPooledAttestationWithData(addPooledAttestation(2)); + assertThat(group.size()).isEqualTo(2); + addPooledAttestation(3, 4); + assertThat(group.size()).isEqualTo(3); + addPooledAttestation(1, 2); + assertThat(group.size()).isEqualTo(4); + + int numRemoved = + group.onAttestationIncludedInBlock( + UInt64.ZERO, + aggregateAttestations( + committeeSizes, + attestation1Data.toAttestation(attestationSchema), + attestation2Data.toAttestation(attestationSchema))); + + assertThat(numRemoved).isEqualTo(3); + assertThat(group.size()).isEqualTo(1); + } + + void verifyGroupContainsExactly(final PooledAttestationWithData... expectedAttestations) { + // streamForApiRequest with no committee index is the only stream that gives us all attestations + verifyStreamForApiRequest( + Optional.empty(), + spec.getGenesisSpec().getMilestone().isGreaterThanOrEqualTo(ELECTRA), + expectedAttestations); + } + + void verifyStreamForAggregationProductionContainsExactly( + final UInt64 committeeIndex, final PooledAttestationWithData... expectedAttestations) { + assertThat( + group + .streamForAggregationProduction(Optional.of(committeeIndex), Long.MAX_VALUE) + .map(this::toPooledAttestationWithDataWithSortedValidatorIndices)) + .containsExactly(expectedAttestations); + } + + void verifyStreamForAggregationProductionPhase0ContainsExactly( + final PooledAttestationWithData... expectedAttestations) { + assertThat( + group + .streamForAggregationProduction(Optional.empty(), Long.MAX_VALUE) + .map(this::toPooledAttestationWithDataWithSortedValidatorIndices)) + .containsExactly(expectedAttestations); + } + + void verifyStreamForBlockProductionContainsExactly( + final PooledAttestationWithData... expectedAttestations) { + assertThat( + group + .streamForBlockProduction(Long.MAX_VALUE) + .map(this::toPooledAttestationWithDataWithSortedValidatorIndices)) + .containsExactly(expectedAttestations); + } + + void verifyStreamForApiRequest( + final Optional committeeIndex, + final boolean requiresCommitteeBits, + final PooledAttestationWithData... expectedAttestations) { + assertThat( + group + .streamForApiRequest(committeeIndex, requiresCommitteeBits) + .map(this::toPooledAttestationWithDataWithSortedValidatorIndices)) + .containsExactly(expectedAttestations); + } + + // This is needed because our underlining data structure stores the validator indices in a list + // for efficiency, + // but it is logically a set, so to be able to make equality checks, we need to sort the indices + private PooledAttestationWithData toPooledAttestationWithDataWithSortedValidatorIndices( + final PooledAttestationWithData attestation) { + return new PooledAttestationWithData( + attestation.data(), + new PooledAttestation( + attestation.pooledAttestation().bits(), + attestation.pooledAttestation().validatorIndices().map(TreeSet::new).map(List::copyOf), + attestation.pooledAttestation().aggregatedSignature(), + attestation.pooledAttestation().isSingleAttestation())); + } + + private PooledAttestation addPooledAttestation(final int... validators) { + return addPooledAttestation(Optional.empty(), validators); + } + + private PooledAttestation addPooledAttestation( + final Optional committeeIndex, final int... validators) { + final PooledAttestation attestation = createPooledAttestation(committeeIndex, validators); + final boolean added = group.add(attestation, Optional.empty()); + assertThat(added).isTrue(); + return attestation; + } + + private PooledAttestation createPooledAttestation(final int... validators) { + return createPooledAttestation(Optional.empty(), validators); + } + + private PooledAttestation createPooledAttestation( + final Optional committeeIndex, final int... validators) { + final SszBitlist aggregationBits = + attestationSchema.getAggregationBitsSchema().ofBits(10, validators); + final boolean isElectra = spec.atSlot(SLOT).getMilestone().isGreaterThanOrEqualTo(ELECTRA); + final Supplier committeeBits; + final Optional singleAttestation; + final int resolvedCommitteeIndex = committeeIndex.orElse(0); + + if (validators.length == 1 && isElectra) { + singleAttestation = + Optional.of( + spec.getGenesisSchemaDefinitions() + .toVersionElectra() + .orElseThrow() + .getSingleAttestationSchema() + .create( + UInt64.valueOf(resolvedCommitteeIndex), + UInt64.valueOf(validators[0]), + attestationData, + dataStructureUtil.randomSignature())); + } else { + singleAttestation = Optional.empty(); + } + + if (spec.atSlot(SLOT).getMilestone().isGreaterThanOrEqualTo(ELECTRA)) { + committeeBits = + () -> + attestationSchema + .getCommitteeBitsSchema() + .orElseThrow() + .ofBits(resolvedCommitteeIndex); + } else { + committeeBits = () -> null; + } + + final Attestation attestation = + attestationSchema.create( + aggregationBits, attestationData, dataStructureUtil.randomSignature(), committeeBits); + + final ValidatableAttestation validatableAttestation = + ValidatableAttestation.from(spec, singleAttestation.orElse(attestation), committeeSizes); + + singleAttestation.ifPresent( + __ -> validatableAttestation.convertToAggregatedFormatFromSingleAttestation(attestation)); + + return PooledAttestation.fromValidatableAttestation( + validatableAttestation, validatorBitToValidatorIndex(committeeIndex, validators)); + } + + private List validatorBitToValidatorIndex( + final Optional committeeIndex, final int... validatorBits) { + final int committeeOffset = + committeeIndex + .map( + index -> + IntStream.range(0, index) + .map(i -> committeeSizes.get(i)) + .reduce(0, Integer::sum)) + .orElse(0); + + return Arrays.stream(validatorBits) + .sorted() // make sure we sort validators so that we can compare the list without converting + // it to a set + .mapToObj(bit -> UInt64.valueOf(bit + 100 + committeeOffset)) + .toList(); + } + + private List validatorBitToValidatorIndex(final int... validatorBits) { + return validatorBitToValidatorIndex(Optional.empty(), validatorBits); + } + + private List validatorBitToValidatorIndex(final AttestationBits bits) { + if (!bits.requiresCommitteeBits()) { + return validatorBitToValidatorIndex(bits.getAggregationBits().getAllSetBits().toIntArray()); + } + + // only 2 committees are supported + assertThat(committeeSizes.keySet()).isEqualTo(Set.of(0, 1)); + final IntList committeeBits = bits.getCommitteeBits().getAllSetBits(); + assertThat(committeeBits.size()).isLessThanOrEqualTo(2); + + final List result = new ArrayList<>(); + int offset = 0; + if (committeeBits.contains(0)) { + result.addAll( + validatorBitToValidatorIndex( + Optional.of(0), + bits.getAggregationBits().getAsBitSet(0, committeeSizes.get(0)).stream().toArray())); + offset = committeeSizes.get(0); + } + if (committeeBits.contains(1)) { + result.addAll( + validatorBitToValidatorIndex( + Optional.of(1), + bits.getAggregationBits().getAsBitSet(offset, committeeSizes.get(1) + offset).stream() + .toArray())); + } + + return result; + } + + private Attestation toAttestation(final PooledAttestation pooledAttestation) { + return attestationSchema.create( + pooledAttestation.bits().getAggregationBits(), + attestationData, + pooledAttestation.aggregatedSignature(), + pooledAttestation.bits()::getCommitteeBits); // Supplier for committee bits + } + + private PooledAttestationWithData toPooledAttestationWithData( + final PooledAttestation pooledAttestation) { + // Use the attestation created by toAttestation to ensure consistency if PooledAttestation + // doesn't directly hold a full Attestation object in the exact schema form. + return toPooledAttestationWithData(toAttestation(pooledAttestation)); + } + + private PooledAttestationWithData toPooledAttestationWithData(final Attestation attestation) { + return new PooledAttestationWithData( + attestationData, + PooledAttestation.fromValidatableAttestation( + ValidatableAttestation.from(spec, attestation, committeeSizes), + validatorBitToValidatorIndex( + AttestationBits.of(attestation, Optional.of(committeeSizes))))); + } + + private boolean isElectra(final SpecContext specContext) { + return specContext.getSpecMilestone().isGreaterThanOrEqualTo(ELECTRA); + } +} diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBitsElectraTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBitsElectraTest.java index 5a351c06124..588a8ab1864 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBitsElectraTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/utils/AttestationBitsElectraTest.java @@ -700,6 +700,38 @@ void isExclusivelyFromCommittee_shouldReturnConsistentResult() { assertThat(singleAttestationFromSingleCommittee.isExclusivelyFromCommittee(2)).isFalse(); } + @Test + void isFromCommittee_shouldReturnConsistentResult() { + final AttestationBits fromMultipleCommittees = createAttestationBits(List.of(0, 1), 0, 3); + final AttestationBits fromSingleCommittee = createAttestationBits(List.of(0), 0, 1); + final AttestationBits singleAttestationFromSingleCommittee = + createAttestationBits(List.of(1), 0); + + assertThat(fromMultipleCommittees.isFromCommittee(0)).isTrue(); + assertThat(fromMultipleCommittees.isFromCommittee(1)).isTrue(); + assertThat(fromMultipleCommittees.isFromCommittee(2)).isFalse(); + + assertThat(fromSingleCommittee.isFromCommittee(0)).isTrue(); + assertThat(fromSingleCommittee.isFromCommittee(1)).isFalse(); + assertThat(fromSingleCommittee.isFromCommittee(2)).isFalse(); + + assertThat(singleAttestationFromSingleCommittee.isFromCommittee(0)).isFalse(); + assertThat(singleAttestationFromSingleCommittee.isFromCommittee(1)).isTrue(); + assertThat(singleAttestationFromSingleCommittee.isFromCommittee(2)).isFalse(); + } + + @Test + void streamCommitteeIndices__shouldReturnConsistentResult() { + final AttestationBits fromMultipleCommittees = createAttestationBits(List.of(0, 1), 0, 3); + final AttestationBits fromSingleCommittee = createAttestationBits(List.of(0), 0, 1); + final AttestationBits singleAttestationFromSingleCommittee = + createAttestationBits(List.of(1), 0); + + assertThat(fromMultipleCommittees.streamCommitteeIndices()).containsExactly(0, 1); + assertThat(fromSingleCommittee.streamCommitteeIndices()).containsExactly(0); + assertThat(singleAttestationFromSingleCommittee.streamCommitteeIndices()).containsExactly(1); + } + @Test void getBitCount_shouldReturnConsistentResult() { final AttestationBits fromMultipleCommittees = createAttestationBits(List.of(0, 1), 0, 3); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/utils/TimeLimitingIteratorTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/utils/TimeLimitingIteratorTest.java new file mode 100644 index 00000000000..1305d098019 --- /dev/null +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/attestation/utils/TimeLimitingIteratorTest.java @@ -0,0 +1,72 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.attestation.utils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import java.util.Iterator; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; +import org.junit.jupiter.api.Test; + +public class TimeLimitingIteratorTest { + private final LongSupplier nanosSupplier = mock(LongSupplier.class); + + @SuppressWarnings("unchecked") + private final Iterator delegateIterator = mock(Iterator.class); + + private final LongConsumer onTimeLimitCallback = mock(LongConsumer.class); + + private static final long TIME_LIMIT_NANOS = 1000L; + private final TimeLimitingIterator timeLimitingIterator = + new TimeLimitingIterator<>( + nanosSupplier, TIME_LIMIT_NANOS, delegateIterator, onTimeLimitCallback); + + @Test + void hasNext_shouldReturnTrueAndDelegateWhenWithinTimeLimitAndDelegateHasNext() { + when(nanosSupplier.getAsLong()).thenReturn(TIME_LIMIT_NANOS - 1); + when(delegateIterator.hasNext()).thenReturn(true); + + assertThat(timeLimitingIterator.hasNext()).isTrue(); + + verify(delegateIterator).hasNext(); + verifyNoInteractions(onTimeLimitCallback); + } + + @Test + void hasNext_shouldReturnFalseAndInvokeCallbackWhenTimeLimitExceededAndDelegateHasNext() { + when(nanosSupplier.getAsLong()).thenReturn(TIME_LIMIT_NANOS + 1); + assertThat(timeLimitingIterator.hasNext()).isFalse(); + + verify(onTimeLimitCallback).accept(TIME_LIMIT_NANOS); + verify(delegateIterator, never()).hasNext(); + } + + @Test + void next_shouldAlwaysDelegateToUnderlyingIterator() { + final String expectedValue = "testValue"; + when(delegateIterator.next()).thenReturn(expectedValue); + + assertThat(timeLimitingIterator.next()).isEqualTo(expectedValue); + + verify(delegateIterator).next(); + verifyNoInteractions(nanosSupplier); + verifyNoInteractions(onTimeLimitCallback); + } +} diff --git a/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/attestation/AggregatorUtil.java b/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/attestation/AggregatorUtil.java index af9137aabc6..2fdf5dc4c22 100644 --- a/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/attestation/AggregatorUtil.java +++ b/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/attestation/AggregatorUtil.java @@ -15,58 +15,49 @@ import static com.google.common.base.Preconditions.checkState; -import it.unimi.dsi.fastutil.ints.IntOpenHashSet; -import it.unimi.dsi.fastutil.ints.IntSet; +import it.unimi.dsi.fastutil.ints.Int2IntMap; import java.util.ArrayList; import java.util.List; -import java.util.function.Supplier; +import java.util.Optional; import tech.pegasys.teku.bls.BLS; import tech.pegasys.teku.bls.BLSSignature; -import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist; -import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector; import tech.pegasys.teku.spec.datastructures.operations.Attestation; +import tech.pegasys.teku.statetransition.attestation.utils.AttestationBits; public class AggregatorUtil { public static Attestation aggregateAttestations( final Attestation firstAttestation, final Attestation... attestations) { - SszBitlist aggregateBits = firstAttestation.getAggregationBits(); + return aggregateAttestations(Optional.empty(), firstAttestation, attestations); + } + + public static Attestation aggregateAttestations( + final Int2IntMap committeesSize, + final Attestation firstAttestation, + final Attestation... attestations) { + return aggregateAttestations(Optional.of(committeesSize), firstAttestation, attestations); + } + + public static Attestation aggregateAttestations( + final Optional committeesSize, + final Attestation firstAttestation, + final Attestation... attestations) { + final AttestationBits aggregateBits = AttestationBits.of(firstAttestation, committeesSize); final List signatures = new ArrayList<>(); signatures.add(firstAttestation.getAggregateSignature()); - final Supplier committeeBitsSupplier; - final IntSet participationIndices = new IntOpenHashSet(); - for (Attestation attestation : attestations) { - aggregateBits = aggregateBits.or(attestation.getAggregationBits()); + checkState( + aggregateBits.aggregateWith(AttestationBits.of(attestation, committeesSize)), + "attestations are not aggregatable"); signatures.add(attestation.getAggregateSignature()); - if (firstAttestation.getCommitteeBits().isPresent()) { - participationIndices.addAll(attestation.getCommitteeBitsRequired().getAllSetBits()); - checkState( - participationIndices.size() == 1, - "this test util doesn't support generating cross-committee aggregations"); - } - } - - if (firstAttestation.getCommitteeBits().isPresent()) { - committeeBitsSupplier = - firstAttestation - .getSchema() - .getCommitteeBitsSchema() - .map( - committeeBitsSchema -> - (Supplier) - () -> committeeBitsSchema.ofBits(participationIndices)) - .orElse(() -> null); - } else { - committeeBitsSupplier = () -> null; } return firstAttestation .getSchema() .create( - aggregateBits, + aggregateBits.getAggregationBits(), firstAttestation.getData(), BLS.aggregate(signatures), - committeeBitsSupplier); + aggregateBits::getCommitteeBits); } }