diff --git a/CHANGELOG.md b/CHANGELOG.md index 115599843fb..e0ab16896c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,4 +14,5 @@ - Updated third party libraries. - Added an info message on startup for the highest supported milestone and associated epoch. - Added jdk 24 docker image build. +- Improved performance when scheduling attestations in the beginning of the epoch for a large number of validators. ### Bug Fixes \ No newline at end of file diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AbstractAttestationDutySchedulingStrategy.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AbstractAttestationDutySchedulingStrategy.java new file mode 100644 index 00000000000..752846d6559 --- /dev/null +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AbstractAttestationDutySchedulingStrategy.java @@ -0,0 +1,186 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client; + +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.ethereum.json.types.validator.AttesterDuties; +import tech.pegasys.teku.ethereum.json.types.validator.AttesterDuty; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.SpecVersion; +import tech.pegasys.teku.spec.datastructures.operations.AttestationData; +import tech.pegasys.teku.validator.api.CommitteeSubscriptionRequest; +import tech.pegasys.teku.validator.client.duties.BeaconCommitteeSubscriptions; +import tech.pegasys.teku.validator.client.duties.SlotBasedScheduledDuties; +import tech.pegasys.teku.validator.client.duties.attestations.AggregationDuty; +import tech.pegasys.teku.validator.client.duties.attestations.AttestationProductionDuty; +import tech.pegasys.teku.validator.client.loader.OwnedValidators; + +abstract class AbstractAttestationDutySchedulingStrategy + implements AttestationDutySchedulingStrategy { + + private static final Logger LOG = LogManager.getLogger(); + + protected final Spec spec; + private final ForkProvider forkProvider; + private final Function< + Bytes32, SlotBasedScheduledDuties> + scheduledDutiesFactory; + protected final OwnedValidators validators; + protected final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions; + + AbstractAttestationDutySchedulingStrategy( + final Spec spec, + final ForkProvider forkProvider, + final Function> + scheduledDutiesFactory, + final OwnedValidators validators, + final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions) { + this.spec = spec; + this.forkProvider = forkProvider; + this.scheduledDutiesFactory = scheduledDutiesFactory; + this.validators = validators; + this.beaconCommitteeSubscriptions = beaconCommitteeSubscriptions; + } + + protected SlotBasedScheduledDuties getScheduledDuties( + final AttesterDuties duties) { + return scheduledDutiesFactory.apply(duties.getDependentRoot()); + } + + protected SafeFuture scheduleDuties( + final SlotBasedScheduledDuties scheduledDuties, + final List duties, + final Optional dvtAttestationAggregations) { + return SafeFuture.allOf( + duties.stream() + .map(duty -> scheduleDuty(scheduledDuties, duty, dvtAttestationAggregations)) + .toArray(SafeFuture[]::new)); + } + + protected SafeFuture scheduleDuty( + final SlotBasedScheduledDuties scheduledDuties, + final AttesterDuty duty, + final Optional dvtAttestationAggregations) { + final Optional maybeValidator = validators.getValidator(duty.getPublicKey()); + if (maybeValidator.isEmpty()) { + return SafeFuture.COMPLETE; + } + final Validator validator = maybeValidator.get(); + final int aggregatorModulo = + spec.atSlot(duty.getSlot()) + .getValidatorsUtil() + .getAggregatorModulo(duty.getCommitteeLength()); + + final SafeFuture> unsignedAttestationFuture = + scheduleAttestationProduction( + scheduledDuties, + duty.getCommitteeIndex(), + duty.getValidatorCommitteeIndex(), + duty.getCommitteeLength(), + duty.getValidatorIndex(), + validator, + duty.getSlot()); + + return scheduleAggregation( + scheduledDuties, + duty.getCommitteeIndex(), + duty.getCommitteesAtSlot(), + duty.getValidatorIndex(), + validator, + duty.getSlot(), + aggregatorModulo, + unsignedAttestationFuture, + dvtAttestationAggregations); + } + + protected SafeFuture> scheduleAttestationProduction( + final SlotBasedScheduledDuties scheduledDuties, + final int attestationCommitteeIndex, + final int attestationCommitteePosition, + final int attestationCommitteeSize, + final int validatorIndex, + final Validator validator, + final UInt64 slot) { + return scheduledDuties.scheduleProduction( + slot, + validator, + duty -> + duty.addValidator( + validator, + attestationCommitteeIndex, + attestationCommitteePosition, + validatorIndex, + attestationCommitteeSize)); + } + + protected SafeFuture scheduleAggregation( + final SlotBasedScheduledDuties scheduledDuties, + final int attestationCommitteeIndex, + final int committeesAtSlot, + final int validatorIndex, + final Validator validator, + final UInt64 slot, + final int aggregatorModulo, + final SafeFuture> unsignedAttestationFuture, + final Optional dvtAttestationAggregations) { + return forkProvider + .getForkInfo(slot) + .thenCompose(forkInfo -> validator.getSigner().signAggregationSlot(slot, forkInfo)) + .thenCompose( + slotSignature -> + dvtAttestationAggregations + .map( + dvt -> + dvt.getCombinedSelectionProofFuture( + validatorIndex, slot, slotSignature)) + .orElse(SafeFuture.completedFuture(slotSignature))) + .thenAccept( + slotSignature -> { + final SpecVersion specVersion = spec.atSlot(slot); + final boolean isAggregator = + specVersion.getValidatorsUtil().isAggregator(slotSignature, aggregatorModulo); + beaconCommitteeSubscriptions.subscribeToBeaconCommittee( + new CommitteeSubscriptionRequest( + validatorIndex, + attestationCommitteeIndex, + UInt64.valueOf(committeesAtSlot), + slot, + isAggregator)); + if (isAggregator) { + scheduledDuties.scheduleAggregation( + slot, + validator, + duty -> + duty.addValidator( + validator, + validatorIndex, + slotSignature, + attestationCommitteeIndex, + unsignedAttestationFuture)); + } + }) + .exceptionally( + error -> { + LOG.error("Failed to schedule aggregation duties", error); + return null; + }); + } +} diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AbstractDutyLoader.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AbstractDutyLoader.java index 589d0bf10b4..5971ab33e62 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AbstractDutyLoader.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AbstractDutyLoader.java @@ -58,7 +58,7 @@ public SafeFuture> loadDutiesForEpoch(final UInt64 epoch) { } protected abstract SafeFuture> requestDuties( - final UInt64 epoch, final IntCollection validatorIndices); + UInt64 epoch, IntCollection validatorIndices); protected abstract SafeFuture scheduleAllDuties(UInt64 epoch, D duties); } diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyBatchSchedulingStrategy.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyBatchSchedulingStrategy.java new file mode 100644 index 00000000000..a00d1afe1c1 --- /dev/null +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyBatchSchedulingStrategy.java @@ -0,0 +1,196 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client; + +import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.api.response.ValidatorStatus; +import tech.pegasys.teku.bls.BLSPublicKey; +import tech.pegasys.teku.ethereum.json.types.validator.AttesterDuties; +import tech.pegasys.teku.ethereum.json.types.validator.AttesterDuty; +import tech.pegasys.teku.infrastructure.async.AsyncRunner; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.operations.AttesterSlashing; +import tech.pegasys.teku.spec.datastructures.operations.ProposerSlashing; +import tech.pegasys.teku.validator.api.ValidatorTimingChannel; +import tech.pegasys.teku.validator.client.duties.BeaconCommitteeSubscriptions; +import tech.pegasys.teku.validator.client.duties.SlotBasedScheduledDuties; +import tech.pegasys.teku.validator.client.duties.attestations.AggregationDuty; +import tech.pegasys.teku.validator.client.duties.attestations.AttestationProductionDuty; +import tech.pegasys.teku.validator.client.loader.OwnedValidators; + +/** + * Attestation duty scheduling is batched by slots and delays are added in order to avoid expensive + * aggregation slot signing in beginning of the epoch when a node is running a large number of + * validators + */ +public class AttestationDutyBatchSchedulingStrategy + extends AbstractAttestationDutySchedulingStrategy implements ValidatorTimingChannel { + + private static final Logger LOG = LogManager.getLogger(); + + public record SlotBatchingOptions( + int currentEpochSlotsToScheduleBeforeDelay, + Duration currentEpochSchedulingDelay, + int futureEpochSlotsToScheduleBeforeDelay, + Duration futureEpochSchedulingDelay) {} + + public static final SlotBatchingOptions DEFAULT_SLOT_BATCHING_OPTIONS = + new SlotBatchingOptions(4, Duration.ofMillis(500), 1, Duration.ofSeconds(1)); + + private final AtomicReference currentSlot = new AtomicReference<>(UInt64.ZERO); + + private final AsyncRunner asyncRunner; + private final SlotBatchingOptions slotBatchingOptions; + + public AttestationDutyBatchSchedulingStrategy( + final Spec spec, + final ForkProvider forkProvider, + final Function> + scheduledDutiesFactory, + final OwnedValidators validators, + final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions, + final AsyncRunner asyncRunner) { + this( + spec, + forkProvider, + scheduledDutiesFactory, + validators, + beaconCommitteeSubscriptions, + asyncRunner, + DEFAULT_SLOT_BATCHING_OPTIONS); + } + + @VisibleForTesting + AttestationDutyBatchSchedulingStrategy( + final Spec spec, + final ForkProvider forkProvider, + final Function> + scheduledDutiesFactory, + final OwnedValidators validators, + final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions, + final AsyncRunner asyncRunner, + final SlotBatchingOptions slotBatchingOptions) { + super(spec, forkProvider, scheduledDutiesFactory, validators, beaconCommitteeSubscriptions); + this.asyncRunner = asyncRunner; + this.slotBatchingOptions = slotBatchingOptions; + } + + @Override + public SafeFuture> scheduleAllDuties( + final UInt64 epoch, final AttesterDuties duties) { + final SlotBasedScheduledDuties scheduledDuties = + getScheduledDuties(duties); + + // every X amount of slots a delay is added to the scheduling (values are based on if current or + // future epoch) + final boolean isCurrentEpoch = + epoch.isLessThanOrEqualTo(spec.computeEpochAtSlot(currentSlot.get())); + final int slotsBeforeDelay = + isCurrentEpoch + ? slotBatchingOptions.currentEpochSlotsToScheduleBeforeDelay() + : slotBatchingOptions.futureEpochSlotsToScheduleBeforeDelay(); + final Duration schedulingDelay = + isCurrentEpoch + ? slotBatchingOptions.currentEpochSchedulingDelay() + : slotBatchingOptions.futureEpochSchedulingDelay(); + + final Map> dutiesBySlot = + duties.getDuties().stream() + .collect( + Collectors.groupingBy(AttesterDuty::getSlot, TreeMap::new, Collectors.toList())); + + LOG.info( + "Scheduling {} attestation duties for epoch {}, batched across {} slot(s) with a {} ms delay every {} slot(s)", + duties.getDuties().size(), + epoch, + dutiesBySlot.size(), + schedulingDelay.toMillis(), + slotsBeforeDelay); + + SafeFuture dutiesScheduling = SafeFuture.COMPLETE; + + int i = 0; + for (final List dutiesForSlot : dutiesBySlot.values()) { + // no delay at start + if (i != 0 && i % slotsBeforeDelay == 0) { + dutiesScheduling = + dutiesScheduling.thenCompose( + __ -> + asyncRunner.runAfterDelay( + () -> scheduleDuties(scheduledDuties, dutiesForSlot, Optional.empty()), + schedulingDelay)); + } else { + dutiesScheduling = + dutiesScheduling.thenCompose( + __ -> scheduleDuties(scheduledDuties, dutiesForSlot, Optional.empty())); + } + i++; + } + + return dutiesScheduling + .>thenApply(__ -> scheduledDuties) + .alwaysRun(beaconCommitteeSubscriptions::sendRequests); + } + + @Override + public void onSlot(final UInt64 slot) { + currentSlot.set(slot); + } + + @Override + public void onHeadUpdate( + final UInt64 slot, + final Bytes32 previousDutyDependentRoot, + final Bytes32 currentDutyDependentRoot, + final Bytes32 headBlockRoot) {} + + @Override + public void onPossibleMissedEvents() {} + + @Override + public void onValidatorsAdded() {} + + @Override + public void onBlockProductionDue(final UInt64 slot) {} + + @Override + public void onAttestationCreationDue(final UInt64 slot) {} + + @Override + public void onAttestationAggregationDue(final UInt64 slot) {} + + @Override + public void onAttesterSlashing(final AttesterSlashing attesterSlashing) {} + + @Override + public void onProposerSlashing(final ProposerSlashing proposerSlashing) {} + + @Override + public void onUpdatedValidatorStatuses( + final Map newValidatorStatuses, + final boolean possibleMissingEvents) {} +} diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyDefaultSchedulingStrategy.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyDefaultSchedulingStrategy.java new file mode 100644 index 00000000000..c4da83b44c4 --- /dev/null +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyDefaultSchedulingStrategy.java @@ -0,0 +1,66 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client; + +import java.util.Optional; +import java.util.function.Function; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.ethereum.json.types.validator.AttesterDuties; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.validator.api.ValidatorApiChannel; +import tech.pegasys.teku.validator.client.duties.BeaconCommitteeSubscriptions; +import tech.pegasys.teku.validator.client.duties.SlotBasedScheduledDuties; +import tech.pegasys.teku.validator.client.duties.attestations.AggregationDuty; +import tech.pegasys.teku.validator.client.duties.attestations.AttestationProductionDuty; +import tech.pegasys.teku.validator.client.loader.OwnedValidators; + +public class AttestationDutyDefaultSchedulingStrategy + extends AbstractAttestationDutySchedulingStrategy { + + private final ValidatorApiChannel validatorApiChannel; + private final boolean useDvtEndpoint; + + public AttestationDutyDefaultSchedulingStrategy( + final Spec spec, + final ForkProvider forkProvider, + final Function> + scheduledDutiesFactory, + final OwnedValidators validators, + final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions, + final ValidatorApiChannel validatorApiChannel, + final boolean useDvtEndpoint) { + super(spec, forkProvider, scheduledDutiesFactory, validators, beaconCommitteeSubscriptions); + this.validatorApiChannel = validatorApiChannel; + this.useDvtEndpoint = useDvtEndpoint; + } + + @Override + public SafeFuture> scheduleAllDuties( + final UInt64 epoch, final AttesterDuties duties) { + final SlotBasedScheduledDuties scheduledDuties = + getScheduledDuties(duties); + + final Optional dvtAttestationAggregations = + useDvtEndpoint + ? Optional.of( + new DvtAttestationAggregations(validatorApiChannel, duties.getDuties().size())) + : Optional.empty(); + + return scheduleDuties(scheduledDuties, duties.getDuties(), dvtAttestationAggregations) + .>thenApply(__ -> scheduledDuties) + .alwaysRun(beaconCommitteeSubscriptions::sendRequests); + } +} diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyLoader.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyLoader.java index 4595eab41c6..565a80d066e 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyLoader.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutyLoader.java @@ -15,55 +15,27 @@ import it.unimi.dsi.fastutil.ints.IntCollection; import java.util.Optional; -import java.util.function.Function; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.ethereum.json.types.validator.AttesterDuties; -import tech.pegasys.teku.ethereum.json.types.validator.AttesterDuty; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.SpecVersion; -import tech.pegasys.teku.spec.datastructures.operations.AttestationData; -import tech.pegasys.teku.validator.api.CommitteeSubscriptionRequest; import tech.pegasys.teku.validator.api.ValidatorApiChannel; -import tech.pegasys.teku.validator.client.duties.BeaconCommitteeSubscriptions; import tech.pegasys.teku.validator.client.duties.SlotBasedScheduledDuties; -import tech.pegasys.teku.validator.client.duties.attestations.AggregationDuty; -import tech.pegasys.teku.validator.client.duties.attestations.AttestationProductionDuty; import tech.pegasys.teku.validator.client.loader.OwnedValidators; public class AttestationDutyLoader extends AbstractDutyLoader> { - private static final Logger LOG = LogManager.getLogger(); private final ValidatorApiChannel validatorApiChannel; - private final ForkProvider forkProvider; - private final Function< - Bytes32, SlotBasedScheduledDuties> - scheduledDutiesFactory; - private final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions; - private final Spec spec; - private final boolean useDvtEndpoint; + private final AttestationDutySchedulingStrategySelector attestationDutySchedulingStrategySelector; public AttestationDutyLoader( - final ValidatorApiChannel validatorApiChannel, - final ForkProvider forkProvider, - final Function> - scheduledDutiesFactory, final OwnedValidators validators, final ValidatorIndexProvider validatorIndexProvider, - final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions, - final Spec spec, - final boolean useDvtEndpoint) { + final ValidatorApiChannel validatorApiChannel, + final AttestationDutySchedulingStrategySelector attestationDutySchedulingStrategySelector) { super(validators, validatorIndexProvider); this.validatorApiChannel = validatorApiChannel; - this.forkProvider = forkProvider; - this.scheduledDutiesFactory = scheduledDutiesFactory; - this.beaconCommitteeSubscriptions = beaconCommitteeSubscriptions; - this.spec = spec; - this.useDvtEndpoint = useDvtEndpoint; + this.attestationDutySchedulingStrategySelector = attestationDutySchedulingStrategySelector; } @Override @@ -78,131 +50,8 @@ protected SafeFuture> requestDuties( @Override protected SafeFuture> scheduleAllDuties( final UInt64 epoch, final AttesterDuties duties) { - final SlotBasedScheduledDuties scheduledDuties = - scheduledDutiesFactory.apply(duties.getDependentRoot()); - - final Optional dvtAttestationAggregationsForEpoch = - useDvtEndpoint - ? Optional.of( - new DvtAttestationAggregations(validatorApiChannel, duties.getDuties().size())) - : Optional.empty(); - - return SafeFuture.allOf( - duties.getDuties().stream() - .map( - duty -> - scheduleDuties(scheduledDuties, duty, dvtAttestationAggregationsForEpoch)) - .toArray(SafeFuture[]::new)) - .>thenApply(__ -> scheduledDuties) - .alwaysRun(beaconCommitteeSubscriptions::sendRequests); - } - - private SafeFuture scheduleDuties( - final SlotBasedScheduledDuties scheduledDuties, - final AttesterDuty duty, - final Optional dvtAttestationAggregationLoader) { - final Optional maybeValidator = validators.getValidator(duty.getPublicKey()); - if (maybeValidator.isEmpty()) { - return SafeFuture.COMPLETE; - } - final Validator validator = maybeValidator.get(); - final int aggregatorModulo = - spec.atSlot(duty.getSlot()) - .getValidatorsUtil() - .getAggregatorModulo(duty.getCommitteeLength()); - - final SafeFuture> unsignedAttestationFuture = - scheduleAttestationProduction( - scheduledDuties, - duty.getCommitteeIndex(), - duty.getValidatorCommitteeIndex(), - duty.getCommitteeLength(), - duty.getValidatorIndex(), - validator, - duty.getSlot()); - - return scheduleAggregation( - scheduledDuties, - duty.getCommitteeIndex(), - duty.getCommitteesAtSlot(), - duty.getValidatorIndex(), - validator, - duty.getSlot(), - aggregatorModulo, - unsignedAttestationFuture, - dvtAttestationAggregationLoader); - } - - private SafeFuture> scheduleAttestationProduction( - final SlotBasedScheduledDuties scheduledDuties, - final int attestationCommitteeIndex, - final int attestationCommitteePosition, - final int attestationCommitteeSize, - final int validatorIndex, - final Validator validator, - final UInt64 slot) { - return scheduledDuties.scheduleProduction( - slot, - validator, - duty -> - duty.addValidator( - validator, - attestationCommitteeIndex, - attestationCommitteePosition, - validatorIndex, - attestationCommitteeSize)); - } - - private SafeFuture scheduleAggregation( - final SlotBasedScheduledDuties scheduledDuties, - final int attestationCommitteeIndex, - final int committeesAtSlot, - final int validatorIndex, - final Validator validator, - final UInt64 slot, - final int aggregatorModulo, - final SafeFuture> unsignedAttestationFuture, - final Optional dvtAttestationAggregation) { - return forkProvider - .getForkInfo(slot) - .thenCompose(forkInfo -> validator.getSigner().signAggregationSlot(slot, forkInfo)) - .thenCompose( - slotSignature -> - dvtAttestationAggregation - .map( - dvt -> - dvt.getCombinedSelectionProofFuture( - validatorIndex, slot, slotSignature)) - .orElse(SafeFuture.completedFuture(slotSignature))) - .thenAccept( - slotSignature -> { - final SpecVersion specVersion = spec.atSlot(slot); - final boolean isAggregator = - specVersion.getValidatorsUtil().isAggregator(slotSignature, aggregatorModulo); - beaconCommitteeSubscriptions.subscribeToBeaconCommittee( - new CommitteeSubscriptionRequest( - validatorIndex, - attestationCommitteeIndex, - UInt64.valueOf(committeesAtSlot), - slot, - isAggregator)); - if (isAggregator) { - scheduledDuties.scheduleAggregation( - slot, - validator, - duty -> - duty.addValidator( - validator, - validatorIndex, - slotSignature, - attestationCommitteeIndex, - unsignedAttestationFuture)); - } - }) - .exceptionally( - error -> { - LOG.error("Failed to schedule aggregation duties", error); - return null; - }); + return attestationDutySchedulingStrategySelector + .selectStrategy(duties.getDuties().size()) + .scheduleAllDuties(epoch, duties); } } diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutySchedulingStrategy.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutySchedulingStrategy.java new file mode 100644 index 00000000000..3da4f564437 --- /dev/null +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutySchedulingStrategy.java @@ -0,0 +1,24 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client; + +import tech.pegasys.teku.ethereum.json.types.validator.AttesterDuties; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.validator.client.duties.SlotBasedScheduledDuties; + +public interface AttestationDutySchedulingStrategy { + + SafeFuture> scheduleAllDuties(UInt64 epoch, AttesterDuties duties); +} diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutySchedulingStrategySelector.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutySchedulingStrategySelector.java new file mode 100644 index 00000000000..18af219fdce --- /dev/null +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/AttestationDutySchedulingStrategySelector.java @@ -0,0 +1,41 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client; + +public class AttestationDutySchedulingStrategySelector { + + private final int minSizeToBatchBySlot; + private final boolean useDvtEndpoint; + private final AttestationDutyDefaultSchedulingStrategy defaultStrategy; + private final AttestationDutyBatchSchedulingStrategy batchStrategy; + + public AttestationDutySchedulingStrategySelector( + final int minSizeToBatchBySlot, + final boolean useDvtEndpoint, + final AttestationDutyDefaultSchedulingStrategy defaultStrategy, + final AttestationDutyBatchSchedulingStrategy batchStrategy) { + this.minSizeToBatchBySlot = minSizeToBatchBySlot; + this.useDvtEndpoint = useDvtEndpoint; + this.defaultStrategy = defaultStrategy; + this.batchStrategy = batchStrategy; + } + + public AttestationDutySchedulingStrategy selectStrategy(final int expectedDutiesCount) { + // disabling batch flow when DVT is enabled + if (expectedDutiesCount < minSizeToBatchBySlot || useDvtEndpoint) { + return defaultStrategy; + } + return batchStrategy; + } +} diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorClientService.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorClientService.java index 429690b885b..83ba8e08ecb 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorClientService.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/ValidatorClientService.java @@ -63,7 +63,9 @@ import tech.pegasys.teku.validator.client.duties.BlockDutyFactory; import tech.pegasys.teku.validator.client.duties.SlotBasedScheduledDuties; import tech.pegasys.teku.validator.client.duties.ValidatorDutyMetrics; +import tech.pegasys.teku.validator.client.duties.attestations.AggregationDuty; import tech.pegasys.teku.validator.client.duties.attestations.AttestationDutyFactory; +import tech.pegasys.teku.validator.client.duties.attestations.AttestationProductionDuty; import tech.pegasys.teku.validator.client.duties.synccommittee.ChainHeadTracker; import tech.pegasys.teku.validator.client.duties.synccommittee.SyncCommitteeScheduledDuties; import tech.pegasys.teku.validator.client.loader.HttpClientExternalSignerFactory; @@ -91,6 +93,7 @@ public class ValidatorClientService extends Service { private static final Duration DOPPELGANGER_DETECTOR_CHECK_DELAY = Duration.ofSeconds(12); private static final Duration DOPPELGANGER_DETECTOR_TIMEOUT = Duration.ofMinutes(15); private static final int DOPPELGANGER_DETECTOR_MAX_EPOCHS = 2; + private static final int MIN_SIZE_TO_SCHEDULE_ATTESTATION_DUTIES_IN_BATCHES = 1000; private final EventChannels eventChannels; private final ValidatorLoader validatorLoader; private final BeaconNodeApi beaconNodeApi; @@ -483,23 +486,44 @@ private void scheduleValidatorsDuties( dvtSelectionsEndpointEnabled); final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions = new BeaconCommitteeSubscriptions(validatorApiChannel); + final Function> + scheduledAttestationDutiesFactory = + dependentRoot -> + new SlotBasedScheduledDuties<>( + attestationDutyFactory, + dependentRoot, + validatorDutyMetrics::performDutyWithMetrics); + final AttestationDutyDefaultSchedulingStrategy attestationDutyDefaultSchedulingStrategy = + new AttestationDutyDefaultSchedulingStrategy( + spec, + forkProvider, + scheduledAttestationDutiesFactory, + validators, + beaconCommitteeSubscriptions, + validatorApiChannel, + dvtSelectionsEndpointEnabled); + final AttestationDutyBatchSchedulingStrategy attestationDutyBatchSchedulingStrategy = + new AttestationDutyBatchSchedulingStrategy( + spec, + forkProvider, + scheduledAttestationDutiesFactory, + validators, + beaconCommitteeSubscriptions, + asyncRunner); + validatorTimingChannels.add(attestationDutyBatchSchedulingStrategy); final DutyLoader attestationDutyLoader = new RetryingDutyLoader<>( asyncRunner, timeProvider, new AttestationDutyLoader( - validatorApiChannel, - forkProvider, - dependentRoot -> - new SlotBasedScheduledDuties<>( - attestationDutyFactory, - dependentRoot, - validatorDutyMetrics::performDutyWithMetrics), validators, validatorIndexProvider, - beaconCommitteeSubscriptions, - spec, - dvtSelectionsEndpointEnabled)); + validatorApiChannel, + new AttestationDutySchedulingStrategySelector( + MIN_SIZE_TO_SCHEDULE_ATTESTATION_DUTIES_IN_BATCHES, + dvtSelectionsEndpointEnabled, + attestationDutyDefaultSchedulingStrategy, + attestationDutyBatchSchedulingStrategy))); final DutyLoader blockDutyLoader = new RetryingDutyLoader<>( asyncRunner, diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyBatchSchedulingStrategyTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyBatchSchedulingStrategyTest.java new file mode 100644 index 00000000000..607937ac065 --- /dev/null +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyBatchSchedulingStrategyTest.java @@ -0,0 +1,160 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import it.unimi.dsi.fastutil.ints.IntList; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.bls.BLSPublicKey; +import tech.pegasys.teku.ethereum.json.types.validator.AttesterDuties; +import tech.pegasys.teku.ethereum.json.types.validator.AttesterDuty; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; +import tech.pegasys.teku.infrastructure.time.StubTimeProvider; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.state.ForkInfo; +import tech.pegasys.teku.spec.signatures.Signer; +import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.validator.api.FileBackedGraffitiProvider; +import tech.pegasys.teku.validator.client.AttestationDutyBatchSchedulingStrategy.SlotBatchingOptions; +import tech.pegasys.teku.validator.client.duties.BeaconCommitteeSubscriptions; +import tech.pegasys.teku.validator.client.duties.SlotBasedScheduledDuties; +import tech.pegasys.teku.validator.client.duties.attestations.AggregationDuty; +import tech.pegasys.teku.validator.client.duties.attestations.AttestationProductionDuty; +import tech.pegasys.teku.validator.client.loader.OwnedValidators; + +class AttestationDutyBatchSchedulingStrategyTest { + + private static final IntList VALIDATOR_INDICES = IntList.of(1, 2, 3, 4, 5, 6, 7, 8); + private static final SlotBatchingOptions SLOT_BATCHING_TEST_OPTIONS = + new SlotBatchingOptions(4, Duration.ofMillis(50), 1, Duration.ofMillis(50)); + + private final Spec spec = TestSpecFactory.createMinimalPhase0(); + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); + private final ForkProvider forkProvider = mock(ForkProvider.class); + private final BeaconCommitteeSubscriptions beaconCommitteeSubscriptions = + mock(BeaconCommitteeSubscriptions.class); + + @SuppressWarnings("unchecked") + private final SlotBasedScheduledDuties + scheduledDuties = mock(SlotBasedScheduledDuties.class); + + private final BLSPublicKey validatorKey = dataStructureUtil.randomPublicKey(); + private final Signer signer = mock(Signer.class); + private final Validator validator = + new Validator(validatorKey, signer, new FileBackedGraffitiProvider()); + private final Map validators = Map.of(validatorKey, validator); + private final ForkInfo forkInfo = dataStructureUtil.randomForkInfo(); + private final StubTimeProvider timeProvider = StubTimeProvider.withTimeInSeconds(0); + private final StubAsyncRunner asyncRunner = new StubAsyncRunner(timeProvider); + + private final AttestationDutyBatchSchedulingStrategy dutySchedulingStrategy = + new AttestationDutyBatchSchedulingStrategy( + spec, + forkProvider, + dependentRoot -> scheduledDuties, + new OwnedValidators(validators), + beaconCommitteeSubscriptions, + asyncRunner, + SLOT_BATCHING_TEST_OPTIONS); + + @BeforeEach + void setUp() { + when(forkProvider.getForkInfo(any())).thenReturn(SafeFuture.completedFuture(forkInfo)); + } + + @Test + void shouldBatchBySlotsWhenSchedulingEpochIsCurrent() { + final UInt64 epoch = UInt64.ZERO; + final AttesterDuties duties = getTestDuties(epoch); + + when(signer.signAggregationSlot(any(UInt64.class), eq(forkInfo))) + .thenReturn(SafeFuture.completedFuture(dataStructureUtil.randomSignature())); + + final SafeFuture> result = + dutySchedulingStrategy.scheduleAllDuties(epoch, duties); + + // there should be 1 delay: 8 / 4 - 1 (no delay at start) + asyncRunner.executeDueActions(); + assertThat(result).isNotCompleted(); + assertThat(asyncRunner.countDelayedActions()).isOne(); + timeProvider.advanceTimeBy(SLOT_BATCHING_TEST_OPTIONS.currentEpochSchedulingDelay()); + + asyncRunner.executeDueActions(); + assertThat(result).isCompleted(); + + verify(beaconCommitteeSubscriptions, times(8)).subscribeToBeaconCommittee(any()); + verify(beaconCommitteeSubscriptions).sendRequests(); + } + + @Test + void shouldBatchBySlotsWhenSchedulingEpochIsInTheFuture() { + // still in epoch 0 + dutySchedulingStrategy.onSlot(UInt64.valueOf(3)); + final AttesterDuties duties = getTestDuties(UInt64.ZERO); + + when(signer.signAggregationSlot(any(UInt64.class), eq(forkInfo))) + .thenReturn(SafeFuture.completedFuture(dataStructureUtil.randomSignature())); + + final SafeFuture> result = + dutySchedulingStrategy.scheduleAllDuties(UInt64.ONE, duties); + + // there should be total of 7 delays: 8 - 1 (no delay at the start) + for (int i = 0; i < 7; i++) { + asyncRunner.executeDueActions(); + assertThat(result).isNotCompleted(); + assertThat(asyncRunner.countDelayedActions()).isOne(); + timeProvider.advanceTimeBy(SLOT_BATCHING_TEST_OPTIONS.futureEpochSchedulingDelay()); + } + + asyncRunner.executeDueActions(); + assertThat(result).isCompleted(); + + verify(beaconCommitteeSubscriptions, times(8)).subscribeToBeaconCommittee(any()); + verify(beaconCommitteeSubscriptions).sendRequests(); + } + + private AttesterDuties getTestDuties(final UInt64 epoch) { + // 8 duties + final List duties = + UInt64.range( + spec.computeStartSlotAtEpoch(epoch), + spec.computeStartSlotAtEpoch(epoch.increment())) + .map( + slot -> + new AttesterDuty( + validatorKey, + VALIDATOR_INDICES.getInt(slot.intValue() % 2), + 1, + 3, + 4, + 0, + slot)) + .toList(); + return new AttesterDuties(false, dataStructureUtil.randomBytes32(), duties); + } +} diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyLoaderTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyDefaultSchedulingStrategyTest.java similarity index 78% rename from validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyLoaderTest.java rename to validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyDefaultSchedulingStrategyTest.java index 23537ff6c55..ed9e74177d7 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyLoaderTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutyDefaultSchedulingStrategyTest.java @@ -23,7 +23,6 @@ import it.unimi.dsi.fastutil.ints.IntList; import java.util.List; import java.util.Map; -import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import tech.pegasys.teku.bls.BLSPublicKey; @@ -45,9 +44,9 @@ import tech.pegasys.teku.validator.client.duties.attestations.AttestationProductionDuty; import tech.pegasys.teku.validator.client.loader.OwnedValidators; -class AttestationDutyLoaderTest { +class AttestationDutyDefaultSchedulingStrategyTest { - private static final IntList VALIDATOR_INDICES = IntList.of(1); + private static final IntList VALIDATOR_INDICES = IntList.of(1, 2, 3, 4, 5, 6, 7, 8); private final Spec spec = TestSpecFactory.createMinimalPhase0(); private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); @@ -60,7 +59,6 @@ class AttestationDutyLoaderTest { private final SlotBasedScheduledDuties scheduledDuties = mock(SlotBasedScheduledDuties.class); - private final ValidatorIndexProvider validatorIndexProvider = mock(ValidatorIndexProvider.class); private final BLSPublicKey validatorKey = dataStructureUtil.randomPublicKey(); private final Signer signer = mock(Signer.class); private final Validator validator = @@ -68,21 +66,18 @@ class AttestationDutyLoaderTest { private final Map validators = Map.of(validatorKey, validator); private final ForkInfo forkInfo = dataStructureUtil.randomForkInfo(); - private final AttestationDutyLoader dutyLoader = - new AttestationDutyLoader( - validatorApiChannel, + private final AttestationDutyDefaultSchedulingStrategy dutySchedulingStrategy = + new AttestationDutyDefaultSchedulingStrategy( + spec, forkProvider, dependentRoot -> scheduledDuties, new OwnedValidators(validators), - validatorIndexProvider, beaconCommitteeSubscriptions, - spec, + validatorApiChannel, false); @BeforeEach void setUp() { - when(validatorIndexProvider.getValidatorIndices()) - .thenReturn(SafeFuture.completedFuture(VALIDATOR_INDICES)); when(forkProvider.getForkInfo(any())).thenReturn(SafeFuture.completedFuture(forkInfo)); } @@ -102,18 +97,15 @@ void shouldSubscribeToSubnetWhenValidatorIsAggregator() { committeesAtSlot, 0, slot); - when(validatorApiChannel.getAttestationDuties(UInt64.ONE, VALIDATOR_INDICES)) - .thenReturn( - SafeFuture.completedFuture( - Optional.of( - new AttesterDuties(false, dataStructureUtil.randomBytes32(), List.of(duty))))); + final AttesterDuties duties = + new AttesterDuties(false, dataStructureUtil.randomBytes32(), List.of(duty)); when(scheduledDuties.scheduleProduction(any(), any(), any())).thenReturn(new SafeFuture<>()); when(signer.signAggregationSlot(slot, forkInfo)) .thenReturn(SafeFuture.completedFuture(dataStructureUtil.randomSignature())); - final SafeFuture>> result = - dutyLoader.loadDutiesForEpoch(UInt64.ONE); + final SafeFuture> result = + dutySchedulingStrategy.scheduleAllDuties(UInt64.ONE, duties); assertThat(result).isCompleted(); verify(beaconCommitteeSubscriptions) @@ -139,18 +131,15 @@ void shouldSubscribeToSubnetWhenValidatorIsNotAggregator() { committeesAtSlot, 0, slot); - when(validatorApiChannel.getAttestationDuties(UInt64.ONE, VALIDATOR_INDICES)) - .thenReturn( - SafeFuture.completedFuture( - Optional.of( - new AttesterDuties(false, dataStructureUtil.randomBytes32(), List.of(duty))))); + final AttesterDuties duties = + new AttesterDuties(false, dataStructureUtil.randomBytes32(), List.of(duty)); when(scheduledDuties.scheduleProduction(any(), any(), any())).thenReturn(new SafeFuture<>()); when(signer.signAggregationSlot(slot, forkInfo)) .thenReturn(SafeFuture.completedFuture(dataStructureUtil.randomSignature())); - final SafeFuture>> result = - dutyLoader.loadDutiesForEpoch(UInt64.ONE); + final SafeFuture> result = + dutySchedulingStrategy.scheduleAllDuties(UInt64.ONE, duties); assertThat(result).isCompleted(); verify(beaconCommitteeSubscriptions) @@ -162,13 +151,10 @@ void shouldSubscribeToSubnetWhenValidatorIsNotAggregator() { @Test void shouldSendSubscriptionRequestsWhenAllDutiesAreScheduled() { - when(validatorApiChannel.getAttestationDuties(UInt64.ONE, VALIDATOR_INDICES)) - .thenReturn( - SafeFuture.completedFuture( - Optional.of( - new AttesterDuties(false, dataStructureUtil.randomBytes32(), emptyList())))); - final SafeFuture>> result = - dutyLoader.loadDutiesForEpoch(UInt64.ONE); + final AttesterDuties duties = + new AttesterDuties(false, dataStructureUtil.randomBytes32(), emptyList()); + final SafeFuture> result = + dutySchedulingStrategy.scheduleAllDuties(UInt64.ONE, duties); assertThat(result).isCompleted(); verify(beaconCommitteeSubscriptions).sendRequests(); diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutySchedulerTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutySchedulerTest.java index 6f830cbaaba..b90a950c51b 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutySchedulerTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutySchedulerTest.java @@ -16,6 +16,7 @@ import static java.util.Collections.emptyList; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -68,6 +69,10 @@ public class AttestationDutySchedulerTest extends AbstractDutySchedulerTest { private final SlotBasedScheduledDuties scheduledDuties = mock(SlotBasedScheduledDuties.class); + private final AttestationDutySchedulingStrategySelector + attestationDutySchedulingStrategySelector = + mock(AttestationDutySchedulingStrategySelector.class); + private final StubMetricsSystem metricsSystem2 = new StubMetricsSystem(); private final Spec spec = TestSpecFactory.createMinimalPhase0(); @@ -936,40 +941,60 @@ public void shouldOnlyUpdateNextAttestationSlotFromCurrentScheduledSlotOnwards() } private void createDutySchedulerWithRealDuties() { - final AttestationDutyLoader attestationDutyLoader = - new AttestationDutyLoader( - validatorApiChannel, + final OwnedValidators validators = + new OwnedValidators(Map.of(VALIDATOR1_KEY, validator1, VALIDATOR2_KEY, validator2)); + final AttestationDutyDefaultSchedulingStrategy dutySchedulingStrategy = + new AttestationDutyDefaultSchedulingStrategy( + spec, forkProvider, dependentRoot -> new SlotBasedScheduledDuties<>( attestationDutyFactory, dependentRoot, Duty::performDuty), - new OwnedValidators(Map.of(VALIDATOR1_KEY, validator1, VALIDATOR2_KEY, validator2)), - validatorIndexProvider, + validators, beaconCommitteeSubscriptions, - spec, + validatorApiChannel, false); + when(attestationDutySchedulingStrategySelector.selectStrategy(anyInt())) + .thenReturn(dutySchedulingStrategy); dutyScheduler = new AttestationDutyScheduler( metricsSystem, - new RetryingDutyLoader<>(asyncRunner, timeProvider, attestationDutyLoader), + new RetryingDutyLoader<>( + asyncRunner, + timeProvider, + new AttestationDutyLoader( + validators, + validatorIndexProvider, + validatorApiChannel, + attestationDutySchedulingStrategySelector)), spec); } private void createDutySchedulerWithMockDuties() { - final AttestationDutyLoader attestationDutyLoader = - new AttestationDutyLoader( - validatorApiChannel, + final OwnedValidators validators = + new OwnedValidators(Map.of(VALIDATOR1_KEY, validator1, VALIDATOR2_KEY, validator2)); + final AttestationDutyDefaultSchedulingStrategy dutySchedulingStrategy = + new AttestationDutyDefaultSchedulingStrategy( + spec, forkProvider, dependentRoot -> scheduledDuties, - new OwnedValidators(Map.of(VALIDATOR1_KEY, validator1, VALIDATOR2_KEY, validator2)), - validatorIndexProvider, + validators, beaconCommitteeSubscriptions, - spec, + validatorApiChannel, false); + when(attestationDutySchedulingStrategySelector.selectStrategy(anyInt())) + .thenReturn(dutySchedulingStrategy); dutyScheduler = new AttestationDutyScheduler( metricsSystem2, - new RetryingDutyLoader<>(asyncRunner, timeProvider, attestationDutyLoader), + new RetryingDutyLoader<>( + asyncRunner, + timeProvider, + new AttestationDutyLoader( + validators, + validatorIndexProvider, + validatorApiChannel, + attestationDutySchedulingStrategySelector)), spec); } } diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutySchedulingStrategySelectorTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutySchedulingStrategySelectorTest.java new file mode 100644 index 00000000000..8c64587b412 --- /dev/null +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/AttestationDutySchedulingStrategySelectorTest.java @@ -0,0 +1,59 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.validator.client; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import org.junit.jupiter.api.Test; + +public class AttestationDutySchedulingStrategySelectorTest { + + private static final int MIN_SIZE_TO_BATCH_BY_SLOT = 42; + + private final AttestationDutyDefaultSchedulingStrategy defaultStrategy = + mock(AttestationDutyDefaultSchedulingStrategy.class); + + private final AttestationDutyBatchSchedulingStrategy batchStrategy = + mock(AttestationDutyBatchSchedulingStrategy.class); + + private final AttestationDutySchedulingStrategySelector strategySelector = + new AttestationDutySchedulingStrategySelector( + MIN_SIZE_TO_BATCH_BY_SLOT, false, defaultStrategy, batchStrategy); + + @Test + public void selectsDefaultStrategyWhenSizeIsLowerThanMinSizeToBatch() { + final AttestationDutySchedulingStrategy result = strategySelector.selectStrategy(40); + + assertThat(result).isEqualTo(defaultStrategy); + } + + @Test + public void selectsBatchStrategyWhenSizeIsHigherThanMinSizeToBatch() { + final AttestationDutySchedulingStrategy result = strategySelector.selectStrategy(43); + + assertThat(result).isEqualTo(batchStrategy); + } + + @Test + public void selectsDefaultStrategyWhenDvtIsEnabled() { + final AttestationDutySchedulingStrategySelector strategySelector = + new AttestationDutySchedulingStrategySelector( + MIN_SIZE_TO_BATCH_BY_SLOT, true, defaultStrategy, batchStrategy); + + final AttestationDutySchedulingStrategy result = strategySelector.selectStrategy(43); + + assertThat(result).isEqualTo(defaultStrategy); + } +}