Skip to content

Commit

Permalink
Send only hash announcement for blob transaction type (hyperledger#4940)
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <[email protected]>
  • Loading branch information
fab-10 authored and eum602 committed Nov 3, 2023
1 parent ddf6862 commit 4ddf506
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 91 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
### Breaking Changes

### Additions and Improvements
- Send only hash announcement for blob transaction type [#4940](https://github.com/hyperledger/besu/pull/4940)
- Add `excess_data_gas` field to block header [#4958](https://github.com/hyperledger/besu/pull/4958)

### Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@ private static Bytes32 computeSenderRecoveryHash(
case FRONTIER:
preimage = frontierPreimage(nonce, gasPrice, gasLimit, to, value, payload, chainId);
break;
case BLOB: // ToDo 4844: specialize for blob when more field will be added for it
case EIP1559:
preimage =
eip1559Preimage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ public Transaction transaction(
return eip1559Transaction(payload, to);
case ACCESS_LIST:
return accessListTransaction(payload, to);
case BLOB:
return blobTransaction(payload, to);
default:
throw new RuntimeException(
String.format(
Expand Down Expand Up @@ -422,6 +424,21 @@ private Transaction eip1559Transaction(final Bytes payload, final Address to) {
.signAndBuild(generateKeyPair());
}

private Transaction blobTransaction(final Bytes payload, final Address to) {
return Transaction.builder()
.type(TransactionType.BLOB)
.nonce(random.nextLong())
.maxPriorityFeePerGas(Wei.wrap(bytesValue(4)))
.maxFeePerGas(Wei.wrap(bytesValue(4)))
.gasLimit(positiveLong())
.to(to)
.value(Wei.of(positiveLong()))
.payload(payload)
.chainId(BigInteger.ONE)
.signAndBuild(generateKeyPair());
// ToDo 4844: specialize for blob when more field will be added for it
}

private Transaction frontierTransaction(final Bytes payload, final Address to) {
return Transaction.builder()
.type(TransactionType.FRONTIER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,41 @@
package org.hyperledger.besu.ethereum.eth.transactions;

import static org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction.toTransactionList;
import static org.hyperledger.besu.plugin.data.TransactionType.BLOB;
import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda;

import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
import org.hyperledger.besu.plugin.data.TransactionType;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionBroadcaster implements TransactionBatchAddedListener {
private static final Logger LOG = LoggerFactory.getLogger(TransactionBroadcaster.class);

private static final EnumSet<TransactionType> ANNOUNCE_HASH_ONLY_TX_TYPES = EnumSet.of(BLOB);

private static final Boolean HASH_ONLY_BROADCAST = Boolean.TRUE;
private static final Boolean FULL_BROADCAST = Boolean.FALSE;

private final PendingTransactions pendingTransactions;
private final PeerTransactionTracker transactionTracker;
private final TransactionsMessageSender transactionsMessageSender;
private final NewPooledTransactionHashesMessageSender newPooledTransactionHashesMessageSender;
private final EthContext ethContext;
private final int numPeersToSendFullTransactions;

public TransactionBroadcaster(
final EthContext ethContext,
Expand All @@ -52,8 +62,6 @@ public TransactionBroadcaster(
this.transactionsMessageSender = transactionsMessageSender;
this.newPooledTransactionHashesMessageSender = newPooledTransactionHashesMessageSender;
this.ethContext = ethContext;
this.numPeersToSendFullTransactions =
(int) Math.ceil(Math.sqrt(ethContext.getEthPeers().getMaxPeers()));
}

public void relayTransactionPoolTo(final EthPeer peer) {
Expand All @@ -69,80 +77,118 @@ public void relayTransactionPoolTo(final EthPeer peer) {
}

@Override
public void onTransactionsAdded(final Iterable<Transaction> transactions) {
public void onTransactionsAdded(final Collection<Transaction> transactions) {
final int currPeerCount = ethContext.getEthPeers().peerCount();
if (currPeerCount == 0) {
return;
}

List<EthPeer> peersWithOnlyTransactionSupport = new ArrayList<>(currPeerCount);
List<EthPeer> peersWithTransactionHashesSupport = new ArrayList<>(currPeerCount);
final int numPeersToSendFullTransactions = (int) Math.ceil(Math.sqrt(currPeerCount));

final Map<Boolean, List<Transaction>> transactionByBroadcastMode =
transactions.stream()
.collect(
Collectors.partitioningBy(
tx -> ANNOUNCE_HASH_ONLY_TX_TYPES.contains(tx.getType())));

final List<EthPeer> sendOnlyFullTransactionPeers = new ArrayList<>(currPeerCount);
final List<EthPeer> sendOnlyHashPeers = new ArrayList<>(currPeerCount);
final List<EthPeer> sendMixedPeers = new ArrayList<>(currPeerCount);

ethContext
.getEthPeers()
.streamAvailablePeers()
.forEach(
peer -> {
if (peer.hasSupportForMessage(EthPV65.NEW_POOLED_TRANSACTION_HASHES)) {
peersWithTransactionHashesSupport.add(peer);
sendOnlyHashPeers.add(peer);
} else {
peersWithOnlyTransactionSupport.add(peer);
sendOnlyFullTransactionPeers.add(peer);
}
});

if (peersWithOnlyTransactionSupport.size() < numPeersToSendFullTransactions) {
if (sendOnlyFullTransactionPeers.size() < numPeersToSendFullTransactions) {
final int delta =
Math.min(
numPeersToSendFullTransactions - peersWithOnlyTransactionSupport.size(),
peersWithTransactionHashesSupport.size());
numPeersToSendFullTransactions - sendOnlyFullTransactionPeers.size(),
sendOnlyHashPeers.size());

Collections.shuffle(peersWithTransactionHashesSupport);
Collections.shuffle(sendOnlyHashPeers);

// move peers from the other list to reach the required size for full transaction peers
movePeersBetweenLists(
peersWithTransactionHashesSupport, peersWithOnlyTransactionSupport, delta);
// move peers from the mixed list to reach the required size for full transaction peers
movePeersBetweenLists(sendOnlyHashPeers, sendMixedPeers, delta);
}

traceLambda(
LOG,
"Sending full transactions to {} peers and transaction hashes to {} peers."
+ " Peers w/o eth/66 {}, peers with eth/66 {}",
peersWithOnlyTransactionSupport::size,
peersWithTransactionHashesSupport::size,
peersWithOnlyTransactionSupport::toString,
peersWithTransactionHashesSupport::toString);
"Sending full transactions to {} peers, transaction hashes only to {} peers and mixed to {} peers."
+ " Peers w/o eth/65 {}, peers with eth/65 {}",
sendOnlyFullTransactionPeers::size,
sendOnlyHashPeers::size,
sendMixedPeers::size,
sendOnlyFullTransactionPeers::toString,
() -> sendOnlyHashPeers.toString() + sendMixedPeers.toString());

sendToFullTransactionsPeers(
transactionByBroadcastMode.get(FULL_BROADCAST), sendOnlyFullTransactionPeers);

sendFullTransactions(transactions, peersWithOnlyTransactionSupport);
sendToOnlyHashPeers(transactionByBroadcastMode, sendOnlyHashPeers);

sendTransactionHashes(transactions, peersWithTransactionHashesSupport);
sendToMixedPeers(transactionByBroadcastMode, sendMixedPeers);
}

private void sendToFullTransactionsPeers(
final List<Transaction> fullBroadcastTransactions, final List<EthPeer> fullTransactionPeers) {
sendFullTransactions(fullBroadcastTransactions, fullTransactionPeers);
}

private void sendToOnlyHashPeers(
final Map<Boolean, List<Transaction>> txsByHashOnlyBroadcast,
final List<EthPeer> hashOnlyPeers) {
final List<Transaction> allTransactions =
txsByHashOnlyBroadcast.values().stream().flatMap(List::stream).collect(Collectors.toList());

sendTransactionHashes(allTransactions, hashOnlyPeers);
}

private void sendToMixedPeers(
final Map<Boolean, List<Transaction>> txsByHashOnlyBroadcast,
final List<EthPeer> mixedPeers) {
sendFullTransactions(txsByHashOnlyBroadcast.get(FULL_BROADCAST), mixedPeers);
sendTransactionHashes(txsByHashOnlyBroadcast.get(HASH_ONLY_BROADCAST), mixedPeers);
}

private void sendFullTransactions(
final Iterable<Transaction> transactions, final List<EthPeer> fullTransactionPeers) {
fullTransactionPeers.forEach(
peer -> {
transactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(() -> transactionsMessageSender.sendTransactionsToPeer(peer));
});
final List<Transaction> transactions, final List<EthPeer> fullTransactionPeers) {
if (!transactions.isEmpty()) {
fullTransactionPeers.forEach(
peer -> {
transactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
() -> transactionsMessageSender.sendTransactionsToPeer(peer));
});
}
}

private void sendTransactionHashes(
final Iterable<Transaction> transactions, final List<EthPeer> transactionHashPeers) {
transactionHashPeers.stream()
.forEach(
peer -> {
transactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
() ->
newPooledTransactionHashesMessageSender.sendTransactionHashesToPeer(
peer));
});
final List<Transaction> transactions, final List<EthPeer> transactionHashPeers) {
if (!transactions.isEmpty()) {
transactionHashPeers.stream()
.forEach(
peer -> {
transactions.forEach(
transaction -> transactionTracker.addToPeerSendQueue(peer, transaction));
ethContext
.getScheduler()
.scheduleSyncWorkerTask(
() ->
newPooledTransactionHashesMessageSender.sendTransactionHashesToPeer(
peer));
});
}
}

private void movePeersBetweenLists(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ private Optional<BlockHeader> getChainHeadBlockHeader() {

public interface TransactionBatchAddedListener {

void onTransactionsAdded(Iterable<Transaction> transactions);
void onTransactionsAdded(Collection<Transaction> transactions);
}

private static class ValidationResultAndAccount {
Expand Down
Loading

0 comments on commit 4ddf506

Please sign in to comment.