Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private ValidatableAttestation(
this.attestation = attestation;
this.unconvertedAttestation = attestation;
this.receivedSubnetId = receivedSubnetId;
this.hashTreeRoot = Suppliers.memoize(attestation::hashTreeRoot);
this.hashTreeRoot = Suppliers.memoize(unconvertedAttestation::hashTreeRoot);
this.producedLocally = producedLocally;
}

Expand All @@ -146,7 +146,7 @@ private ValidatableAttestation(
this.attestation = attestation;
this.unconvertedAttestation = attestation;
this.receivedSubnetId = receivedSubnetId;
this.hashTreeRoot = Suppliers.memoize(attestation::hashTreeRoot);
this.hashTreeRoot = Suppliers.memoize(unconvertedAttestation::hashTreeRoot);
this.producedLocally = producedLocally;
this.committeesSize = Optional.of(committeeSizes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -91,9 +92,9 @@ public class AggregatingAttestationPoolBenchmark {

record AttestationDataRootAndCommitteeIndex(Bytes32 attestationDataRoot, UInt64 committeeIndex) {}

private final List<ValidatableAttestation> attestations = new ArrayList<>();
private BeaconState state;
private BeaconState newBlockState;
private List<ValidatableAttestation> attestations;
private AggregatingAttestationPool pool;
private RecentChainData recentChainData;
private AttestationForkChecker attestationForkChecker;
Expand Down Expand Up @@ -175,6 +176,7 @@ public void init() throws Exception {
attestation -> {
attestation.saveCommitteeShufflingSeedAndCommitteesSize(state);
pool.add(attestation);
attestations.add(attestation);
});

mostFrequentSingleAttestationDataRootAndCI =
Expand Down Expand Up @@ -205,6 +207,15 @@ public void getAttestationsForBlock(final Blackhole bh) {
bh.consume(attestationsForBlock);
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
public void add(final Blackhole bh) {
var emptyPool =
new AggregatingAttestationPool(
SPEC, recentChainData, new NoOpMetricsSystem(), DEFAULT_MAXIMUM_ATTESTATION_COUNT);
attestations.forEach(emptyPool::add);
}

@Benchmark
@BenchmarkMode(Mode.Throughput)
public void createAggregateFor(final Blackhole bh) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,71 +15,44 @@

import static com.google.common.base.Preconditions.checkState;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import tech.pegasys.teku.bls.BLS;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecVersion;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
import tech.pegasys.teku.statetransition.attestation.utils.AttestationBitsAggregator;

/**
* Builds an aggregate attestation, providing functions to test if an attestation can be added or is
* made redundant by the current aggregate.
*/
class AggregateAttestationBuilder {
private final Spec spec;
private final Set<ValidatableAttestation> includedAttestations = new HashSet<>();
private final AttestationData attestationData;
private final List<PooledAttestation> includedAttestations = new ArrayList<>();
private AttestationBitsAggregator currentAggregateBits;

AggregateAttestationBuilder(final Spec spec, final AttestationData attestationData) {
this.spec = spec;
this.attestationData = attestationData;
}

public boolean isFullyIncluded(final ValidatableAttestation candidate) {
return currentAggregateBits != null
&& currentAggregateBits.isSuperSetOf(candidate.getAttestation());
}

public boolean aggregate(final ValidatableAttestation attestation) {
public boolean aggregate(final PooledAttestation attestation) {

if (currentAggregateBits == null) {
includedAttestations.add(attestation);
currentAggregateBits = AttestationBitsAggregator.of(attestation);
currentAggregateBits = attestation.bits().copy();
return true;
}
if (currentAggregateBits.aggregateWith(attestation.getAttestation())) {
if (currentAggregateBits.aggregateWith(attestation.bits())) {
includedAttestations.add(attestation);
return true;
}
return false;
}

public ValidatableAttestation buildAggregate() {
public PooledAttestation buildAggregate() {
checkState(currentAggregateBits != null, "Must aggregate at least one attestation");
final SpecVersion specVersion = spec.atSlot(attestationData.getSlot());
return ValidatableAttestation.from(
spec,
specVersion
.getSchemaDefinitions()
.getAttestationSchema()
.create(
currentAggregateBits.getAggregationBits(),
attestationData,
BLS.aggregate(
includedAttestations.stream()
.map(ValidatableAttestation::getAttestation)
.map(Attestation::getAggregateSignature)
.toList()),
currentAggregateBits::getCommitteeBits));
return new PooledAttestation(
currentAggregateBits,
BLS.aggregate(
includedAttestations.stream().map(PooledAttestation::aggregatedSignature).toList()),
false);
}

public Collection<ValidatableAttestation> getIncludedAttestations() {
public Collection<PooledAttestation> getIncludedAttestations() {
return includedAttestations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
import tech.pegasys.teku.spec.datastructures.operations.AttestationSchema;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers;
import tech.pegasys.teku.spec.schemas.SchemaDefinitions;
Expand All @@ -60,9 +61,9 @@ public class AggregatingAttestationPool implements SlotEventsChannel {
/** The valid attestation retention period is 64 slots in deneb */
static final long ATTESTATION_RETENTION_SLOTS = 64;

static final Comparator<Attestation> ATTESTATION_INCLUSION_COMPARATOR =
Comparator.<Attestation>comparingInt(
attestation -> attestation.getAggregationBits().getBitCount())
static final Comparator<PooledAttestationWithData> ATTESTATION_INCLUSION_COMPARATOR =
Comparator.<PooledAttestationWithData>comparingInt(
attestation -> attestation.pooledAttestation().bits().getBitCount())
.reversed();

/**
Expand Down Expand Up @@ -109,7 +110,10 @@ public synchronized void add(final ValidatableAttestation attestation) {
getOrCreateAttestationGroup(attestation.getAttestation(), committeesSize)
.ifPresent(
attestationGroup -> {
final boolean added = attestationGroup.add(attestation);
final boolean added =
attestationGroup.add(
PooledAttestation.fromValidatableAttestation(attestation),
attestation.getCommitteeShufflingSeed());
if (added) {
updateSize(1);
}
Expand Down Expand Up @@ -270,12 +274,13 @@ public synchronized SszList<Attestation> getAttestationsForBlock(

final SchemaDefinitions schemaDefinitions =
spec.atSlot(stateAtBlockSlot.getSlot()).getSchemaDefinitions();

final AttestationSchema<Attestation> attestationSchema =
schemaDefinitions.getAttestationSchema();
final SszListSchema<Attestation, ?> attestationsSchema =
schemaDefinitions.getBeaconBlockBodySchema().getAttestationsSchema();

final boolean blockRequiresAttestationsWithCommitteeBits =
schemaDefinitions.getAttestationSchema().requiresCommitteeBits();
attestationSchema.requiresCommitteeBits();

final AtomicInteger prevEpochCount = new AtomicInteger(0);

Expand All @@ -295,17 +300,17 @@ public synchronized SszList<Attestation> getAttestationsForBlock(
.limit(attestationsSchema.getMaxLength())
.filter(
attestation -> {
if (spec.computeEpochAtSlot(attestation.getData().getSlot())
.isLessThan(currentEpoch)) {
if (spec.computeEpochAtSlot(attestation.data().getSlot()).isLessThan(currentEpoch)) {
final int currentCount = prevEpochCount.getAndIncrement();
return currentCount < previousEpochLimit;
}
return true;
})
.map(pooledAttestation -> pooledAttestation.toAttestation(attestationSchema))
.collect(attestationsSchema.collector());
}

private Stream<Attestation> streamAggregatesForDataHashesBySlot(
private Stream<PooledAttestationWithData> streamAggregatesForDataHashesBySlot(
final Set<Bytes> dataHashSetForSlot,
final BeaconState stateAtBlockSlot,
final AttestationForkChecker forkChecker,
Expand All @@ -317,10 +322,10 @@ private Stream<Attestation> streamAggregatesForDataHashesBySlot(
.filter(group -> isValid(stateAtBlockSlot, group.getAttestationData()))
.filter(forkChecker::areAttestationsFromCorrectFork)
.flatMap(MatchingDataAttestationGroup::stream)
.map(ValidatableAttestation::getAttestation)
.filter(
attestation ->
attestation.requiresCommitteeBits() == blockRequiresAttestationsWithCommitteeBits)
attestation.pooledAttestation().bits().requiresCommitteeBits()
== blockRequiresAttestationsWithCommitteeBits)
.sorted(ATTESTATION_INCLUSION_COMPARATOR);
}

Expand All @@ -332,6 +337,8 @@ public synchronized List<Attestation> getAttestations(

final UInt64 slot = maybeSlot.orElse(recentChainData.getCurrentSlot().orElse(UInt64.ZERO));
final SchemaDefinitions schemaDefinitions = spec.atSlot(slot).getSchemaDefinitions();
final AttestationSchema<Attestation> attestationSchema =
schemaDefinitions.getAttestationSchema();

final boolean requiresCommitteeBits =
schemaDefinitions.getAttestationSchema().requiresCommitteeBits();
Expand All @@ -345,7 +352,7 @@ public synchronized List<Attestation> getAttestations(
.flatMap(
matchingDataAttestationGroup ->
matchingDataAttestationGroup.stream(maybeCommitteeIndex, requiresCommitteeBits))
.map(ValidatableAttestation::getAttestation)
.map(pooledAttestation -> pooledAttestation.toAttestation(attestationSchema))
.toList();
}

Expand All @@ -356,9 +363,20 @@ private boolean isValid(

public synchronized Optional<Attestation> createAggregateFor(
final Bytes32 attestationHashTreeRoot, final Optional<UInt64> committeeIndex) {
return Optional.ofNullable(attestationGroupByDataHash.get(attestationHashTreeRoot))
.flatMap(attestations -> attestations.stream(committeeIndex).findFirst())
.map(ValidatableAttestation::getAttestation);
final MatchingDataAttestationGroup group =
attestationGroupByDataHash.get(attestationHashTreeRoot);
if (group == null) {
return Optional.empty();
}

final AttestationSchema<Attestation> attestationSchema =
spec.atSlot(group.getAttestationData().getSlot())
.getSchemaDefinitions()
.getAttestationSchema();

return group.stream(committeeIndex)
.findFirst()
.map(pooledAttestation -> pooledAttestation.toAttestation(attestationSchema));
}

public synchronized void onReorg(final UInt64 commonAncestorSlot) {
Expand Down
Loading