Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beacon.sync.events;

import java.util.Collection;
import tech.pegasys.teku.infrastructure.events.VoidReturningChannelInterface;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

public interface SyncPreImportBlockChannel extends VoidReturningChannelInterface {

void onNewPreImportBlocks(Collection<SignedBeaconBlock> blocks);
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public SyncSource getOrCreateSyncSource(final Eth2Peer peer, final Spec spec) {
final Optional<Integer> maybeMaxBlobSidecarsPerMinute =
maybeMaxBlobsPerBlock.map(
maxBlobsPerBlock -> this.maxBlobSidecarsPerMinute - (batchSize * maxBlobsPerBlock) - 1);
final Optional<Integer> maxDataColumnSidecarsPerMinute =
spec.getNumberOfDataColumns()
.map(dataColumnsPerBlock -> maxBlocksPerMinute * dataColumnsPerBlock.intValue());
return syncSourcesByPeer.computeIfAbsent(
peer,
source ->
Expand All @@ -62,7 +65,8 @@ public SyncSource getOrCreateSyncSource(final Eth2Peer peer, final Spec spec) {
source,
maxBlocksPerMinute,
maybeMaxBlobsPerBlock,
maybeMaxBlobSidecarsPerMinute));
maybeMaxBlobSidecarsPerMinute,
maxDataColumnSidecarsPerMinute));
}

public void onPeerDisconnected(final Eth2Peer peer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
Expand All @@ -29,6 +30,7 @@
import tech.pegasys.teku.networking.p2p.reputation.ReputationAdjustment;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

public class ThrottlingSyncSource implements SyncSource {
Expand All @@ -43,23 +45,40 @@ public class ThrottlingSyncSource implements SyncSource {
private final RateTracker blocksRateTracker;
private final Optional<Integer> maybeMaxBlobsPerBlock;
private final RateTracker blobSidecarsRateTracker;
private final RateTracker dataColumnSidecarsRateTracker;

public ThrottlingSyncSource(
final AsyncRunner asyncRunner,
final TimeProvider timeProvider,
final SyncSource delegate,
final int maxBlocksPerMinute,
final Optional<Integer> maybeMaxBlobsPerBlock,
final Optional<Integer> maybeMaxBlobSidecarsPerMinute) {
final Optional<Integer> maybeMaxBlobSidecarsPerMinute,
final Optional<Integer> maybeMaxDataColumnSidecarsPerMinute) {
this.asyncRunner = asyncRunner;
this.delegate = delegate;
this.blocksRateTracker = RateTracker.create(maxBlocksPerMinute, TIMEOUT_SECONDS, timeProvider);
this.blocksRateTracker =
RateTracker.create(maxBlocksPerMinute, TIMEOUT_SECONDS, timeProvider, "throttling-blocks");
this.maybeMaxBlobsPerBlock = maybeMaxBlobsPerBlock;
this.blobSidecarsRateTracker =
maybeMaxBlobSidecarsPerMinute
.map(
maxBlobSidecarsPerMinute ->
RateTracker.create(maxBlobSidecarsPerMinute, TIMEOUT_SECONDS, timeProvider))
RateTracker.create(
maxBlobSidecarsPerMinute,
TIMEOUT_SECONDS,
timeProvider,
"throttling-blobs"))
.orElse(RateTracker.NOOP);
this.dataColumnSidecarsRateTracker =
maybeMaxDataColumnSidecarsPerMinute
.map(
maxDataColumnSidecarsPerMinute ->
RateTracker.create(
maxDataColumnSidecarsPerMinute,
TIMEOUT_SECONDS,
timeProvider,
"throttling-dataColumn"))
.orElse(RateTracker.NOOP);
}

Expand Down Expand Up @@ -125,6 +144,23 @@ public SafeFuture<Void> requestBlobSidecarsByRange(
});
}

@Override
public SafeFuture<Void> requestDataColumnSidecarsByRange(
final UInt64 startSlot,
final UInt64 count,
final List<UInt64> columns,
final RpcResponseListener<DataColumnSidecar> listener) {
final long maxColumnsSidecarsCount = count.times(columns.size()).longValue();
if (dataColumnSidecarsRateTracker.approveObjectsRequest(maxColumnsSidecarsCount).isPresent()) {
LOG.debug("Sending request for {} data column sidecars on {} columns", count, columns.size());
return delegate.requestDataColumnSidecarsByRange(startSlot, count, columns, listener);
} else {
return asyncRunner.runAfterDelay(
() -> requestDataColumnSidecarsByRange(startSlot, count, columns, listener),
PEER_REQUEST_DELAY);
}
}

@Override
public SafeFuture<Void> disconnectCleanly(final DisconnectReason reason) {
return delegate.disconnectCleanly(reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class ThrottlingSyncSourceTest {
delegate,
MAX_BLOCKS_PER_MINUTE,
Optional.of(MAX_BLOBS_PER_BLOCK),
Optional.of(MAX_BLOB_SIDECARS_PER_MINUTE));
Optional.of(MAX_BLOB_SIDECARS_PER_MINUTE),
Optional.empty());

@BeforeEach
void setup() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright Consensys Software Inc., 2025
* Copyright Consensys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.statetransition.datacolumns.log.rpc;

import java.util.List;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.logging.LogFormatter;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.versions.fulu.DataColumnSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier;
import tech.pegasys.teku.statetransition.datacolumns.util.StringifyUtil;

abstract class AbstractDasResponseLogger<TRequest>
extends AbstractResponseLogger<TRequest, DataColumnSidecar, DataColumnSlotAndIdentifier> {
private static final Logger LOG = LogManager.getLogger(DasReqRespLogger.class);

protected static final UInt64 UNKNOWN_SLOT = UInt64.MAX_VALUE;

private final int columnCount = 128;
private final int maxResponseLongStringLength = 512;

public AbstractDasResponseLogger(
final TimeProvider timeProvider,
final Direction direction,
final LoggingPeerId peerId,
final TRequest request) {
super(timeProvider, direction, peerId, request, DataColumnSlotAndIdentifier::fromDataColumn);
}

protected abstract int requestedMaxCount();

@Override
protected Logger getLogger() {
return LOG;
}

protected String responseString(
final List<DataColumnSlotAndIdentifier> responses, final Optional<Throwable> result) {
final String responsesString;
if (responses.isEmpty()) {
responsesString = "<empty>";
} else if (responses.size() == requestedMaxCount()) {
responsesString = "<all requested>";
} else {
responsesString = columnIdsToString(responses);
}

if (result.isEmpty()) {
return responsesString;
} else if (responses.isEmpty()) {
return "error: " + result.get();
} else {
return responsesString + ", error: " + result.get();
}
}

protected String columnIdsToString(final List<DataColumnSlotAndIdentifier> responses) {
final String longString = columnIdsToStringLong(responses);
if (longString.length() <= maxResponseLongStringLength) {
return longString;
} else {
return columnIdsToStringShorter(responses);
}
}

protected String columnIdsToStringLong(final List<DataColumnSlotAndIdentifier> responses) {
return responses.size()
+ " columns: "
+ mapGroupingByBlock(
responses,
(blockId, columns) ->
blockIdString(blockId) + " colIdxs: " + blockResponsesToString(columns))
.collect(Collectors.joining(", "));
}

protected String columnIdsToStringShorter(final List<DataColumnSlotAndIdentifier> responses) {

return mapGroupingByBlock(
responses, (blockId, columns) -> blockIdString(blockId) + ": " + columns.size())
.collect(Collectors.joining(", "));
}

protected <R> Stream<R> mapGroupingByBlock(
final List<DataColumnSlotAndIdentifier> responses,
final BiFunction<SlotAndBlockRoot, List<DataColumnSlotAndIdentifier>, R> mapper) {
SortedMap<SlotAndBlockRoot, List<DataColumnSlotAndIdentifier>> responsesByBlock =
new TreeMap<>(
responses.stream()
.collect(Collectors.groupingBy(AbstractDasResponseLogger::blockIdFromColumnId)));
return responsesByBlock.entrySet().stream()
.map(entry -> mapper.apply(entry.getKey(), entry.getValue()));
}

protected String blockResponsesToString(final List<DataColumnSlotAndIdentifier> responses) {
return StringifyUtil.columnIndexesToString(
responses.stream().map(it -> it.columnIndex().intValue()).toList(), columnCount);
}

private static String blockIdString(final SlotAndBlockRoot blockId) {
if (blockId.getSlot().equals(UNKNOWN_SLOT)) {
return blockId.getBlockRoot().toHexString();
} else {
return "#"
+ blockId.getSlot()
+ " (0x"
+ LogFormatter.formatAbbreviatedHashRoot(blockId.getBlockRoot())
+ ")";
}
}

private static SlotAndBlockRoot blockIdFromColumnId(final DataColumnSlotAndIdentifier columnId) {
return new SlotAndBlockRoot(columnId.slot(), columnId.blockRoot());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.statetransition.datacolumns.log.rpc;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.function.Function;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.time.TimeProvider;

abstract class AbstractResponseLogger<TRequest, TResponse, TResponseSummary>
implements ReqRespResponseLogger<TResponse> {

enum Direction {
INBOUND,
OUTBOUND;

@Override
public String toString() {
return name().toLowerCase(Locale.US);
}
}

protected record Timestamped<T>(long time, T value) {}

protected final TimeProvider timeProvider;
protected final Direction direction;
protected final LoggingPeerId peerId;
protected final TRequest request;
private final Function<TResponse, TResponseSummary> responseSummarizer;
protected final long requestTime;

private final List<Timestamped<TResponseSummary>> responseSummaries = new ArrayList<>();
private volatile boolean done = false;

public AbstractResponseLogger(
final TimeProvider timeProvider,
final Direction direction,
final LoggingPeerId peerId,
final TRequest request,
final Function<TResponse, TResponseSummary> responseSummarizer) {
this.timeProvider = timeProvider;
this.direction = direction;
this.peerId = peerId;
this.request = request;
this.responseSummarizer = responseSummarizer;
this.requestTime = timeProvider.getTimeInMillis().longValue();
}

protected abstract Logger getLogger();

protected abstract void responseComplete(
List<Timestamped<TResponseSummary>> responseSummaries, Optional<Throwable> result);

@Override
public synchronized void onNextItem(final TResponse responseItem) {
if (getLogger().isDebugEnabled()) {
final TResponseSummary responseSummary = responseSummarizer.apply(responseItem);
if (done) {
getLogger().debug("ERROR: Extra onNextItem: " + responseSummary);
return;
}
responseSummaries.add(
new Timestamped<>(timeProvider.getTimeInMillis().longValue(), responseSummary));
}
}

@Override
public void onComplete() {
if (getLogger().isDebugEnabled()) {
if (done) {
getLogger().debug("ERROR: Extra onComplete");
return;
}
finalize(Optional.empty());
}
}

@Override
public void onError(final Throwable error) {
if (getLogger().isDebugEnabled()) {
if (done) {
getLogger().debug("ERROR: Extra onError: " + error);
return;
}
finalize(Optional.ofNullable(error));
}
}

private void finalize(final Optional<Throwable> result) {
done = true;
responseComplete(responseSummaries, result);
}
}
Loading