Skip to content

Commit 57b81b1

Browse files
Fix Blackholed Connection Behavior in DisruptableMockTransport
It is not realistic to drop messages without eventually failing. To retain the coverage of long pauses this PR adjusts the blockholed behavior to fail a send after 24h (which is assumed to be longer than any timeout in the system) instead of never. Closes #61034
1 parent cebd5d4 commit 57b81b1

File tree

4 files changed

+12
-5
lines changed

4 files changed

+12
-5
lines changed

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1417,7 +1417,7 @@ protected void connectToNodesAndWait(ClusterState newClusterState) {
14171417
}
14181418
});
14191419
recoverySettings = new RecoverySettings(settings, clusterSettings);
1420-
mockTransport = new DisruptableMockTransport(node, logger) {
1420+
mockTransport = new DisruptableMockTransport(node, logger, deterministicTaskQueue) {
14211421
@Override
14221422
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
14231423
if (node.equals(destination)) {

test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,7 @@ class ClusterNode {
929929

930930
private void setUp() {
931931
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
932-
mockTransport = new DisruptableMockTransport(localNode, logger) {
932+
mockTransport = new DisruptableMockTransport(localNode, logger, deterministicTaskQueue) {
933933
@Override
934934
protected void execute(Runnable runnable) {
935935
deterministicTaskQueue.scheduleNow(onNode(runnable));

test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.logging.log4j.Logger;
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
2425
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.common.Nullable;
2627
import org.elasticsearch.common.settings.ClusterSettings;
@@ -44,17 +45,20 @@
4445
import java.io.IOException;
4546
import java.util.Optional;
4647
import java.util.Set;
48+
import java.util.concurrent.TimeUnit;
4749
import java.util.function.Function;
4850

4951
import static org.elasticsearch.test.ESTestCase.copyWriteable;
5052

5153
public abstract class DisruptableMockTransport extends MockTransport {
5254
private final DiscoveryNode localNode;
5355
private final Logger logger;
56+
private final DeterministicTaskQueue deterministicTaskQueue;
5457

55-
public DisruptableMockTransport(DiscoveryNode localNode, Logger logger) {
58+
public DisruptableMockTransport(DiscoveryNode localNode, Logger logger, DeterministicTaskQueue deterministicTaskQueue) {
5659
this.localNode = localNode;
5760
this.logger = logger;
61+
this.deterministicTaskQueue = deterministicTaskQueue;
5862
}
5963

6064
protected abstract ConnectionStatus getConnectionStatus(DiscoveryNode destination);
@@ -159,6 +163,9 @@ protected String getRequestDescription(long requestId, String action, DiscoveryN
159163

160164
protected void onBlackholedDuringSend(long requestId, String action, DisruptableMockTransport destinationTransport) {
161165
logger.trace("dropping {}", getRequestDescription(requestId, action, destinationTransport.getLocalNode()));
166+
// Delaying the request for one day and then disconnect to simulate a very long pause
167+
deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + TimeUnit.DAYS.toMillis(1L),
168+
() -> onDisconnectedDuringSend(requestId, action, destinationTransport));
162169
}
163170

164171
protected void onDisconnectedDuringSend(long requestId, String action, DisruptableMockTransport destinationTransport) {

test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public void initTransports() {
103103
deterministicTaskQueue = new DeterministicTaskQueue(
104104
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "dummy").build(), random());
105105

106-
final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger) {
106+
final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger, deterministicTaskQueue) {
107107
@Override
108108
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
109109
return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination);
@@ -120,7 +120,7 @@ protected void execute(Runnable runnable) {
120120
}
121121
};
122122

123-
final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger) {
123+
final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger, deterministicTaskQueue) {
124124
@Override
125125
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
126126
return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination);

0 commit comments

Comments
 (0)