Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void recordRequestTimeout(final int requestCode) {
.addArgument(this::getLoggableId)
.log();
LOG.trace("Timed out while waiting for response from peer {}", this);
reputation.recordRequestTimeout(requestCode).ifPresent(this::disconnect);
reputation.recordRequestTimeout(requestCode, this).ifPresent(this::disconnect);
}

public void recordUselessResponse(final String requestType) {
Expand All @@ -224,7 +224,7 @@ public void recordUselessResponse(final String requestType) {
.addArgument(requestType)
.addArgument(this::getLoggableId)
.log();
reputation.recordUselessResponse(System.currentTimeMillis()).ifPresent(this::disconnect);
reputation.recordUselessResponse(System.currentTimeMillis(), this).ifPresent(this::disconnect);
}

public void recordUsefulResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ public PeerReputation(final int initialScore, final int maxScore) {
this.score = initialScore;
}

public Optional<DisconnectReason> recordRequestTimeout(final int requestCode) {
public Optional<DisconnectReason> recordRequestTimeout(
final int requestCode, final EthPeer peer) {
final int newTimeoutCount = getOrCreateTimeoutCount(requestCode).incrementAndGet();
if (newTimeoutCount >= TIMEOUT_THRESHOLD) {
LOG.debug(
"Disconnection triggered by {} repeated timeouts for requestCode {}",
"Disconnection triggered by {} repeated timeouts for requestCode {} for peer {}",
newTimeoutCount,
requestCode);
requestCode,
peer.getLoggableId());
score -= LARGE_ADJUSTMENT;
return Optional.of(DisconnectReason.TIMEOUT);
} else {
Expand All @@ -89,14 +91,17 @@ public Map<Integer, AtomicInteger> timeoutCounts() {
return timeoutCountByRequestType;
}

public Optional<DisconnectReason> recordUselessResponse(final long timestamp) {
public Optional<DisconnectReason> recordUselessResponse(
final long timestamp, final EthPeer peer) {
uselessResponseTimes.add(timestamp);
while (shouldRemove(uselessResponseTimes.peek(), timestamp)) {
uselessResponseTimes.poll();
}
if (uselessResponseTimes.size() >= USELESS_RESPONSE_THRESHOLD) {
score -= LARGE_ADJUSTMENT;
LOG.debug("Disconnection triggered by exceeding useless response threshold");
LOG.debug(
"Disconnection triggered by exceeding useless response threshold for peer {}",
peer.getLoggableId());
return Optional.of(DisconnectReason.USELESS_PEER_USELESS_RESPONSES);
} else {
score -= SMALL_ADJUSTMENT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ public CompletableFuture<Task<SnapDataRequest>> requestAccount(
accountDataRequest.addResponse(
worldStateProofProvider, response.accounts(), response.proofs());
}
if (error != null) {
LOG.atDebug()
.setMessage("Error handling account download accounts ({} - {}) task: {}")
.addArgument(accountDataRequest.getStartKeyHash())
.addArgument(accountDataRequest.getEndKeyHash())
.addArgument(error)
.log();
}
return requestTask;
});
}
Expand Down Expand Up @@ -167,6 +175,12 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestStorage(
LOG.error("Error while processing storage range response", e);
}
}
if (error != null) {
LOG.atDebug()
.setMessage("Error handling storage range request task: {}")
.addArgument(error)
.log();
}
return requestTasks;
});
}
Expand Down Expand Up @@ -200,6 +214,12 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestCode(
}
}
}
if (error != null) {
LOG.atDebug()
.setMessage("Error handling code request task: {}")
.addArgument(error)
.log();
}
return requestTasks;
});
}
Expand Down Expand Up @@ -240,6 +260,12 @@ public CompletableFuture<List<Task<SnapDataRequest>>> requestTrieNodeByPath(
}
}
}
if (error != null) {
LOG.atDebug()
.setMessage("Error handling trie node request task: {}")
.addArgument(error)
.log();
}
return requestTasks;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ void sendTransactionsToPeer(final EthPeer peer) {
LOG.atTrace()
.setMessage(
"Sending transactions to peer {} all transactions count {}, "
+ "single message transactions {}, single message list {}")
+ "single message transactions {}, single message list {}, transactions {}, AgreedCapabilities {}")
.addArgument(peer)
.addArgument(allTxToSend::size)
.addArgument(includedTransactions::size)
.addArgument(() -> toHashList(includedTransactions))
.addArgument(() -> includedTransactions)
.addArgument(peer::getAgreedCapabilities)
.log();
allTxToSend.removeAll(limitedTransactionsMessages.getIncludedTransactions());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason.TIMEOUT;
import static org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason.USELESS_PEER_USELESS_RESPONSES;
import static org.mockito.Mockito.mock;

import org.hyperledger.besu.ethereum.eth.messages.EthPV62;

Expand All @@ -28,6 +29,7 @@ public class PeerReputationTest {
private static final int INITIAL_SCORE = 25;
private static final int MAX_SCORE = 50;
private final PeerReputation reputation = new PeerReputation(INITIAL_SCORE, MAX_SCORE);
private final EthPeer mockEthPeer = mock(EthPeer.class);

@Test
public void shouldThrowOnInvalidInitialScore() {
Expand All @@ -37,16 +39,19 @@ public void shouldThrowOnInvalidInitialScore() {
@Test
public void shouldOnlyDisconnectWhenTimeoutLimitReached() {
sendRequestTimeouts(EthPV62.GET_BLOCK_HEADERS, PeerReputation.TIMEOUT_THRESHOLD - 1);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer))
.contains(TIMEOUT);
}

@Test
public void shouldTrackTimeoutsSeparatelyForDifferentRequestTypes() {
sendRequestTimeouts(EthPV62.GET_BLOCK_HEADERS, PeerReputation.TIMEOUT_THRESHOLD - 1);
sendRequestTimeouts(EthPV62.GET_BLOCK_BODIES, PeerReputation.TIMEOUT_THRESHOLD - 1);

assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer))
.contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES, mockEthPeer))
.contains(TIMEOUT);
}

@Test
Expand All @@ -55,14 +60,16 @@ public void shouldResetTimeoutCountForRequestType() {
sendRequestTimeouts(EthPV62.GET_BLOCK_BODIES, PeerReputation.TIMEOUT_THRESHOLD - 1);

reputation.resetTimeoutCount(EthPV62.GET_BLOCK_HEADERS);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES)).contains(TIMEOUT);
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_HEADERS, mockEthPeer)).isEmpty();
assertThat(reputation.recordRequestTimeout(EthPV62.GET_BLOCK_BODIES, mockEthPeer))
.contains(TIMEOUT);
}

@Test
public void shouldOnlyDisconnectWhenEmptyResponseThresholdReached() {
sendUselessResponses(1001, PeerReputation.USELESS_RESPONSE_THRESHOLD - 1);
assertThat(reputation.recordUselessResponse(1005)).contains(USELESS_PEER_USELESS_RESPONSES);
assertThat(reputation.recordUselessResponse(1005, mockEthPeer))
.contains(USELESS_PEER_USELESS_RESPONSES);
}

@Test
Expand All @@ -73,7 +80,7 @@ public void shouldDiscardEmptyResponseRecordsAfterTimeWindowElapses() {
// But then the next empty response doesn't come in until after the window expires on the first
assertThat(
reputation.recordUselessResponse(
1001 + PeerReputation.USELESS_RESPONSE_WINDOW_IN_MILLIS + 1))
1001 + PeerReputation.USELESS_RESPONSE_WINDOW_IN_MILLIS + 1, mockEthPeer))
.isEmpty();
}

Expand All @@ -93,13 +100,13 @@ public void shouldNotIncreaseScoreOverMax() {

private void sendRequestTimeouts(final int requestType, final int repeatCount) {
for (int i = 0; i < repeatCount; i++) {
assertThat(reputation.recordRequestTimeout(requestType)).isEmpty();
assertThat(reputation.recordRequestTimeout(requestType, mockEthPeer)).isEmpty();
}
}

private void sendUselessResponses(final long timestamp, final int repeatCount) {
for (int i = 0; i < repeatCount; i++) {
assertThat(reputation.recordUselessResponse(timestamp + i)).isEmpty();
assertThat(reputation.recordUselessResponse(timestamp + i, mockEthPeer)).isEmpty();
}
}
}