Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- Set Ethereum Classic mainnet activation block for Spiral network upgrade [#6267](https://github.com/hyperledger/besu/pull/6267)
- Add custom genesis file name to config overview if specified [#6297](https://github.com/hyperledger/besu/pull/6297)
- Update Gradle plugins and replace unmaintained License Gradle Plugin with the actively maintained Gradle License Report [#6275](https://github.com/hyperledger/besu/pull/6275)
- Disable transaction handling when the node is not in sync, to avoid unnecessary transaction validation work [#6302](https://github.com/hyperledger/besu/pull/6302)
- Optimize RocksDB WAL files, allows for faster restart and a more linear disk space utilization [#6328](https://github.com/hyperledger/besu/pull/6328)

### Bug fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
Expand All @@ -64,6 +63,7 @@
import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.DeterministicEthScheduler;
import org.hyperledger.besu.testutil.TestClock;

import java.math.BigInteger;
Expand Down Expand Up @@ -100,7 +100,6 @@ public class BesuEventsImplTest {
@Mock private EthPeers mockEthPeers;
@Mock private EthContext mockEthContext;
@Mock private EthMessages mockEthMessages;
@Mock private EthScheduler mockEthScheduler;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private TransactionValidatorFactory mockTransactionValidatorFactory;
Expand Down Expand Up @@ -128,7 +127,7 @@ public void setUp() {

when(mockEthContext.getEthMessages()).thenReturn(mockEthMessages);
when(mockEthContext.getEthPeers()).thenReturn(mockEthPeers);
when(mockEthContext.getScheduler()).thenReturn(mockEthScheduler);
when(mockEthContext.getScheduler()).thenReturn(new DeterministicEthScheduler());
lenient().when(mockEthPeers.streamAvailablePeers()).thenAnswer(z -> Stream.empty());
when(mockProtocolContext.getBlockchain()).thenReturn(blockchain);
lenient().when(mockProtocolContext.getWorldStateArchive()).thenReturn(mockWorldStateArchive);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -107,7 +108,7 @@ public void setUp() {
blockchain::getChainHeadHeader);
final ProtocolContext protocolContext = executionContext.getProtocolContext();

EthContext ethContext = mock(EthContext.class);
EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -107,7 +108,7 @@ public void setUp() {
blockchain::getChainHeadHeader);
final ProtocolContext protocolContext = executionContext.getProtocolContext();

EthContext ethContext = mock(EthContext.class);
EthContext ethContext = mock(EthContext.class, RETURNS_DEEP_STUBS);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand All @@ -34,6 +36,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -295,4 +299,49 @@ public <T> void failAfterTimeout(final CompletableFuture<T> promise, final Durat
delay,
unit);
}

public <ITEM> OrderedProcessor<ITEM> createOrderedProcessor(final Consumer<ITEM> processor) {
return new OrderedProcessor<>(processor);
}

/**
* This class is a way to execute a set of tasks, one by one, in a strict order, without blocking
* the caller in case there are still previous tasks queued
*
* @param <ITEM> the class of item to be processed
*/
public class OrderedProcessor<ITEM> {
private final Queue<ITEM> blockAddedQueue = new ConcurrentLinkedQueue<>();
private final ReentrantLock blockAddedLock = new ReentrantLock();
private final Consumer<ITEM> processor;

private OrderedProcessor(final Consumer<ITEM> processor) {
this.processor = processor;
}

public void submit(final ITEM item) {
// add the item to the processing queue
blockAddedQueue.add(item);

if (blockAddedLock.hasQueuedThreads()) {
// another thread is already waiting to process the queue with our item, there is no need to
// schedule another thread
LOG.trace(
"Block added event queue is already being processed and an already queued thread is present, nothing to do");
} else {
servicesExecutor.submit(
() -> {
blockAddedLock.lock();
try {
// now that we have the lock, process as many items as possible
for (ITEM i = blockAddedQueue.poll(); i != null; i = blockAddedQueue.poll()) {
processor.accept(i);
}
} finally {
blockAddedLock.unlock();
}
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidationParams;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidator;
Expand Down Expand Up @@ -61,11 +62,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -107,8 +106,7 @@ public class TransactionPool implements BlockAddedObserver {
private volatile OptionalLong subscribeConnectId = OptionalLong.empty();
private final SaveRestoreManager saveRestoreManager = new SaveRestoreManager();
private final Set<Address> localSenders = ConcurrentHashMap.newKeySet();
private final Lock blockAddedLock = new ReentrantLock();
private final Queue<BlockAddedEvent> blockAddedQueue = new ConcurrentLinkedQueue<>();
private final EthScheduler.OrderedProcessor<BlockAddedEvent> blockAddedEventOrderedProcessor;

public TransactionPool(
final Supplier<PendingTransactions> pendingTransactionsSupplier,
Expand All @@ -130,6 +128,8 @@ public TransactionPool(
pluginTransactionValidatorFactory == null
? null
: pluginTransactionValidatorFactory.create();
this.blockAddedEventOrderedProcessor =
ethContext.getScheduler().createOrderedProcessor(this::processBlockAddedEvent);
initLogForReplay();
}

Expand Down Expand Up @@ -322,58 +322,29 @@ public void unsubscribeDroppedTransactions(final long id) {
@Override
public void onBlockAdded(final BlockAddedEvent event) {
if (isPoolEnabled.get()) {
final long started = System.currentTimeMillis();
if (event.getEventType().equals(BlockAddedEvent.EventType.HEAD_ADVANCED)
|| event.getEventType().equals(BlockAddedEvent.EventType.CHAIN_REORG)) {

// add the event to the processing queue
blockAddedQueue.add(event);

// we want to process the added block asynchronously,
// but at the same time we must ensure that blocks are processed in order one at time
ethContext
.getScheduler()
.scheduleServiceTask(
() -> {
while (!blockAddedQueue.isEmpty()) {
if (blockAddedLock.tryLock()) {
// no other thread is processing the queue, so start processing it
try {
BlockAddedEvent e = blockAddedQueue.poll();
// check again since another thread could have stolen our task
if (e != null) {
pendingTransactions.manageBlockAdded(
e.getBlock().getHeader(),
e.getAddedTransactions(),
e.getRemovedTransactions(),
protocolSchedule
.getByBlockHeader(e.getBlock().getHeader())
.getFeeMarket());
reAddTransactions(e.getRemovedTransactions());
LOG.atTrace()
.setMessage("Block added event {} processed in {}ms")
.addArgument(e)
.addArgument(() -> System.currentTimeMillis() - started)
.log();
}
} finally {
blockAddedLock.unlock();
}
} else {
try {
// wait a bit before retrying
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
return null;
});
blockAddedEventOrderedProcessor.submit(event);
}
}
}

private void processBlockAddedEvent(final BlockAddedEvent e) {
final long started = System.currentTimeMillis();
pendingTransactions.manageBlockAdded(
e.getBlock().getHeader(),
e.getAddedTransactions(),
e.getRemovedTransactions(),
protocolSchedule.getByBlockHeader(e.getBlock().getHeader()).getFeeMarket());
reAddTransactions(e.getRemovedTransactions());
LOG.atTrace()
.setMessage("Block added event {} processed in {}ms")
.addArgument(e)
.addArgument(() -> System.currentTimeMillis() - started)
.log();
}

private void reAddTransactions(final List<Transaction> reAddTransactions) {
if (!reAddTransactions.isEmpty()) {
// if adding a blob tx, and it is missing its blob, is a re-org and we should restore the blob
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,62 @@ static TransactionPool createTransactionPool(
@Override
public void onInitialSyncCompleted() {
LOG.info("Enabling transaction handling following initial sync");
transactionTracker.reset();
transactionPool.setEnabled();
transactionsMessageHandler.setEnabled();
pooledTransactionsMessageHandler.setEnabled();
enableTransactionHandling(
transactionTracker,
transactionPool,
transactionsMessageHandler,
pooledTransactionsMessageHandler);
}

@Override
public void onInitialSyncRestart() {
LOG.info("Disabling transaction handling during re-sync");
pooledTransactionsMessageHandler.setDisabled();
transactionsMessageHandler.setDisabled();
transactionPool.setDisabled();
disableTransactionHandling(
transactionPool, transactionsMessageHandler, pooledTransactionsMessageHandler);
}
});

syncState.subscribeInSync(
isInSync -> {
if (isInSync != transactionPool.isEnabled()) {
if (isInSync) {
LOG.info("Node is in sync, enabling transaction handling");
enableTransactionHandling(
transactionTracker,
transactionPool,
transactionsMessageHandler,
pooledTransactionsMessageHandler);
} else {
LOG.info("Node out of sync, disabling transaction handling");
disableTransactionHandling(
transactionPool, transactionsMessageHandler, pooledTransactionsMessageHandler);
}
}
});

return transactionPool;
}

private static void enableTransactionHandling(
final PeerTransactionTracker transactionTracker,
final TransactionPool transactionPool,
final TransactionsMessageHandler transactionsMessageHandler,
final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler) {
transactionTracker.reset();
transactionPool.setEnabled();
transactionsMessageHandler.setEnabled();
pooledTransactionsMessageHandler.setEnabled();
}

private static void disableTransactionHandling(
final TransactionPool transactionPool,
final TransactionsMessageHandler transactionsMessageHandler,
final NewPooledTransactionHashesMessageHandler pooledTransactionsMessageHandler) {
transactionPool.setDisabled();
transactionsMessageHandler.setDisabled();
pooledTransactionsMessageHandler.setDisabled();
}

private static void subscribeTransactionHandlers(
final ProtocolContext protocolContext,
final EthContext ethContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.metrics.ReplaceableDoubleSupplier;
import org.hyperledger.besu.metrics.RunnableCounter;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
Expand Down Expand Up @@ -46,6 +47,9 @@ public class TransactionPoolMetrics {
private final LabelledMetric<Counter> expiredMessagesCounter;
private final Map<String, RunnableCounter> expiredMessagesRunnableCounters = new HashMap<>();
private final LabelledMetric<Counter> alreadySeenTransactionsCounter;
private final Map<String, ReplaceableDoubleSupplier> spaceUsedSuppliers = new HashMap<>();
private final Map<String, ReplaceableDoubleSupplier> transactionCountSuppliers = new HashMap<>();
private final Map<String, ReplaceableDoubleSupplier> uniqueSendersSuppliers = new HashMap<>();

public TransactionPoolMetrics(final MetricsSystem metricsSystem) {
this.metricsSystem = metricsSystem;
Expand Down Expand Up @@ -120,17 +124,44 @@ public MetricsSystem getMetricsSystem() {
}

public void initSpaceUsed(final DoubleSupplier spaceUsedSupplier, final String layer) {
spaceUsed.labels(spaceUsedSupplier, layer);
spaceUsedSuppliers.compute(
layer,
(unused, existingSupplier) -> {
if (existingSupplier == null) {
final var newSupplier = new ReplaceableDoubleSupplier(spaceUsedSupplier);
spaceUsed.labels(newSupplier, layer);
return newSupplier;
}
return existingSupplier.replaceDoubleSupplier(spaceUsedSupplier);
});
}

public void initTransactionCount(
final DoubleSupplier transactionCountSupplier, final String layer) {
transactionCount.labels(transactionCountSupplier, layer);
transactionCountSuppliers.compute(
layer,
(unused, existingSupplier) -> {
if (existingSupplier == null) {
final var newSupplier = new ReplaceableDoubleSupplier(transactionCountSupplier);
transactionCount.labels(newSupplier, layer);
return newSupplier;
}
return existingSupplier.replaceDoubleSupplier(transactionCountSupplier);
});
}

public void initUniqueSenderCount(
final DoubleSupplier uniqueSenderCountSupplier, final String layer) {
uniqueSenderCount.labels(uniqueSenderCountSupplier, layer);
uniqueSendersSuppliers.compute(
layer,
(unused, existingSupplier) -> {
if (existingSupplier == null) {
final var newSupplier = new ReplaceableDoubleSupplier(uniqueSenderCountSupplier);
uniqueSenderCount.labels(newSupplier, layer);
return newSupplier;
}
return existingSupplier.replaceDoubleSupplier(uniqueSenderCountSupplier);
});
}

public void initExpiredMessagesCounter(final String message) {
Expand Down
Loading