Skip to content

Commit 22509c9

Browse files
Fix Blackholed Connection Behavior in DisruptableMockTransport (#61310) (#61381)
It is not realistic to drop messages without eventually failing. To retain the coverage of long pauses this PR adjusts the blackholed behavior to fail a send after 24h (which is assumed to be longer than any timeout in the system) instead of never. Closes #61034
1 parent 997c73e commit 22509c9

File tree

4 files changed

+28
-11
lines changed

4 files changed

+28
-11
lines changed

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1420,7 +1420,7 @@ protected void connectToNodesAndWait(ClusterState newClusterState) {
14201420
}
14211421
});
14221422
recoverySettings = new RecoverySettings(settings, clusterSettings);
1423-
mockTransport = new DisruptableMockTransport(node, logger) {
1423+
mockTransport = new DisruptableMockTransport(node, logger, deterministicTaskQueue) {
14241424
@Override
14251425
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
14261426
if (node.equals(destination)) {

test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,7 @@ class ClusterNode {
929929

930930
private void setUp() {
931931
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
932-
mockTransport = new DisruptableMockTransport(localNode, logger) {
932+
mockTransport = new DisruptableMockTransport(localNode, logger, deterministicTaskQueue) {
933933
@Override
934934
protected void execute(Runnable runnable) {
935935
deterministicTaskQueue.scheduleNow(onNode(runnable));

test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.logging.log4j.Logger;
2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
2425
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.common.Nullable;
2627
import org.elasticsearch.common.settings.ClusterSettings;
@@ -44,17 +45,20 @@
4445
import java.io.IOException;
4546
import java.util.Optional;
4647
import java.util.Set;
48+
import java.util.concurrent.TimeUnit;
4749
import java.util.function.Function;
4850

4951
import static org.elasticsearch.test.ESTestCase.copyWriteable;
5052

5153
public abstract class DisruptableMockTransport extends MockTransport {
5254
private final DiscoveryNode localNode;
5355
private final Logger logger;
56+
private final DeterministicTaskQueue deterministicTaskQueue;
5457

55-
public DisruptableMockTransport(DiscoveryNode localNode, Logger logger) {
58+
public DisruptableMockTransport(DiscoveryNode localNode, Logger logger, DeterministicTaskQueue deterministicTaskQueue) {
5659
this.localNode = localNode;
5760
this.logger = logger;
61+
this.deterministicTaskQueue = deterministicTaskQueue;
5862
}
5963

6064
protected abstract ConnectionStatus getConnectionStatus(DiscoveryNode destination);
@@ -159,6 +163,9 @@ protected String getRequestDescription(long requestId, String action, DiscoveryN
159163

160164
protected void onBlackholedDuringSend(long requestId, String action, DisruptableMockTransport destinationTransport) {
161165
logger.trace("dropping {}", getRequestDescription(requestId, action, destinationTransport.getLocalNode()));
166+
// Delaying the request for one day and then disconnect to simulate a very long pause
167+
deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + TimeUnit.DAYS.toMillis(1L),
168+
() -> onDisconnectedDuringSend(requestId, action, destinationTransport));
162169
}
163170

164171
protected void onDisconnectedDuringSend(long requestId, String action, DisruptableMockTransport destinationTransport) {
@@ -199,7 +206,8 @@ public void run() {
199206

200207
case BLACK_HOLE:
201208
case DISCONNECTED:
202-
logger.trace("dropping response to {}: channel is {}", requestDescription, connectionStatus);
209+
logger.trace("delaying response to {}: channel is {}", requestDescription, connectionStatus);
210+
onBlackholedDuringSend(requestId, action, destinationTransport);
203211
break;
204212

205213
default:
@@ -229,7 +237,9 @@ public void run() {
229237

230238
case BLACK_HOLE:
231239
case DISCONNECTED:
232-
logger.trace("dropping exception response to {}: channel is {}", requestDescription, connectionStatus);
240+
logger.trace("delaying exception response to {}: channel is {}",
241+
requestDescription, connectionStatus);
242+
onBlackholedDuringSend(requestId, action, destinationTransport);
233243
break;
234244

235245
default:

test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
5757
import static org.hamcrest.Matchers.containsString;
5858
import static org.hamcrest.Matchers.endsWith;
59+
import static org.hamcrest.Matchers.instanceOf;
5960

6061
public class DisruptableMockTransportTests extends ESTestCase {
6162

@@ -102,7 +103,7 @@ public void initTransports() {
102103
deterministicTaskQueue = new DeterministicTaskQueue(
103104
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "dummy").build(), random());
104105

105-
final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger) {
106+
final DisruptableMockTransport transport1 = new DisruptableMockTransport(node1, logger, deterministicTaskQueue) {
106107
@Override
107108
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
108109
return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination);
@@ -119,7 +120,7 @@ protected void execute(Runnable runnable) {
119120
}
120121
};
121122

122-
final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger) {
123+
final DisruptableMockTransport transport2 = new DisruptableMockTransport(node2, logger, deterministicTaskQueue) {
123124
@Override
124125
protected ConnectionStatus getConnectionStatus(DiscoveryNode destination) {
125126
return DisruptableMockTransportTests.this.getConnectionStatus(getLocalNode(), destination);
@@ -317,27 +318,33 @@ public void testDisconnectedOnSuccessfulResponse() throws IOException {
317318
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
318319
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
319320

320-
send(service1, node2, responseHandlerShouldNotBeCalled());
321+
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
322+
send(service1, node2, responseHandlerShouldBeCalledExceptionally(responseHandlerException::set));
321323
deterministicTaskQueue.runAllRunnableTasks();
322324
assertNotNull(responseHandlerChannel.get());
325+
assertNull(responseHandlerException.get());
323326

324327
disconnectedLinks.add(Tuple.tuple(node2, node1));
325328
responseHandlerChannel.get().sendResponse(TransportResponse.Empty.INSTANCE);
326-
deterministicTaskQueue.runAllRunnableTasks();
329+
deterministicTaskQueue.runAllTasks();
330+
assertThat(responseHandlerException.get(), instanceOf(ConnectTransportException.class));
327331
}
328332

329333
public void testDisconnectedOnExceptionalResponse() throws IOException {
330334
registerRequestHandler(service1, requestHandlerShouldNotBeCalled());
331335
AtomicReference<TransportChannel> responseHandlerChannel = new AtomicReference<>();
332336
registerRequestHandler(service2, requestHandlerCaptures(responseHandlerChannel::set));
333337

334-
send(service1, node2, responseHandlerShouldNotBeCalled());
338+
AtomicReference<TransportException> responseHandlerException = new AtomicReference<>();
339+
send(service1, node2, responseHandlerShouldBeCalledExceptionally(responseHandlerException::set));
335340
deterministicTaskQueue.runAllRunnableTasks();
336341
assertNotNull(responseHandlerChannel.get());
342+
assertNull(responseHandlerException.get());
337343

338344
disconnectedLinks.add(Tuple.tuple(node2, node1));
339345
responseHandlerChannel.get().sendResponse(new Exception());
340-
deterministicTaskQueue.runAllRunnableTasks();
346+
deterministicTaskQueue.runAllTasks();
347+
assertThat(responseHandlerException.get(), instanceOf(ConnectTransportException.class));
341348
}
342349

343350
public void testUnavailableOnSuccessfulResponse() throws IOException {

0 commit comments

Comments
 (0)