Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@
- Third party library updates.

### Bug Fixes
- It is no longer possible to set both `--checkpoint-sync-url` and `--initial-state`.
- It is no longer possible to set both `--checkpoint-sync-url` and `--initial-state`.
- Aggregating attestations using DVT does not cause missed aggregations when multiple validators are
scheduled for the same slot [#9347](https://github.com/Consensys/teku/issues/9347).
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,17 @@ private void scheduleValidatorsDuties(
final BlockDutyFactory blockDutyFactory =
new BlockDutyFactory(
forkProvider, validatorApiChannel, blockContainerSigner, spec, validatorDutyMetrics);
final boolean dvtSelectionsEndpointEnabled =
config.getValidatorConfig().isDvtSelectionsEndpointEnabled();
final AttestationDutyFactory attestationDutyFactory =
new AttestationDutyFactory(spec, forkProvider, validatorApiChannel, validatorDutyMetrics);
new AttestationDutyFactory(
spec,
forkProvider,
validatorApiChannel,
validatorDutyMetrics,
dvtSelectionsEndpointEnabled);
final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions =
new BeaconCommitteeSubscriptions(validatorApiChannel);
final boolean dvtSelectionsEndpointEnabled =
config.getValidatorConfig().isDvtSelectionsEndpointEnabled();
final DutyLoader<?> attestationDutyLoader =
new RetryingDutyLoader<>(
asyncRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public static <T> ProductionResult<T> noop(final BLSPublicKey validatorPublicKey
return new ProductionResult<>(Set.of(validatorPublicKey), DutyResult.NO_OP);
}

public static <T> ProductionResult<T> noop(final Set<BLSPublicKey> validatorPublicKeys) {
return new ProductionResult<>(validatorPublicKeys, DutyResult.NO_OP);
}

public static <T> SafeFuture<DutyResult> send(
final List<ProductionResult<T>> results,
final Function<List<T>, SafeFuture<List<SubmitDataError>>> submitFunction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@

package tech.pegasys.teku.validator.client.duties.attestations;

import com.google.common.base.MoreObjects;
import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.bls.BLSSignature;
Expand All @@ -38,14 +36,14 @@
import tech.pegasys.teku.validator.client.duties.DutyResult;
import tech.pegasys.teku.validator.client.duties.ProductionResult;
import tech.pegasys.teku.validator.client.duties.ValidatorDutyMetrics;
import tech.pegasys.teku.validator.client.duties.attestations.AggregationDutyAggregators.CommitteeAggregator;

public class AggregationDuty implements Duty {
private static final Logger LOG = LogManager.getLogger();
private final ConcurrentMap<Integer, CommitteeAggregator> aggregatorsByCommitteeIndex =
new ConcurrentHashMap<>();
private final Spec spec;
private final UInt64 slot;
private final ValidatorApiChannel validatorApiChannel;
private final AggregationDutyAggregators aggregators;
private final ForkProvider forkProvider;
private final ValidatorLogger validatorLogger;
private final SendingStrategy<SignedAggregateAndProof> sendingStrategy;
Expand All @@ -55,13 +53,15 @@ public AggregationDuty(
final Spec spec,
final UInt64 slot,
final ValidatorApiChannel validatorApiChannel,
final AggregationDutyAggregators aggregators,
final ForkProvider forkProvider,
final ValidatorLogger validatorLogger,
final SendingStrategy<SignedAggregateAndProof> sendingStrategy,
final ValidatorDutyMetrics validatorDutyMetrics) {
this.spec = spec;
this.slot = slot;
this.validatorApiChannel = validatorApiChannel;
this.aggregators = aggregators;
this.forkProvider = forkProvider;
this.validatorLogger = validatorLogger;
this.sendingStrategy = sendingStrategy;
Expand Down Expand Up @@ -91,42 +91,34 @@ public void addValidator(
final BLSSignature proof,
final int attestationCommitteeIndex,
final SafeFuture<Optional<AttestationData>> unsignedAttestationFuture) {
aggregatorsByCommitteeIndex.computeIfAbsent(
attestationCommitteeIndex,
committeeIndex ->
new CommitteeAggregator(
validator,
UInt64.valueOf(validatorIndex),
UInt64.valueOf(attestationCommitteeIndex),
proof,
unsignedAttestationFuture));
aggregators.addValidator(
validator, validatorIndex, proof, attestationCommitteeIndex, unsignedAttestationFuture);
}

@Override
public SafeFuture<DutyResult> performDuty() {
LOG.trace("Aggregating attestations at slot {}", slot);
if (aggregatorsByCommitteeIndex.isEmpty()) {
if (!aggregators.hasAggregators()) {
return SafeFuture.completedFuture(DutyResult.NO_OP);
}
return sendingStrategy.send(
aggregatorsByCommitteeIndex.values().stream().map(this::aggregateCommittee));
return sendingStrategy.send(aggregators.streamAggregators().map(this::aggregateCommittee));
}

public SafeFuture<ProductionResult<SignedAggregateAndProof>> aggregateCommittee(
private SafeFuture<ProductionResult<SignedAggregateAndProof>> aggregateCommittee(
final CommitteeAggregator aggregator) {
return aggregator
.unsignedAttestationFuture
.unsignedAttestationFuture()
.thenCompose(maybeAttestation -> createAggregate(aggregator, maybeAttestation))
.exceptionally(
error -> ProductionResult.failure(aggregator.validator.getPublicKey(), error));
error -> ProductionResult.failure(aggregator.validator().getPublicKey(), error));
}

public SafeFuture<ProductionResult<SignedAggregateAndProof>> createAggregate(
private SafeFuture<ProductionResult<SignedAggregateAndProof>> createAggregate(
final CommitteeAggregator aggregator, final Optional<AttestationData> maybeAttestation) {
if (maybeAttestation.isEmpty()) {
return SafeFuture.completedFuture(
ProductionResult.failure(
aggregator.validator.getPublicKey(),
aggregator.validator().getPublicKey(),
new IllegalStateException(
"Unable to perform aggregation for committee because no attestation was produced")));
}
Expand All @@ -139,16 +131,16 @@ public SafeFuture<ProductionResult<SignedAggregateAndProof>> createAggregate(
validatorApiChannel.createAggregate(
slot,
attestationData.hashTreeRoot(),
Optional.of(aggregator.attestationCommitteeIndex)),
Optional.of(aggregator.attestationCommitteeIndex())),
this,
ValidatorDutyMetricsSteps.CREATE);

return createAggregationFuture.thenCompose(
maybeAggregate -> {
if (maybeAggregate.isEmpty()) {
validatorLogger.aggregationSkipped(slot, aggregator.attestationCommitteeIndex);
validatorLogger.aggregationSkipped(slot, aggregator.attestationCommitteeIndex());
return SafeFuture.completedFuture(
ProductionResult.noop(aggregator.validator.getPublicKey()));
ProductionResult.noop(aggregator.validator().getPublicKey()));
}
final Attestation aggregate = maybeAggregate.get();
return validatorDutyMetrics.record(
Expand All @@ -165,55 +157,27 @@ private SafeFuture<ProductionResult<SignedAggregateAndProof>> createSignedAggreg
final AggregateAndProof aggregateAndProof =
schemaDefinitions
.getAggregateAndProofSchema()
.create(aggregator.validatorIndex, aggregate, aggregator.proof);
.create(aggregator.validatorIndex(), aggregate, aggregator.proof());
return forkProvider
.getForkInfo(slot)
.thenCompose(
forkInfo ->
aggregator
.validator
.validator()
.getSigner()
.signAggregateAndProof(aggregateAndProof, forkInfo)
.thenApply(
signature ->
ProductionResult.success(
aggregator.validator.getPublicKey(),
aggregator.validator().getPublicKey(),
aggregateAndProof.getAggregate().getData().getBeaconBlockRoot(),
schemaDefinitions
.getSignedAggregateAndProofSchema()
.create(aggregateAndProof, signature))));
}

private static class CommitteeAggregator {

private final Validator validator;
private final UInt64 validatorIndex;
private final UInt64 attestationCommitteeIndex;
private final BLSSignature proof;
private final SafeFuture<Optional<AttestationData>> unsignedAttestationFuture;

private CommitteeAggregator(
final Validator validator,
final UInt64 validatorIndex,
final UInt64 attestationCommitteeIndex,
final BLSSignature proof,
final SafeFuture<Optional<AttestationData>> unsignedAttestationFuture) {
this.validator = validator;
this.validatorIndex = validatorIndex;
this.attestationCommitteeIndex = attestationCommitteeIndex;
this.proof = proof;
this.unsignedAttestationFuture = unsignedAttestationFuture;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("validator", validator)
.add("validatorIndex", validatorIndex)
.add("attestationCommitteeIndex", attestationCommitteeIndex)
.add("proof", proof)
.add("unsignedAttestationFuture", unsignedAttestationFuture)
.toString();
}
@VisibleForTesting
AggregationDutyAggregators getAggregators() {
return aggregators;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.validator.client.duties.attestations;

import com.google.common.base.MoreObjects;
import java.util.Optional;
import java.util.stream.Stream;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
import tech.pegasys.teku.validator.client.Validator;

public interface AggregationDutyAggregators {

void addValidator(
final Validator validator,
final int validatorIndex,
final BLSSignature proof,
final int attestationCommitteeIndex,
final SafeFuture<Optional<AttestationData>> unsignedAttestationFuture);

boolean hasAggregators();

Stream<CommitteeAggregator> streamAggregators();

record CommitteeAggregator(
Validator validator,
UInt64 validatorIndex,
UInt64 attestationCommitteeIndex,
BLSSignature proof,
SafeFuture<Optional<AttestationData>> unsignedAttestationFuture) {

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("validator", validator)
.add("validatorIndex", validatorIndex)
.add("attestationCommitteeIndex", attestationCommitteeIndex)
.add("proof", proof)
.add("unsignedAttestationFuture", unsignedAttestationFuture)
.toString();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.validator.client.duties.attestations;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
import tech.pegasys.teku.validator.client.Validator;

public class AggregatorsGroupedByCommittee implements AggregationDutyAggregators {

private final ConcurrentMap<Integer, CommitteeAggregator> aggregatorsByCommitteeIndex =
new ConcurrentHashMap<>();

@Override
public void addValidator(
final Validator validator,
final int validatorIndex,
final BLSSignature proof,
final int attestationCommitteeIndex,
final SafeFuture<Optional<AttestationData>> unsignedAttestationFuture) {
aggregatorsByCommitteeIndex.computeIfAbsent(
attestationCommitteeIndex,
committeeIndex ->
new CommitteeAggregator(
validator,
UInt64.valueOf(validatorIndex),
UInt64.valueOf(attestationCommitteeIndex),
proof,
unsignedAttestationFuture));
}

@Override
public boolean hasAggregators() {
return !aggregatorsByCommitteeIndex.isEmpty();
}

@Override
public Stream<CommitteeAggregator> streamAggregators() {
return aggregatorsByCommitteeIndex.values().stream();
}

@VisibleForTesting
Map<Integer, CommitteeAggregator> getAggregatorsByCommitteeIndex() {
return aggregatorsByCommitteeIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,20 @@ public class AttestationDutyFactory
private final Spec spec;
private final ForkProvider forkProvider;
private final ValidatorApiChannel validatorApiChannel;

private final ValidatorDutyMetrics validatorDutyMetrics;
private final boolean isDvtEnabled;

public AttestationDutyFactory(
final Spec spec,
final ForkProvider forkProvider,
final ValidatorApiChannel validatorApiChannel,
final ValidatorDutyMetrics validatorDutyMetrics) {
final ValidatorDutyMetrics validatorDutyMetrics,
final boolean isDvtEnabled) {
this.spec = spec;
this.forkProvider = forkProvider;
this.validatorApiChannel = validatorApiChannel;
this.validatorDutyMetrics = validatorDutyMetrics;
this.isDvtEnabled = isDvtEnabled;
}

@Override
Expand All @@ -61,6 +63,7 @@ public AggregationDuty createAggregationDuty(final UInt64 slot, final Validator
spec,
slot,
validatorApiChannel,
isDvtEnabled ? new UngroupedAggregators() : new AggregatorsGroupedByCommittee(),
forkProvider,
VALIDATOR_LOGGER,
new BatchAttestationSendingStrategy<>(validatorApiChannel::sendAggregateAndProofs),
Expand Down
Loading