Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ public BlockProcessingResult processBlock(
blockHashLookup,
blobGasPrice,
blockAccessListBuilder,
blockAccessList);
blockAccessList,
maybeParentHeader);

boolean parallelizedTxFound = false;
int nbParallelTx = 0;
Expand Down Expand Up @@ -666,7 +667,8 @@ Optional<PreprocessingContext> run(
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice,
final Optional<BlockAccessListBuilder> blockAccessListBuilder,
final Optional<BlockAccessList> maybeBlockBal);
final Optional<BlockAccessList> maybeBlockBal,
final Optional<BlockHeader> maybeParentHeader);

class NoPreprocessing implements PreprocessingFunction {

Expand All @@ -679,7 +681,8 @@ public Optional<PreprocessingContext> run(
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice,
final Optional<BlockAccessListBuilder> blockAccessListBuilder,
final Optional<BlockAccessList> maybeBlockBal) {
final Optional<BlockAccessList> maybeBlockBal,
final Optional<BlockHeader> maybeParentHeader) {
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,17 @@ protected ParallelizedTransactionContext runTransaction(
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice,
final Optional<BlockAccessListBuilder> blockAccessListBuilder) {
final Optional<BlockAccessListBuilder> blockAccessListBuilder,
final Optional<BlockHeader> maybeParentHeader) {

final BonsaiWorldState ws = getWorldState(protocolContext, blockHeader);
if (ws == null) return null;
if (maybeParentHeader.isEmpty()) {
return null;
}
final BonsaiWorldState ws =
getWorldState(protocolContext, maybeParentHeader.get()).orElse(null);
if (ws == null) {
return null;
}

try {
ws.disableCacheMerkleTrieLoader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public void runAsyncBlock(
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice,
final Executor executor,
final Optional<BlockAccessListBuilder> blockAccessListBuilder) {
final Optional<BlockAccessListBuilder> blockAccessListBuilder,
final Optional<BlockHeader> maybeParentHeader) {

futures = new CompletableFuture[transactions.size()];

Expand All @@ -64,25 +65,19 @@ public void runAsyncBlock(
miningBeneficiary,
blockHashLookup,
blobGasPrice,
blockAccessListBuilder),
blockAccessListBuilder,
maybeParentHeader),
executor);
}
}

protected BonsaiWorldState getWorldState(
final ProtocolContext protocolContext, final BlockHeader blockHeader) {

final BlockHeader chainHeadHeader = protocolContext.getBlockchain().getChainHeadHeader();
if (!chainHeadHeader.getHash().equals(blockHeader.getParentHash())) {
return null;
}

return (BonsaiWorldState)
protocolContext
.getWorldStateArchive()
.getWorldState(
WorldStateQueryParams.withBlockHeaderAndNoUpdateNodeHead(chainHeadHeader))
.orElse(null);
/** World state at the parent block. Call only when the parent header is known to be present. */
protected Optional<BonsaiWorldState> getWorldState(
final ProtocolContext protocolContext, final BlockHeader parentHeader) {
return protocolContext
.getWorldStateArchive()
.getWorldState(WorldStateQueryParams.withBlockHeaderAndNoUpdateNodeHead(parentHeader))
.map(BonsaiWorldState.class::cast);
}

protected abstract ParallelizedTransactionContext runTransaction(
Expand All @@ -93,7 +88,8 @@ protected abstract ParallelizedTransactionContext runTransaction(
Address miningBeneficiary,
BlockHashLookup blockHashLookup,
Wei blobGasPrice,
Optional<BlockAccessListBuilder> blockAccessListBuilder);
Optional<BlockAccessListBuilder> blockAccessListBuilder,
Optional<BlockHeader> maybeParentHeader);

public abstract Optional<TransactionProcessingResult> getProcessingResult(
MutableWorldState worldState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public Optional<PreprocessingContext> run(
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice,
final Optional<BlockAccessListBuilder> blockAccessListBuilder,
final Optional<BlockAccessList> maybeBlockBal) {
final Optional<BlockAccessList> maybeBlockBal,
final Optional<BlockHeader> maybeParentHeader) {
if (!(protocolContext.getWorldStateArchive() instanceof PathBasedWorldStateProvider)) {
return Optional.empty();
}
Expand All @@ -78,7 +79,8 @@ public Optional<PreprocessingContext> run(
blockHashLookup,
blobGasPrice,
executor,
blockAccessListBuilder);
blockAccessListBuilder,
maybeParentHeader);

return Optional.of(new PreprocessingContext(parallelProcessor));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,17 @@ protected ParallelizedTransactionContext runTransaction(
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice,
final Optional<BlockAccessListBuilder> blockAccessListBuilder) {
final Optional<BlockAccessListBuilder> blockAccessListBuilder,
final Optional<BlockHeader> maybeParentHeader) {

final BonsaiWorldState ws = getWorldState(protocolContext, blockHeader);
if (ws == null) return null;
if (maybeParentHeader.isEmpty()) {
return null;
}
final BonsaiWorldState ws =
getWorldState(protocolContext, maybeParentHeader.get()).orElse(null);
if (ws == null) {
return null;
}

try {
ws.disableCacheMerkleTrieLoader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.hyperledger.besu.ethereum.mainnet.block.access.list.BlockAccessList;
import org.hyperledger.besu.ethereum.mainnet.block.access.list.BlockAccessList.BalanceChange;
import org.hyperledger.besu.ethereum.trie.pathbased.bonsai.BonsaiAccount;
import org.hyperledger.besu.ethereum.trie.pathbased.common.provider.WorldStateQueryParams;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.storage.DataStorageFormat;
Expand Down Expand Up @@ -180,7 +181,21 @@ protected Block createBlock(
final Wei baseFee,
final Address coinbase,
final Transaction... txs) {
final BlockHeader parentHeader = ctx.getBlockchain().getChainHeadHeader();
return createBlock(
ctx, ctx.getBlockchain().getChainHeadHeader(), stateRoot, baseFee, coinbase, txs);
}

/**
* Builds a block whose parent is {@code parentHeader} (not necessarily the chain head). Block
* number is {@code parentHeader.getNumber() + 1}.
*/
protected Block createBlock(
final ExecutionContextTestFixture ctx,
final BlockHeader parentHeader,
final Hash stateRoot,
final Wei baseFee,
final Address coinbase,
final Transaction... txs) {
final BlockHeader blockHeader =
new BlockHeaderTestFixture()
.number(parentHeader.getNumber() + 1L)
Expand All @@ -204,8 +219,26 @@ protected Hash discoverStateRoot(final Wei baseFee, final Transaction... txs) {
protected Hash discoverStateRoot(
final Wei baseFee, final Address coinbase, final Transaction... txs) {
final ExecutionContextTestFixture ctx = createFreshContext();
final MutableWorldState ws = ctx.getStateArchive().getWorldState();
final Block block = createBlock(ctx, Hash.ZERO, baseFee, coinbase, txs);
return discoverStateRootAtParent(
ctx, ctx.getBlockchain().getChainHeadHeader(), baseFee, coinbase, txs);
}

/**
* Discovers the post-execution state root for {@code txs} when the block's parent is {@code
* parentHeader}, using a mutable world state layered on that parent's state (same basis as block
* import).
*/
protected Hash discoverStateRootAtParent(
final ExecutionContextTestFixture ctx,
final BlockHeader parentHeader,
final Wei baseFee,
final Address coinbase,
final Transaction... txs) {
final MutableWorldState ws =
ctx.getStateArchive()
.getWorldState(WorldStateQueryParams.withBlockHeaderAndUpdateNodeHead(parentHeader))
.orElseThrow();
final Block block = createBlock(ctx, parentHeader, Hash.ZERO, baseFee, coinbase, txs);
final BlockProcessor processor = createSequentialProcessor(ctx);
final BlockProcessingResult result =
processor.processBlock(ctx.getProtocolContext(), ctx.getBlockchain(), ws, block);
Expand All @@ -225,8 +258,160 @@ protected Hash discoverStateRoot(
return Hash.fromHexString(msg.substring(idx + marker.length()));
}

/** Header of the parent of the current chain head (i.e. chain head number minus one). */
protected BlockHeader parentOfChainHead(final ExecutionContextTestFixture ctx) {
final BlockHeader head = ctx.getBlockchain().getChainHeadHeader();
return ctx.getBlockchain()
.getBlockHeader(head.getParentHash())
.orElseThrow(
() ->
new IllegalStateException(
"Chain head has no recorded parent; advance the chain by one block first"));
}

/**
* Appends one empty canonical block on top of the current head (same rules as normal import:
* sequential process then {@link
* org.hyperledger.besu.ethereum.chain.MutableBlockchain#appendBlock}).
*/
protected void advanceCanonicalChainWithOneEmptyBlock(
final ExecutionContextTestFixture ctx, final Wei baseFee) {
final BlockHeader parent = ctx.getBlockchain().getChainHeadHeader();
final Hash stateRoot = discoverStateRootAtParent(ctx, parent, baseFee, MINING_BENEFICIARY);
final MutableWorldState ws =
ctx.getStateArchive()
.getWorldState(WorldStateQueryParams.withBlockHeaderAndUpdateNodeHead(parent))
.orElseThrow();
final Block block = createBlock(ctx, parent, stateRoot, baseFee, MINING_BENEFICIARY);
final BlockProcessor processor = createSequentialProcessor(ctx);
final BlockProcessingResult result =
processor.processBlock(ctx.getProtocolContext(), ctx.getBlockchain(), ws, block);
assertTrue(
result.isSuccessful(),
"Empty block advance failed: " + result.errorMessage.orElse("(no message)"));
ctx.getBlockchain()
.appendBlock(
block, result.getYield().orElseThrow().getReceipts(), getBlockAccessList(result));
}

/**
* Like {@link #executeAndCompare(Wei, Transaction...)} but the transaction block is built on the
* <em>parent of the chain head</em> (head minus one): the chain is advanced by one empty block
* first, then transfers are executed as a sibling at the same height as that empty block, with
* world state loaded from that parent (mirroring {@link
* org.hyperledger.besu.ethereum.MainnetBlockValidator}).
*/
protected ComparisonResult executeAndCompareParentOfChainHead(
final Wei baseFee, final Transaction... txs) {
final ExecutionContextTestFixture discoveryCtx = createFreshContext();
advanceCanonicalChainWithOneEmptyBlock(discoveryCtx, baseFee);
final Hash stateRoot =
discoverStateRootAtParent(
discoveryCtx, parentOfChainHead(discoveryCtx), baseFee, MINING_BENEFICIARY, txs);

final ExecutionContextTestFixture seqCtx = createFreshContext();
advanceCanonicalChainWithOneEmptyBlock(seqCtx, baseFee);
final BlockHeader seqTxParent = parentOfChainHead(seqCtx);
final MutableWorldState seqWs =
seqCtx
.getStateArchive()
.getWorldState(WorldStateQueryParams.withBlockHeaderAndUpdateNodeHead(seqTxParent))
.orElseThrow();
final Block seqBlock =
createBlock(seqCtx, seqTxParent, stateRoot, baseFee, MINING_BENEFICIARY, txs);
final BlockProcessor seqProcessor = createSequentialProcessor(seqCtx);
final BlockProcessingResult seqResult =
seqProcessor.processBlock(
seqCtx.getProtocolContext(), seqCtx.getBlockchain(), seqWs, seqBlock);
assertTrue(
seqResult.isSuccessful(),
"Sequential processing failed: " + seqResult.errorMessage.orElse("(no message)"));

final ExecutionContextTestFixture parCtx = createFreshContext();
advanceCanonicalChainWithOneEmptyBlock(parCtx, baseFee);
final BlockHeader parTxParent = parentOfChainHead(parCtx);
final MutableWorldState parWs =
parCtx
.getStateArchive()
.getWorldState(WorldStateQueryParams.withBlockHeaderAndUpdateNodeHead(parTxParent))
.orElseThrow();
final Block parBlock =
createBlock(parCtx, parTxParent, stateRoot, baseFee, MINING_BENEFICIARY, txs);
final ProtocolSpec spec =
parCtx
.getProtocolSchedule()
.getByBlockHeader(new BlockHeaderTestFixture().number(0L).buildHeader());
final BlockProcessor parProcessor = createParallelProcessor(parCtx);
final ParallelTransactionPreprocessing preprocessing =
createParallelPreprocessing(spec.getTransactionProcessor());
final BlockProcessingResult parResult =
parProcessor.processBlock(
parCtx.getProtocolContext(), parCtx.getBlockchain(), parWs, parBlock, preprocessing);
assertTrue(
parResult.isSuccessful(),
getVariantName()
+ " parallel processing failed: "
+ parResult.errorMessage.orElse("(no message)"));
assertParallelizationRecordedInResults(seqResult, parResult, txs.length);

assertThat(parWs.rootHash())
.as(getVariantName() + " parallel state root must match sequential")
.isEqualTo(seqWs.rootHash());

final Optional<BlockAccessList> seqBal = getBlockAccessList(seqResult);
final Optional<BlockAccessList> parBal = getBlockAccessList(parResult);
assertThat(seqBal).as("Sequential BAL should be present").isPresent();
assertThat(parBal).as(getVariantName() + " parallel BAL should be present").isPresent();
assertThat(BodyValidation.balHash(parBal.get()))
.as(getVariantName() + " parallel BAL hash must match sequential")
.isEqualTo(BodyValidation.balHash(seqBal.get()));

return new ComparisonResult(
seqWs.rootHash(), parWs.rootHash(), seqResult, parResult, seqWs, parWs);
}

// ==================== Core Comparison ====================

/**
* {@link BlockProcessingResult#getNbParallelizedTransactions()} is populated by {@link
* org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor} when at least one transaction was
* applied via the parallel path ({@code isProcessedInParallel} on the tx result).
*
* <p>With perfect parallelization (BAL), the parallel processor must report this for non-empty
* blocks. With optimistic collision handling, {@link
* org.hyperledger.besu.ethereum.mainnet.parallelization.MainnetParallelBlockProcessor} may fall
* back to fully sequential processing for the whole block, in which case the field stays empty;
* we then only assert that any reported count is positive.
*/
protected void assertParallelizationRecordedInResults(
final BlockProcessingResult sequentialResult,
final BlockProcessingResult parallelResult,
final int transactionCount) {
if (transactionCount < 1) {
return;
}
assertThat(sequentialResult.getNbParallelizedTransactions())
.as("Sequential MainnetBlockProcessor must not report parallelized transaction metrics")
.isEmpty();
final Optional<Integer> nbParallel = parallelResult.getNbParallelizedTransactions();
if (getBalConfiguration().isPerfectParallelizationEnabled()) {
assertThat(nbParallel)
.as(
getVariantName()
+ " parallel run must report nbParallelizedTransactions (parallel path was used)")
.isPresent()
.hasValueSatisfying(n -> assertThat(n).isEqualTo(transactionCount));
} else {
nbParallel.ifPresent(
n ->
assertThat(n)
.as(
getVariantName()
+ " when parallel metrics are present, count must be positive")
.isPositive());
}
}

protected ComparisonResult executeAndCompare(final Wei baseFee, final Transaction... txs) {
final Hash stateRoot = discoverStateRoot(baseFee, txs);

Expand Down Expand Up @@ -261,6 +446,7 @@ protected ComparisonResult executeAndCompare(final Wei baseFee, final Transactio
getVariantName()
+ " parallel processing failed: "
+ parResult.errorMessage.orElse("(no message)"));
assertParallelizationRecordedInResults(seqResult, parResult, txs.length);

// State root comparison
assertThat(parWs.rootHash())
Expand Down
Loading
Loading