diff --git a/CHANGELOG.md b/CHANGELOG.md index fe9150af129..e95975008c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,4 +16,6 @@ - Third party library updates. ### Bug Fixes - - It is no longer possible to set both `--checkpoint-sync-url` and `--initial-state`. \ No newline at end of file + - It is no longer possible to set both `--checkpoint-sync-url` and `--initial-state`. +- Aggregating attestations using DVT does not cause missed aggregations when multiple validators are + scheduled for the same slot [#9347](https://github.com/Consensys/teku/issues/9347). \ No newline at end of file diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorClientService.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorClientService.java index 7fc3202e6e4..429690b885b 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorClientService.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorClientService.java @@ -472,12 +472,17 @@ private void scheduleValidatorsDuties( final BlockDutyFactory blockDutyFactory = new BlockDutyFactory( forkProvider, validatorApiChannel, blockContainerSigner, spec, validatorDutyMetrics); + final boolean dvtSelectionsEndpointEnabled = + config.getValidatorConfig().isDvtSelectionsEndpointEnabled(); final AttestationDutyFactory attestationDutyFactory = - new AttestationDutyFactory(spec, forkProvider, validatorApiChannel, validatorDutyMetrics); + new AttestationDutyFactory( + spec, + forkProvider, + validatorApiChannel, + validatorDutyMetrics, + dvtSelectionsEndpointEnabled); final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions = new BeaconCommitteeSubscriptions(validatorApiChannel); - final boolean dvtSelectionsEndpointEnabled = - config.getValidatorConfig().isDvtSelectionsEndpointEnabled(); final DutyLoader attestationDutyLoader = new RetryingDutyLoader<>( asyncRunner, diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/ProductionResult.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/ProductionResult.java index 8d415ff5fb5..1a323644fe4 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/ProductionResult.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/ProductionResult.java @@ -69,6 +69,10 @@ public static ProductionResult noop(final BLSPublicKey validatorPublicKey return new ProductionResult<>(Set.of(validatorPublicKey), DutyResult.NO_OP); } + public static ProductionResult noop(final Set validatorPublicKeys) { + return new ProductionResult<>(validatorPublicKeys, DutyResult.NO_OP); + } + public static SafeFuture send( final List> results, final Function, SafeFuture>> submitFunction) { diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AggregationDuty.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AggregationDuty.java index ab7e00b9f7f..f659a704c9c 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AggregationDuty.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AggregationDuty.java @@ -13,10 +13,8 @@ package tech.pegasys.teku.validator.client.duties.attestations; -import com.google.common.base.MoreObjects; +import com.google.common.annotations.VisibleForTesting; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import tech.pegasys.teku.bls.BLSSignature; @@ -38,14 +36,14 @@ import tech.pegasys.teku.validator.client.duties.DutyResult; import tech.pegasys.teku.validator.client.duties.ProductionResult; import tech.pegasys.teku.validator.client.duties.ValidatorDutyMetrics; +import tech.pegasys.teku.validator.client.duties.attestations.AggregationDutyAggregators.CommitteeAggregator; public class AggregationDuty implements Duty { private static final Logger LOG = LogManager.getLogger(); - private final ConcurrentMap aggregatorsByCommitteeIndex = - new ConcurrentHashMap<>(); private final Spec spec; private final UInt64 slot; private final ValidatorApiChannel validatorApiChannel; + private final AggregationDutyAggregators aggregators; private final ForkProvider forkProvider; private final ValidatorLogger validatorLogger; private final SendingStrategy sendingStrategy; @@ -55,6 +53,7 @@ public AggregationDuty( final Spec spec, final UInt64 slot, final ValidatorApiChannel validatorApiChannel, + final AggregationDutyAggregators aggregators, final ForkProvider forkProvider, final ValidatorLogger validatorLogger, final SendingStrategy sendingStrategy, @@ -62,6 +61,7 @@ public AggregationDuty( this.spec = spec; this.slot = slot; this.validatorApiChannel = validatorApiChannel; + this.aggregators = aggregators; this.forkProvider = forkProvider; this.validatorLogger = validatorLogger; this.sendingStrategy = sendingStrategy; @@ -91,42 +91,34 @@ public void addValidator( final BLSSignature proof, final int attestationCommitteeIndex, final SafeFuture> unsignedAttestationFuture) { - aggregatorsByCommitteeIndex.computeIfAbsent( - attestationCommitteeIndex, - committeeIndex -> - new CommitteeAggregator( - validator, - UInt64.valueOf(validatorIndex), - UInt64.valueOf(attestationCommitteeIndex), - proof, - unsignedAttestationFuture)); + aggregators.addValidator( + validator, validatorIndex, proof, attestationCommitteeIndex, unsignedAttestationFuture); } @Override public SafeFuture performDuty() { LOG.trace("Aggregating attestations at slot {}", slot); - if (aggregatorsByCommitteeIndex.isEmpty()) { + if (!aggregators.hasAggregators()) { return SafeFuture.completedFuture(DutyResult.NO_OP); } - return sendingStrategy.send( - aggregatorsByCommitteeIndex.values().stream().map(this::aggregateCommittee)); + return sendingStrategy.send(aggregators.streamAggregators().map(this::aggregateCommittee)); } - public SafeFuture> aggregateCommittee( + private SafeFuture> aggregateCommittee( final CommitteeAggregator aggregator) { return aggregator - .unsignedAttestationFuture + .unsignedAttestationFuture() .thenCompose(maybeAttestation -> createAggregate(aggregator, maybeAttestation)) .exceptionally( - error -> ProductionResult.failure(aggregator.validator.getPublicKey(), error)); + error -> ProductionResult.failure(aggregator.validator().getPublicKey(), error)); } - public SafeFuture> createAggregate( + private SafeFuture> createAggregate( final CommitteeAggregator aggregator, final Optional maybeAttestation) { if (maybeAttestation.isEmpty()) { return SafeFuture.completedFuture( ProductionResult.failure( - aggregator.validator.getPublicKey(), + aggregator.validator().getPublicKey(), new IllegalStateException( "Unable to perform aggregation for committee because no attestation was produced"))); } @@ -139,16 +131,16 @@ public SafeFuture> createAggregate( validatorApiChannel.createAggregate( slot, attestationData.hashTreeRoot(), - Optional.of(aggregator.attestationCommitteeIndex)), + Optional.of(aggregator.attestationCommitteeIndex())), this, ValidatorDutyMetricsSteps.CREATE); return createAggregationFuture.thenCompose( maybeAggregate -> { if (maybeAggregate.isEmpty()) { - validatorLogger.aggregationSkipped(slot, aggregator.attestationCommitteeIndex); + validatorLogger.aggregationSkipped(slot, aggregator.attestationCommitteeIndex()); return SafeFuture.completedFuture( - ProductionResult.noop(aggregator.validator.getPublicKey())); + ProductionResult.noop(aggregator.validator().getPublicKey())); } final Attestation aggregate = maybeAggregate.get(); return validatorDutyMetrics.record( @@ -165,55 +157,27 @@ private SafeFuture> createSignedAggreg final AggregateAndProof aggregateAndProof = schemaDefinitions .getAggregateAndProofSchema() - .create(aggregator.validatorIndex, aggregate, aggregator.proof); + .create(aggregator.validatorIndex(), aggregate, aggregator.proof()); return forkProvider .getForkInfo(slot) .thenCompose( forkInfo -> aggregator - .validator + .validator() .getSigner() .signAggregateAndProof(aggregateAndProof, forkInfo) .thenApply( signature -> ProductionResult.success( - aggregator.validator.getPublicKey(), + aggregator.validator().getPublicKey(), aggregateAndProof.getAggregate().getData().getBeaconBlockRoot(), schemaDefinitions .getSignedAggregateAndProofSchema() .create(aggregateAndProof, signature)))); } - private static class CommitteeAggregator { - - private final Validator validator; - private final UInt64 validatorIndex; - private final UInt64 attestationCommitteeIndex; - private final BLSSignature proof; - private final SafeFuture> unsignedAttestationFuture; - - private CommitteeAggregator( - final Validator validator, - final UInt64 validatorIndex, - final UInt64 attestationCommitteeIndex, - final BLSSignature proof, - final SafeFuture> unsignedAttestationFuture) { - this.validator = validator; - this.validatorIndex = validatorIndex; - this.attestationCommitteeIndex = attestationCommitteeIndex; - this.proof = proof; - this.unsignedAttestationFuture = unsignedAttestationFuture; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("validator", validator) - .add("validatorIndex", validatorIndex) - .add("attestationCommitteeIndex", attestationCommitteeIndex) - .add("proof", proof) - .add("unsignedAttestationFuture", unsignedAttestationFuture) - .toString(); - } + @VisibleForTesting + AggregationDutyAggregators getAggregators() { + return aggregators; } } diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AggregationDutyAggregators.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AggregationDutyAggregators.java new file mode 100644 index 00000000000..a0b94727c1d --- /dev/null +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AggregationDutyAggregators.java @@ -0,0 +1,56 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client.duties.attestations; + +import com.google.common.base.MoreObjects; +import java.util.Optional; +import java.util.stream.Stream; +import tech.pegasys.teku.bls.BLSSignature; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.operations.AttestationData; +import tech.pegasys.teku.validator.client.Validator; + +public interface AggregationDutyAggregators { + + void addValidator( + final Validator validator, + final int validatorIndex, + final BLSSignature proof, + final int attestationCommitteeIndex, + final SafeFuture> unsignedAttestationFuture); + + boolean hasAggregators(); + + Stream streamAggregators(); + + record CommitteeAggregator( + Validator validator, + UInt64 validatorIndex, + UInt64 attestationCommitteeIndex, + BLSSignature proof, + SafeFuture> unsignedAttestationFuture) { + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("validator", validator) + .add("validatorIndex", validatorIndex) + .add("attestationCommitteeIndex", attestationCommitteeIndex) + .add("proof", proof) + .add("unsignedAttestationFuture", unsignedAttestationFuture) + .toString(); + } + } +} diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AggregatorsGroupedByCommittee.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AggregatorsGroupedByCommittee.java new file mode 100644 index 00000000000..8f4aee277f5 --- /dev/null +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AggregatorsGroupedByCommittee.java @@ -0,0 +1,65 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client.duties.attestations; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Stream; +import tech.pegasys.teku.bls.BLSSignature; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.operations.AttestationData; +import tech.pegasys.teku.validator.client.Validator; + +public class AggregatorsGroupedByCommittee implements AggregationDutyAggregators { + + private final ConcurrentMap aggregatorsByCommitteeIndex = + new ConcurrentHashMap<>(); + + @Override + public void addValidator( + final Validator validator, + final int validatorIndex, + final BLSSignature proof, + final int attestationCommitteeIndex, + final SafeFuture> unsignedAttestationFuture) { + aggregatorsByCommitteeIndex.computeIfAbsent( + attestationCommitteeIndex, + committeeIndex -> + new CommitteeAggregator( + validator, + UInt64.valueOf(validatorIndex), + UInt64.valueOf(attestationCommitteeIndex), + proof, + unsignedAttestationFuture)); + } + + @Override + public boolean hasAggregators() { + return !aggregatorsByCommitteeIndex.isEmpty(); + } + + @Override + public Stream streamAggregators() { + return aggregatorsByCommitteeIndex.values().stream(); + } + + @VisibleForTesting + Map getAggregatorsByCommitteeIndex() { + return aggregatorsByCommitteeIndex; + } +} diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AttestationDutyFactory.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AttestationDutyFactory.java index c1088eccc1b..42418e6b662 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AttestationDutyFactory.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/AttestationDutyFactory.java @@ -29,18 +29,20 @@ public class AttestationDutyFactory private final Spec spec; private final ForkProvider forkProvider; private final ValidatorApiChannel validatorApiChannel; - private final ValidatorDutyMetrics validatorDutyMetrics; + private final boolean isDvtEnabled; public AttestationDutyFactory( final Spec spec, final ForkProvider forkProvider, final ValidatorApiChannel validatorApiChannel, - final ValidatorDutyMetrics validatorDutyMetrics) { + final ValidatorDutyMetrics validatorDutyMetrics, + final boolean isDvtEnabled) { this.spec = spec; this.forkProvider = forkProvider; this.validatorApiChannel = validatorApiChannel; this.validatorDutyMetrics = validatorDutyMetrics; + this.isDvtEnabled = isDvtEnabled; } @Override @@ -61,6 +63,7 @@ public AggregationDuty createAggregationDuty(final UInt64 slot, final Validator spec, slot, validatorApiChannel, + isDvtEnabled ? new UngroupedAggregators() : new AggregatorsGroupedByCommittee(), forkProvider, VALIDATOR_LOGGER, new BatchAttestationSendingStrategy<>(validatorApiChannel::sendAggregateAndProofs), diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/UngroupedAggregators.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/UngroupedAggregators.java new file mode 100644 index 00000000000..1373dd731df --- /dev/null +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/duties/attestations/UngroupedAggregators.java @@ -0,0 +1,62 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client.duties.attestations; + +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Stream; +import tech.pegasys.teku.bls.BLSSignature; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.operations.AttestationData; +import tech.pegasys.teku.validator.client.Validator; + +public class UngroupedAggregators implements AggregationDutyAggregators { + + private final CopyOnWriteArrayList aggregators = + new CopyOnWriteArrayList<>(); + + @Override + public void addValidator( + final Validator validator, + final int validatorIndex, + final BLSSignature proof, + final int attestationCommitteeIndex, + final SafeFuture> unsignedAttestationFuture) { + aggregators.add( + new CommitteeAggregator( + validator, + UInt64.valueOf(validatorIndex), + UInt64.valueOf(attestationCommitteeIndex), + proof, + unsignedAttestationFuture)); + } + + @Override + public boolean hasAggregators() { + return !aggregators.isEmpty(); + } + + @Override + public Stream streamAggregators() { + return aggregators.stream(); + } + + @VisibleForTesting + List getAggregators() { + return aggregators; + } +} diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/AggregationDutyTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/AggregationDutyTest.java index d6b946a9376..4eaa0b5067b 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/AggregationDutyTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/AggregationDutyTest.java @@ -61,7 +61,9 @@ import tech.pegasys.teku.validator.client.ForkProvider; import tech.pegasys.teku.validator.client.Validator; import tech.pegasys.teku.validator.client.duties.attestations.AggregationDuty; +import tech.pegasys.teku.validator.client.duties.attestations.AggregatorsGroupedByCommittee; import tech.pegasys.teku.validator.client.duties.attestations.BatchAttestationSendingStrategy; +import tech.pegasys.teku.validator.client.duties.attestations.UngroupedAggregators; @TestSpecContext(milestone = {PHASE0, ELECTRA}) class AggregationDutyTest { @@ -108,6 +110,7 @@ public void setUp(final SpecContext specContext) { spec, SLOT, validatorApiChannel, + new AggregatorsGroupedByCommittee(), forkProvider, validatorLogger, new BatchAttestationSendingStrategy<>(validatorApiChannel::sendAggregateAndProofs), @@ -290,6 +293,82 @@ public void shouldProduceSingleAggregateAndProofWhenMultipleValidatorsAggregateS .record(any(), any(AggregationDuty.class), eq(ValidatorDutyMetricsSteps.SIGN)); } + @TestTemplate + public void shouldProduceAllAggregatesForSameCommitteeUsingUngroupedAggregators() { + duty = + new AggregationDuty( + spec, + SLOT, + validatorApiChannel, + new UngroupedAggregators(), + forkProvider, + validatorLogger, + new BatchAttestationSendingStrategy<>(validatorApiChannel::sendAggregateAndProofs), + validatorDutyMetrics); + + final int committeeIndex = 2; + + final int validator1Index = 1; + final BLSSignature validator1Proof = dataStructureUtil.randomSignature(); + + final int validator2Index = 6; + final BLSSignature validator2Proof = dataStructureUtil.randomSignature(); + + final AttestationData attestationData = dataStructureUtil.randomAttestationData(); + final Attestation aggregate = dataStructureUtil.randomAttestation(); + duty.addValidator( + validator1, + validator1Index, + validator1Proof, + committeeIndex, + completedFuture(Optional.of(attestationData))); + duty.addValidator( + validator2, + validator2Index, + validator2Proof, + committeeIndex, + completedFuture(Optional.of(attestationData))); + + when(validatorApiChannel.createAggregate( + SLOT, attestationData.hashTreeRoot(), Optional.of(UInt64.valueOf(committeeIndex)))) + .thenReturn(completedFuture(Optional.of(aggregate))); + when(validatorApiChannel.sendAggregateAndProofs(anyList())) + .thenReturn(SafeFuture.completedFuture(Collections.emptyList())); + + final AggregateAndProof aggregateAndProof1 = + aggregateAndProofSchema.create(UInt64.valueOf(validator1Index), aggregate, validator1Proof); + final AggregateAndProof aggregateAndProof2 = + aggregateAndProofSchema.create(UInt64.valueOf(validator2Index), aggregate, validator2Proof); + final BLSSignature aggregateSignature1 = dataStructureUtil.randomSignature(); + final BLSSignature aggregateSignature2 = dataStructureUtil.randomSignature(); + when(signer1.signAggregateAndProof(aggregateAndProof1, forkInfo)) + .thenReturn(SafeFuture.completedFuture(aggregateSignature1)); + when(signer2.signAggregateAndProof(aggregateAndProof2, forkInfo)) + .thenReturn(SafeFuture.completedFuture(aggregateSignature2)); + + performAndReportDuty(); + + // Two proofs should be sent. Both proofs are sent together so there is only one call to + // sendAggregateAndProofs. + verify(validatorApiChannel) + .sendAggregateAndProofs( + List.of( + signedAggregateAndProofSchema.create(aggregateAndProof1, aggregateSignature1), + signedAggregateAndProofSchema.create(aggregateAndProof2, aggregateSignature2))); + // Duty is completed with two aggregates (for the same slot/committee). So we check success + // count is 2 + verify(validatorLogger) + .dutyCompleted( + TYPE, SLOT, 2, Set.of(aggregate.getData().getBeaconBlockRoot()), Optional.empty()); + verifyNoMoreInteractions(validatorLogger); + + // Two aggregation were created, so we capture the time for both individually + verify(validatorDutyMetrics, times(2)) + .record(any(), any(AggregationDuty.class), eq(ValidatorDutyMetricsSteps.CREATE)); + verify(validatorDutyMetrics, times(2)) + .record(any(), any(AggregationDuty.class), eq(ValidatorDutyMetricsSteps.SIGN)); + } + @TestTemplate public void shouldFailWhenAttestationDataNotCreated() { duty.addValidator( diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/attestations/AggregationDutyAggregatorsTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/attestations/AggregationDutyAggregatorsTest.java new file mode 100644 index 00000000000..0bd12362582 --- /dev/null +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/attestations/AggregationDutyAggregatorsTest.java @@ -0,0 +1,91 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client.duties.attestations; + +import static org.assertj.core.api.Assertions.assertThat; +import static tech.pegasys.teku.spec.generator.signatures.NoOpRemoteSigner.NO_OP_REMOTE_SIGNER; + +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.bls.BLSKeyPair; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.validator.client.Validator; +import tech.pegasys.teku.validator.client.duties.attestations.AggregationDutyAggregators.CommitteeAggregator; + +abstract class AggregationDutyAggregatorsTest { + + private final Spec spec = TestSpecFactory.createMinimalElectra(); + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); + + protected AggregationDutyAggregators aggregationDutyAggregators; + + @BeforeEach + public void setUp() { + aggregationDutyAggregators = getAggregator(); + } + + @Test + public void addValidatorsAndStreaming() { + final CommitteeAggregator committeeAggregator1 = randomCommitteeAggregator(); + final CommitteeAggregator committeeAggregator2 = randomCommitteeAggregator(); + final CommitteeAggregator committeeAggregator3 = randomCommitteeAggregator(); + + addValidator(committeeAggregator1); + addValidator(committeeAggregator2); + addValidator(committeeAggregator3); + + assertThat(aggregationDutyAggregators.hasAggregators()).isTrue(); + assertThat(aggregationDutyAggregators.streamAggregators()) + .contains(committeeAggregator1, committeeAggregator2, committeeAggregator3); + } + + @Test + public void hasAggregatorsReturnFalseWhenNoAggregatorsHaveBeenAdded() { + assertThat(aggregationDutyAggregators.hasAggregators()).isFalse(); + } + + abstract AggregationDutyAggregators getAggregator(); + + protected void addValidator(final CommitteeAggregator committeeAggregator) { + aggregationDutyAggregators.addValidator( + committeeAggregator.validator(), + committeeAggregator.validatorIndex().intValue(), + committeeAggregator.proof(), + committeeAggregator.attestationCommitteeIndex().intValue(), + committeeAggregator.unsignedAttestationFuture()); + } + + protected CommitteeAggregator randomCommitteeAggregator() { + return randomCommitteeAggregator(dataStructureUtil.randomPositiveInt()); + } + + protected CommitteeAggregator randomCommitteeAggregator(final int committeeIndex) { + final BLSKeyPair keyPair = dataStructureUtil.randomKeyPair(); + final Validator validator = + new Validator(keyPair.getPublicKey(), NO_OP_REMOTE_SIGNER, Optional::empty, true); + final UInt64 validatorIndex = dataStructureUtil.randomValidatorIndex(); + + return new CommitteeAggregator( + validator, + validatorIndex, + UInt64.valueOf(committeeIndex), + dataStructureUtil.randomSignature(), + SafeFuture.completedFuture(Optional.of(dataStructureUtil.randomAttestationData()))); + } +} diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/attestations/AggregatorsGroupedByCommitteeTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/attestations/AggregatorsGroupedByCommitteeTest.java new file mode 100644 index 00000000000..2fd08137fdb --- /dev/null +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/attestations/AggregatorsGroupedByCommitteeTest.java @@ -0,0 +1,49 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client.duties.attestations; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Map; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.validator.client.duties.attestations.AggregationDutyAggregators.CommitteeAggregator; + +class AggregatorsGroupedByCommitteeTest extends AggregationDutyAggregatorsTest { + + @Override + AggregationDutyAggregators getAggregator() { + return new AggregatorsGroupedByCommittee(); + } + + @Test + public void addAggregatorsForSameCommitteeGroupThemByCommitteeIndex() { + final CommitteeAggregator committeeAggregator1 = randomCommitteeAggregator(1); + final CommitteeAggregator committeeAggregator2 = randomCommitteeAggregator(1); + final CommitteeAggregator committeeAggregator3 = randomCommitteeAggregator(2); + + addValidator(committeeAggregator1); + addValidator(committeeAggregator2); + addValidator(committeeAggregator3); + + final Map aggregatorsByCommitteeIndex = + ((AggregatorsGroupedByCommittee) aggregationDutyAggregators) + .getAggregatorsByCommitteeIndex(); + + assertThat(aggregatorsByCommitteeIndex).hasSize(2); + // committeeAggregator2 isn't on the map because it is from the same committee as + // committeeAggregator1 + assertThat(aggregatorsByCommitteeIndex) + .containsValues(committeeAggregator1, committeeAggregator3); + } +} diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/attestations/AttestationDutyFactoryTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/attestations/AttestationDutyFactoryTest.java new file mode 100644 index 00000000000..fdb5d67c769 --- /dev/null +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/attestations/AttestationDutyFactoryTest.java @@ -0,0 +1,60 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client.duties.attestations; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.validator.api.ValidatorApiChannel; +import tech.pegasys.teku.validator.client.ForkProvider; +import tech.pegasys.teku.validator.client.Validator; +import tech.pegasys.teku.validator.client.duties.ValidatorDutyMetrics; + +class AttestationDutyFactoryTest { + + private final Spec spec = TestSpecFactory.createMinimalElectra(); + private final ValidatorApiChannel validatorApiChannel = mock(ValidatorApiChannel.class); + private final ForkProvider forkProvider = mock(ForkProvider.class); + private final ValidatorDutyMetrics validatorDutyMetrics = + ValidatorDutyMetrics.create(new StubMetricsSystem()); + private final Validator validator = mock(Validator.class); + private final UInt64 slot = UInt64.ONE; + + @ParameterizedTest + @MethodSource("aggregatorTypeArgs") + public void shouldUseCorrectAggregatorTypeWhenCreatingAggregationDuty( + final boolean dvtEnabled, final Class expectedAggregatorClass) { + final AttestationDutyFactory attestationDutyFactory = + new AttestationDutyFactory( + spec, forkProvider, validatorApiChannel, validatorDutyMetrics, dvtEnabled); + final AggregationDuty aggregationDuty = + attestationDutyFactory.createAggregationDuty(slot, validator); + + assertThat(aggregationDuty.getAggregators()).isInstanceOf(expectedAggregatorClass); + } + + private static Stream aggregatorTypeArgs() { + return Stream.of( + Arguments.of(false, AggregatorsGroupedByCommittee.class), + Arguments.of(true, UngroupedAggregators.class)); + } +} diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/attestations/UngroupedAggregatorsTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/attestations/UngroupedAggregatorsTest.java new file mode 100644 index 00000000000..a2fa4ef2806 --- /dev/null +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/duties/attestations/UngroupedAggregatorsTest.java @@ -0,0 +1,46 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client.duties.attestations; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.validator.client.duties.attestations.AggregationDutyAggregators.CommitteeAggregator; + +class UngroupedAggregatorsTest extends AggregationDutyAggregatorsTest { + + @Override + AggregationDutyAggregators getAggregator() { + return new UngroupedAggregators(); + } + + @Test + public void addAggregatorsForSameCommitteeDoNotGroupThem() { + final CommitteeAggregator committeeAggregator1 = randomCommitteeAggregator(1); + final CommitteeAggregator committeeAggregator2 = randomCommitteeAggregator(1); + final CommitteeAggregator committeeAggregator3 = randomCommitteeAggregator(2); + + addValidator(committeeAggregator1); + addValidator(committeeAggregator2); + addValidator(committeeAggregator3); + + final List aggregators = + ((UngroupedAggregators) aggregationDutyAggregators).getAggregators(); + + assertThat(aggregators).hasSize(3); + assertThat(aggregators) + .contains(committeeAggregator1, committeeAggregator2, committeeAggregator3); + } +}