Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,7 @@ public void setIndexedAttestation(final IndexedAttestation indexedAttestation) {

public void saveCommitteeShufflingSeedAndCommitteesSize(final BeaconState state) {
saveCommitteeShufflingSeed(state);
// The committees size is only required when the committee_bits field is present in the
// Attestation
if (attestation.isSingleAttestation() || attestation.requiresCommitteeBits()) {
saveCommitteesSize(state);
}
saveCommitteesSize(state);
}

private void saveCommitteeShufflingSeed(final BeaconState state) {
Expand All @@ -212,10 +208,16 @@ private void saveCommitteeShufflingSeed(final BeaconState state) {
this.committeeShufflingSeed = Optional.of(committeeShufflingSeed);
}

private void saveCommitteesSize(final BeaconState state) {
public void saveCommitteesSize(final BeaconState state) {
if (committeesSize.isPresent()) {
return;
}

if (!(attestation.isSingleAttestation() || attestation.requiresCommitteeBits())) {
// it isn't a PECTRA attestation, do nothing
return;
}

final Int2IntMap committeesSize =
spec.getBeaconCommitteesSize(state, attestation.getData().getSlot());
this.committeesSize = Optional.of(committeesSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,31 @@

import java.util.List;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.ethereum.events.SlotEventsChannel;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers;
import tech.pegasys.teku.storage.client.RecentChainData;

public abstract class AggregatingAttestationPool implements SlotEventsChannel {
private static final Logger LOG = LogManager.getLogger();
protected final Spec spec;
protected final RecentChainData recentChainData;

AggregatingAttestationPool(final Spec spec, final RecentChainData recentChainData) {
this.spec = spec;
this.recentChainData = recentChainData;
}

public interface AggregatingAttestationPool extends SlotEventsChannel {
/**
* Default maximum number of attestations to store in the pool.
*
Expand All @@ -33,25 +49,91 @@ public interface AggregatingAttestationPool extends SlotEventsChannel {
* <p>Strictly to cache all attestations for a full 2 epochs is significantly larger than this
* cache.
*/
int DEFAULT_MAXIMUM_ATTESTATION_COUNT = 187_500;
public static final int DEFAULT_MAXIMUM_ATTESTATION_COUNT = 187_500;

/** The valid attestation retention period is 64 slots in deneb */
long ATTESTATION_RETENTION_SLOTS = 64;
public static final long ATTESTATION_RETENTION_SLOTS = 64;

int getSize();
public abstract int getSize();

void add(ValidatableAttestation attestation);
public abstract void add(ValidatableAttestation attestation);

SszList<Attestation> getAttestationsForBlock(
public abstract SszList<Attestation> getAttestationsForBlock(
BeaconState stateAtBlockSlot, AttestationForkChecker forkChecker);

Optional<Attestation> createAggregateFor(
public abstract Optional<Attestation> createAggregateFor(
Bytes32 attestationHashTreeRoot, Optional<UInt64> committeeIndex);

List<Attestation> getAttestations(
public abstract List<Attestation> getAttestations(
Optional<UInt64> maybeSlot, Optional<UInt64> maybeCommitteeIndex);

void onAttestationsIncludedInBlock(UInt64 slot, Iterable<Attestation> attestations);
public abstract void onAttestationsIncludedInBlock(
UInt64 slot, Iterable<Attestation> attestations);

public abstract void onReorg(UInt64 commonAncestorSlot);

/**
* Ensures that the committees size is set in the attestation. This is needed for the
*
* @return false if it was not possible to set the committees size but was required, true
* otherwise.
*/
protected boolean ensureCommitteesSizeInAttestation(final ValidatableAttestation attestation) {
if (attestation.getCommitteesSize().isPresent()
|| !attestation.getAttestation().requiresCommitteeBits()) {
return true;
}

final Optional<BeaconState> maybeState =
retrieveStateForAttestation(attestation.getAttestation().getData());
if (maybeState.isEmpty()) {
return false;
}

attestation.saveCommitteesSize(maybeState.get());

return true;
}

private Optional<BeaconState> retrieveStateForAttestation(final AttestationData attestationData) {
// we can use the first state of the epoch to get committees for an attestation
final MiscHelpers miscHelpers = spec.atSlot(attestationData.getSlot()).miscHelpers();
final Optional<UInt64> maybeEpoch = recentChainData.getCurrentEpoch();
// the only reason this can happen is we don't have a store yet.
if (maybeEpoch.isEmpty()) {
return Optional.empty();
}
final UInt64 currentEpoch = maybeEpoch.get();
final UInt64 attestationEpoch = miscHelpers.computeEpochAtSlot(attestationData.getSlot());

LOG.debug("currentEpoch {}, attestationEpoch {}", currentEpoch, attestationEpoch);
if (attestationEpoch.equals(currentEpoch)
|| attestationEpoch.equals(currentEpoch.minusMinZero(1))) {

try {
return recentChainData.getBestState().map(SafeFuture::getImmediately);
} catch (final IllegalStateException e) {
LOG.debug("Couldn't retrieve state for attestation at slot {}", attestationData.getSlot());
return Optional.empty();
}
}

void onReorg(UInt64 commonAncestorSlot);
// attestation is not from the current or previous epoch
// this is really an edge case because the current or previous epoch is at least 31 slots
// and the attestation is only valid for 64 slots, so it may be epoch-2 but not beyond.
final UInt64 attestationEpochStartSlot = miscHelpers.computeStartSlotAtEpoch(attestationEpoch);
LOG.debug("State at slot {} needed", attestationEpochStartSlot);
try {
// Assuming retrieveStateInEffectAtSlot and getBeaconCommitteesSize are thread-safe
return recentChainData
.retrieveStateInEffectAtSlot(attestationEpochStartSlot)
.getImmediately();
} catch (final IllegalStateException e) {
LOG.debug(
"Couldn't retrieve state in effect at slot {} for attestation at slot {}",
attestationEpochStartSlot,
attestationData.getSlot());
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
import tech.pegasys.teku.spec.datastructures.operations.AttestationSchema;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers;
import tech.pegasys.teku.spec.schemas.SchemaDefinitions;
import tech.pegasys.teku.storage.client.RecentChainData;

Expand All @@ -54,7 +53,7 @@
* cases the returned attestations are aggregated to maximise the number of validators that can be
* included.
*/
public class AggregatingAttestationPoolV1 implements AggregatingAttestationPool {
public class AggregatingAttestationPoolV1 extends AggregatingAttestationPool {
private static final Logger LOG = LogManager.getLogger();

static final Comparator<PooledAttestationWithData> ATTESTATION_INCLUSION_COMPARATOR =
Expand All @@ -66,8 +65,6 @@ public class AggregatingAttestationPoolV1 implements AggregatingAttestationPool
new HashMap<>();
private final NavigableMap<UInt64, Set<Bytes>> dataHashBySlot = new TreeMap<>();

private final Spec spec;
private final RecentChainData recentChainData;
private final SettableGauge sizeGauge;
private final int maximumAttestationCount;

Expand All @@ -78,8 +75,7 @@ public AggregatingAttestationPoolV1(
final RecentChainData recentChainData,
final MetricsSystem metricsSystem,
final int maximumAttestationCount) {
this.spec = spec;
this.recentChainData = recentChainData;
super(spec, recentChainData);
this.sizeGauge =
SettableGauge.create(
metricsSystem,
Expand All @@ -91,9 +87,16 @@ public AggregatingAttestationPoolV1(

@Override
public synchronized void add(final ValidatableAttestation attestation) {
final Optional<Int2IntMap> committeesSize =
attestation.getCommitteesSize().or(() -> getCommitteesSize(attestation.getAttestation()));
getOrCreateAttestationGroup(attestation.getAttestation(), committeesSize)
if (!ensureCommitteesSizeInAttestation(attestation)) {
LOG.debug(
"Attestation at slot {}, block root {} and target root {} has no committee size. Will NOT add this attestation to the pool.",
attestation.getData().getSlot(),
attestation.getData().getBeaconBlockRoot(),
attestation.getData().getTarget().getRoot());
return;
}

getOrCreateAttestationGroup(attestation.getData(), attestation.getCommitteesSize())
.ifPresent(
attestationGroup -> {
final boolean added =
Expand All @@ -114,30 +117,12 @@ public synchronized void add(final ValidatableAttestation attestation) {
}
}

private Optional<Int2IntMap> getCommitteesSize(final Attestation attestation) {
if (attestation.requiresCommitteeBits()) {
return getCommitteesSizeUsingTheState(attestation.getData());
}
return Optional.empty();
}

/**
* @param committeesSize Required for aggregating attestations as per <a
* href="https://eips.ethereum.org/EIPS/eip-7549">EIP-7549</a>
*/
private Optional<MatchingDataAttestationGroup> getOrCreateAttestationGroup(
final Attestation attestation, final Optional<Int2IntMap> committeesSize) {
final AttestationData attestationData = attestation.getData();
// if an attestation has committee bits, committees size should have been computed. If this is
// not the case, we should ignore this attestation and not add it to the pool
if (attestation.requiresCommitteeBits() && committeesSize.isEmpty()) {
LOG.debug(
"Committees size couldn't be retrieved for attestation at slot {}, block root {} and target root {}. Will NOT add this attestation to the pool.",
attestationData.getSlot(),
attestationData.getBeaconBlockRoot(),
attestationData.getTarget().getRoot());
return Optional.empty();
}
final AttestationData attestationData, final Optional<Int2IntMap> committeesSize) {
dataHashBySlot
.computeIfAbsent(attestationData.getSlot(), slot -> new HashSet<>())
.add(attestationData.hashTreeRoot());
Expand All @@ -148,58 +133,6 @@ private Optional<MatchingDataAttestationGroup> getOrCreateAttestationGroup(
return Optional.of(attestationGroup);
}

private Optional<Int2IntMap> getCommitteesSizeUsingTheState(
final AttestationData attestationData) {
// we can use the first state of the epoch to get committees for an attestation
final MiscHelpers miscHelpers = spec.atSlot(attestationData.getSlot()).miscHelpers();
final Optional<UInt64> maybeEpoch = recentChainData.getCurrentEpoch();
// the only reason this can happen is we don't have a store yet.
if (maybeEpoch.isEmpty()) {
return Optional.empty();
}
final UInt64 currentEpoch = maybeEpoch.get();
final UInt64 attestationEpoch = miscHelpers.computeEpochAtSlot(attestationData.getSlot());

LOG.debug("currentEpoch {}, attestationEpoch {}", currentEpoch, attestationEpoch);
if (attestationEpoch.equals(currentEpoch)
|| attestationEpoch.equals(currentEpoch.minusMinZero(1))) {

return recentChainData
.getBestState()
.flatMap(
state -> {
try {
return Optional.of(
spec.getBeaconCommitteesSize(
state.getImmediately(), attestationData.getSlot()));
} catch (IllegalStateException e) {
LOG.debug(
"Couldn't retrieve state for committee calculation of slot {}",
attestationData.getSlot());
return Optional.empty();
}
});
}

// attestation is not from the current or previous epoch
// this is really an edge case because the current or previous epoch is at least 31 slots
// and the attestation is only valid for 64 slots, so it may be epoch-2 but not beyond.
final UInt64 attestationEpochStartSlot = miscHelpers.computeStartSlotAtEpoch(attestationEpoch);
LOG.debug("State at slot {} needed", attestationEpochStartSlot);
try {
return recentChainData
.retrieveStateInEffectAtSlot(attestationEpochStartSlot)
.getImmediately()
.map(state -> spec.getBeaconCommitteesSize(state, attestationData.getSlot()));
} catch (final IllegalStateException e) {
LOG.debug(
"Couldn't retrieve state in effect at slot {} for committee calculation of slot {}",
attestationEpochStartSlot,
attestationData.getSlot());
return Optional.empty();
}
}

@Override
public synchronized void onSlot(final UInt64 slot) {
if (slot.compareTo(ATTESTATION_RETENTION_SLOTS) <= 0) {
Expand Down Expand Up @@ -236,7 +169,17 @@ public synchronized void onAttestationsIncludedInBlock(
}

private void onAttestationIncludedInBlock(final UInt64 slot, final Attestation attestation) {
getOrCreateAttestationGroup(attestation, getCommitteesSize(attestation))
final ValidatableAttestation validatableAttestation =
ValidatableAttestation.from(spec, attestation);
if (!ensureCommitteesSizeInAttestation(validatableAttestation)) {
LOG.debug(
"Attestation at slot {}, block root {} and target root {} has no committee size. Unable to call onAttestationIncludedInBlock.",
attestation.getData().getSlot(),
attestation.getData().getBeaconBlockRoot(),
attestation.getData().getTarget().getRoot());
return;
}
getOrCreateAttestationGroup(attestation.getData(), validatableAttestation.getCommitteesSize())
.ifPresent(
attestationGroup -> {
final int numRemoved =
Expand Down
Loading