From 5f3686058c7ef1aa02a0e86f95e49a39212a6514 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 25 Jan 2022 11:36:42 +0000 Subject: [PATCH 1/5] Correct context for ClusterConnManager listener Today `ClusterConnectionManager#connectToNode` completes its listeners in the thread context in which the connection completes, which may not be the correct context if there are multiple concurrent connection attempts. With this commit we make sure to complete each listener in the context in which it was passed to the corresponding call to `connectToNode`. --- .../transport/ClusterConnectionManager.java | 18 +++-- .../transport/RemoteClusterConnection.java | 2 +- .../transport/TransportService.java | 2 +- .../cluster/coordination/JoinHelperTests.java | 5 +- .../discovery/PeerFinderTests.java | 11 +++- .../ClusterConnectionManagerTests.java | 55 +++++++++------- .../ProxyConnectionStrategyTests.java | 42 ++++++++++-- .../RemoteConnectionManagerTests.java | 6 +- .../RemoteConnectionStrategyTests.java | 21 +++++- .../SniffConnectionStrategyTests.java | 66 +++++++++++++++---- .../test/transport/MockTransport.java | 4 +- .../test/transport/MockTransportService.java | 2 +- 12 files changed, 178 insertions(+), 56 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index dd8c86b67c7fe..95a1dddd94da6 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -10,11 +10,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -44,18 +46,20 @@ public class ClusterConnectionManager implements ConnectionManager { private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete); private final Transport transport; + private final ThreadContext threadContext; private final ConnectionProfile defaultProfile; private final AtomicBoolean closing = new AtomicBoolean(false); private final CountDownLatch closeLatch = new CountDownLatch(1); private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); - public ClusterConnectionManager(Settings settings, Transport transport) { - this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport); + public ClusterConnectionManager(Settings settings, Transport transport, ThreadContext threadContext) { + this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport, threadContext); } - public ClusterConnectionManager(ConnectionProfile connectionProfile, Transport transport) { + public ClusterConnectionManager(ConnectionProfile connectionProfile, Transport transport, ThreadContext threadContext) { this.transport = transport; this.defaultProfile = connectionProfile; + this.threadContext = threadContext; } @Override @@ -91,7 +95,13 @@ public void connectToNode( ConnectionValidator connectionValidator, ActionListener listener ) throws ConnectTransportException { - connectToNodeOrRetry(node, connectionProfile, connectionValidator, 0, listener); + connectToNodeOrRetry( + node, + connectionProfile, + connectionValidator, + 0, + ContextPreservingActionListener.wrapPreservingContext(listener, threadContext) + ); } /** diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 3421545bb6451..ea2474d7c74e4 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -197,7 +197,7 @@ int getNumNodesConnected() { } private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) { - return new ClusterConnectionManager(connectionProfile, transportService.transport); + return new ClusterConnectionManager(connectionProfile, transportService.transport, transportService.threadPool.getThreadContext()); } ConnectionManager getConnectionManager() { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 9ad29fee55ae4..73065a0d8a9ea 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -186,7 +186,7 @@ public TransportService( localNodeFactory, clusterSettings, taskHeaders, - new ClusterConnectionManager(settings, transport) + new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()) ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index ca30d4bfb4c3b..04e67e175f31b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -66,15 +66,16 @@ public void testJoinDeduplication() { DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(); CapturingTransport capturingTransport = new HandshakingCapturingTransport(); DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); + final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(); TransportService transportService = new TransportService( Settings.EMPTY, capturingTransport, - deterministicTaskQueue.getThreadPool(), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> localNode, null, Collections.emptySet(), - new ClusterConnectionManager(Settings.EMPTY, capturingTransport) + new ClusterConnectionManager(Settings.EMPTY, capturingTransport, threadPool.getThreadContext()) ); JoinHelper joinHelper = new JoinHelper( Settings.EMPTY, diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 2dc24cf73d3b5..7369e3e25b06d 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest; import org.elasticsearch.test.transport.StubbableConnectionManager; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ClusterConnectionManager; import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.TransportException; @@ -210,7 +211,13 @@ public void setup() { localNode = newDiscoveryNode("local-node"); - ConnectionManager innerConnectionManager = new ClusterConnectionManager(settings, capturingTransport); + final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(); + + final ConnectionManager innerConnectionManager = new ClusterConnectionManager( + settings, + capturingTransport, + threadPool.getThreadContext() + ); StubbableConnectionManager connectionManager = new StubbableConnectionManager(innerConnectionManager); connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> { final boolean isConnected = connectedNodes.contains(discoveryNode); @@ -222,7 +229,7 @@ public void setup() { transportService = new TransportService( settings, capturingTransport, - deterministicTaskQueue.getThreadPool(), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java index f486659268ee6..3d39e21f67f09 100644 --- a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -47,6 +48,7 @@ import java.util.function.Supplier; import static org.elasticsearch.test.ActionListenerUtils.anyActionListener; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -65,7 +67,7 @@ public void createConnectionManager() { Settings settings = Settings.builder().put("node.name", ClusterConnectionManagerTests.class.getSimpleName()).build(); threadPool = new ThreadPool(settings); transport = mock(Transport.class); - connectionManager = new ClusterConnectionManager(settings, transport); + connectionManager = new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()); TimeValue oneSecond = new TimeValue(1000); TimeValue oneMinute = TimeValue.timeValueMinutes(1); connectionProfile = ConnectionProfile.buildSingleChannelProfile( @@ -256,6 +258,9 @@ public void testConcurrentConnects() throws Exception { int threadCount = between(1, 10); Releasable[] releasables = new Releasable[threadCount]; + final ThreadContext threadContext = threadPool.getThreadContext(); + final String contextHeader = "test-context-header"; + CyclicBarrier barrier = new CyclicBarrier(threadCount + 1); Semaphore pendingCloses = new Semaphore(threadCount); for (int i = 0; i < threadCount; i++) { @@ -267,27 +272,33 @@ public void testConcurrentConnects() throws Exception { throw new RuntimeException(e); } CountDownLatch latch = new CountDownLatch(1); - connectionManager.connectToNode(node, connectionProfile, validator, ActionListener.wrap(c -> { - assert connectionManager.nodeConnected(node); - - assertTrue(pendingCloses.tryAcquire()); - connectionManager.getConnection(node).addRemovedListener(ActionListener.wrap(pendingCloses::release)); - - if (randomBoolean()) { - releasables[threadIndex] = c; - nodeConnectedCount.incrementAndGet(); - } else { - Releasables.close(c); - nodeClosedCount.incrementAndGet(); - } - - assert latch.getCount() == 1; - latch.countDown(); - }, e -> { - nodeFailureCount.incrementAndGet(); - assert latch.getCount() == 1; - latch.countDown(); - })); + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + final String contextValue = randomAlphaOfLength(10); + threadContext.putHeader(contextHeader, contextValue); + connectionManager.connectToNode(node, connectionProfile, validator, ActionListener.wrap(c -> { + assert connectionManager.nodeConnected(node); + assertThat(threadContext.getHeader(contextHeader), equalTo(contextValue)); + + assertTrue(pendingCloses.tryAcquire()); + connectionManager.getConnection(node).addRemovedListener(ActionListener.wrap(pendingCloses::release)); + + if (randomBoolean()) { + releasables[threadIndex] = c; + nodeConnectedCount.incrementAndGet(); + } else { + Releasables.close(c); + nodeClosedCount.incrementAndGet(); + } + + assert latch.getCount() == 1; + latch.countDown(); + }, e -> { + assertThat(threadContext.getHeader(contextHeader), equalTo(contextValue)); + nodeFailureCount.incrementAndGet(); + assert latch.getCount() == 1; + latch.countDown(); + })); + } try { latch.await(); } catch (InterruptedException e) { diff --git a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java index ec1ef2b5aab09..e77a755a6207f 100644 --- a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java @@ -83,7 +83,11 @@ public void testProxyStrategyWillOpenExpectedNumberOfConnectionsToAddress() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); @@ -127,7 +131,11 @@ public void testProxyStrategyWillOpenNewConnectionsOnDisconnect() throws Excepti localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); AtomicBoolean useAddress1 = new AtomicBoolean(true); @@ -189,7 +197,11 @@ public void testConnectFailsWithIncompatibleNodes() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); @@ -232,7 +244,11 @@ public void testClusterNameValidationPreventConnectingToDifferentClusters() thro localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); AtomicBoolean useAddress1 = new AtomicBoolean(true); @@ -295,7 +311,11 @@ public void testProxyStrategyWillResolveAddressesEachConnect() throws Exception localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); @@ -330,7 +350,11 @@ public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesOrServe localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); @@ -435,7 +459,11 @@ public void testServerNameAttributes() { String address = "localhost:" + address1.getPort(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java index b045376848038..031bafbaf78fe 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import java.net.InetAddress; @@ -35,7 +36,10 @@ public class RemoteConnectionManagerTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); transport = mock(Transport.class); - remoteConnectionManager = new RemoteConnectionManager("remote-cluster", new ClusterConnectionManager(Settings.EMPTY, transport)); + remoteConnectionManager = new RemoteConnectionManager( + "remote-cluster", + new ClusterConnectionManager(Settings.EMPTY, transport, new ThreadContext(Settings.EMPTY)) + ); } @SuppressWarnings("unchecked") diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index a4b0cda7e0a8a..76cfafc412664 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; @@ -17,8 +18,14 @@ public class RemoteConnectionStrategyTests extends ESTestCase { + private static final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + public void testStrategyChangeMeansThatStrategyMustBeRebuilt() { - ClusterConnectionManager connectionManager = new ClusterConnectionManager(Settings.EMPTY, mock(Transport.class)); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + Settings.EMPTY, + mock(Transport.class), + threadContext + ); RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); FakeConnectionStrategy first = new FakeConnectionStrategy( "cluster-alias", @@ -34,7 +41,11 @@ public void testStrategyChangeMeansThatStrategyMustBeRebuilt() { } public void testSameStrategyChangeMeansThatStrategyDoesNotNeedToBeRebuilt() { - ClusterConnectionManager connectionManager = new ClusterConnectionManager(Settings.EMPTY, mock(Transport.class)); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + Settings.EMPTY, + mock(Transport.class), + threadContext + ); RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); FakeConnectionStrategy first = new FakeConnectionStrategy( "cluster-alias", @@ -50,7 +61,11 @@ public void testSameStrategyChangeMeansThatStrategyDoesNotNeedToBeRebuilt() { } public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() { - ClusterConnectionManager connectionManager = new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class)); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + TestProfiles.LIGHT_PROFILE, + mock(Transport.class), + threadContext + ); assertEquals(TimeValue.MINUS_ONE, connectionManager.getConnectionProfile().getPingInterval()); assertEquals(Compression.Enabled.INDEXING_DATA, connectionManager.getConnectionProfile().getCompressionEnabled()); assertEquals(Compression.Scheme.LZ4, connectionManager.getConnectionProfile().getCompressionScheme()); diff --git a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java index 02c1fd9f6603e..926ebfd892812 100644 --- a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java @@ -123,7 +123,11 @@ public void testSniffStrategyWillConnectToAndDiscoverNodes() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -172,7 +176,11 @@ public void testSniffStrategyWillResolveDiscoveryNodesEachConnect() throws Excep localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -220,7 +228,11 @@ public void testSniffStrategyWillConnectToMaxAllowedNodesAndOpenNewConnectionsOn localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -277,7 +289,11 @@ public void testDiscoverWithSingleIncompatibleSeedNode() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -316,7 +332,11 @@ public void testConnectFailsWithIncompatibleNodes() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -358,7 +378,11 @@ public void testFilterNodesWithNodePredicate() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -405,7 +429,11 @@ public void testConnectFailsIfNoConnectionsOpened() { localService.acceptIncomingRequests(); // Predicate excludes seed node as a possible connection - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -454,7 +482,11 @@ public void testClusterNameValidationPreventConnectingToDifferentClusters() thro localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -522,7 +554,11 @@ public void testMultipleCallsToConnectEnsuresConnection() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -611,7 +647,11 @@ public void testConfiguredProxyAddressModeWillReplaceNodeAddress() { List seedNodes = Collections.singletonList(accessibleNode.toString()); TransportAddress proxyAddress = accessibleNode.getAddress(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -659,7 +699,11 @@ public void testSniffStrategyWillNeedToBeRebuiltIfNumOfConnectionsOrSeedsOrProxy localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java index 55d2e66bdcc0d..1a725392e0af4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java @@ -57,7 +57,9 @@ public TransportService createTransportService( @Nullable ClusterSettings clusterSettings, Set taskHeaders ) { - StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ClusterConnectionManager(settings, this)); + final StubbableConnectionManager connectionManager = new StubbableConnectionManager( + new ClusterConnectionManager(settings, this, threadPool.getThreadContext()) + ); connectionManager.setDefaultNodeConnectedBehavior((cm, node) -> false); connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode)); return new TransportService( diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index fce2ee039d096..624613304a6e0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -232,7 +232,7 @@ private MockTransportService( localNodeFactory, clusterSettings, taskHeaders, - new StubbableConnectionManager(new ClusterConnectionManager(settings, transport)) + new StubbableConnectionManager(new ClusterConnectionManager(settings, transport, threadPool.getThreadContext())) ); this.original = transport.getDelegate(); } From eeff0453092bcd8e6271fc46ec1339b1ef7639e6 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 26 Jan 2022 13:41:33 +0100 Subject: [PATCH 2/5] attempt to fix currently failing tests --- .../java/org/elasticsearch/action/main/MainActionTests.java | 3 ++- .../action/search/MultiSearchActionTookTests.java | 2 +- .../action/support/replication/TransportWriteActionTests.java | 2 +- .../action/TransportSamlInitiateSingleSignOnActionTests.java | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java b/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java index 5db73fd0f8ee9..e23c358b9de26 100644 --- a/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; @@ -77,7 +78,7 @@ public void testMainActionClusterAvailable() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index 495b441c65884..ca430d521936f 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -122,7 +122,7 @@ private TransportMultiSearchAction createTransportMultiSearchAction(boolean cont TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 43e4464ddb910..987bf708f453a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -387,7 +387,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/identity-provider/src/test/java/org/elasticsearch/xpack/idp/action/TransportSamlInitiateSingleSignOnActionTests.java b/x-pack/plugin/identity-provider/src/test/java/org/elasticsearch/xpack/idp/action/TransportSamlInitiateSingleSignOnActionTests.java index dc6ba1ff3d152..ca18d12939a7e 100644 --- a/x-pack/plugin/identity-provider/src/test/java/org/elasticsearch/xpack/idp/action/TransportSamlInitiateSingleSignOnActionTests.java +++ b/x-pack/plugin/identity-provider/src/test/java/org/elasticsearch/xpack/idp/action/TransportSamlInitiateSingleSignOnActionTests.java @@ -135,7 +135,7 @@ private TransportSamlInitiateSingleSignOnAction setupTransportAction(boolean wit final TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, From b56fb9d6b1f3f6a8074c4006609a13ca6fabd336 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 26 Jan 2022 12:42:08 +0000 Subject: [PATCH 3/5] Update docs/changelog/83035.yaml --- docs/changelog/83035.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/83035.yaml diff --git a/docs/changelog/83035.yaml b/docs/changelog/83035.yaml new file mode 100644 index 0000000000000..cfec033421bf9 --- /dev/null +++ b/docs/changelog/83035.yaml @@ -0,0 +1,5 @@ +pr: 83035 +summary: Correct context for `ClusterConnManager` listener +area: Network +type: bug +issues: [] From 8f63a8aec29c26d79fb4abddfd382c55827f4f17 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 26 Jan 2022 14:31:19 +0100 Subject: [PATCH 4/5] attempt to fix failing tests --- .../TransportOpenIdConnectLogoutActionTests.java | 2 +- .../action/role/TransportDeleteRoleActionTests.java | 7 ++++--- .../action/role/TransportGetRolesActionTests.java | 9 +++++---- .../action/role/TransportPutRoleActionTests.java | 11 ++++++----- .../TransportGetRoleMappingsActionTests.java | 3 ++- .../TransportPutRoleMappingActionTests.java | 3 ++- .../TransportSamlInvalidateSessionActionTests.java | 2 +- .../action/saml/TransportSamlLogoutActionTests.java | 2 +- .../user/TransportAuthenticateActionTests.java | 7 ++++--- .../user/TransportChangePasswordActionTests.java | 11 ++++++----- .../action/user/TransportDeleteUserActionTests.java | 11 ++++++----- .../action/user/TransportGetUsersActionTests.java | 12 ++++++------ .../action/user/TransportPutUserActionTests.java | 12 ++++++------ .../action/user/TransportSetEnabledActionTests.java | 10 +++++----- 14 files changed, 55 insertions(+), 47 deletions(-) diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java index 7e6a7e58aeb67..e658a0129c457 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java @@ -190,7 +190,7 @@ public void setup() throws Exception { final TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleActionTests.java index d2aaa56d98fc4..1dd1b1ffd7719 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleActionTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.action.role.DeleteRoleRequest; @@ -45,7 +46,7 @@ public void testReservedRole() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, (x) -> null, null, @@ -82,7 +83,7 @@ public void testValidRole() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, (x) -> null, null, @@ -130,7 +131,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, (x) -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportGetRolesActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportGetRolesActionTests.java index 88365e24581e6..e5b776057d210 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportGetRolesActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportGetRolesActionTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.action.role.GetRolesRequest; @@ -50,7 +51,7 @@ public void testReservedRoles() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -110,7 +111,7 @@ public void testStoreRoles() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -176,7 +177,7 @@ public void testGetAllOrMix() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -255,7 +256,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleActionTests.java index 6bbf5cc870660..9d884a0045778 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleActionTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.join.query.HasParentQueryBuilder; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -87,7 +88,7 @@ public void testReservedRole() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -129,7 +130,7 @@ public void testValidRole() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -182,7 +183,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -232,7 +233,7 @@ public void testCreationOfRoleWithMalformedQueryJsonFails() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -287,7 +288,7 @@ public void testCreationOfRoleWithUnsupportedQueryFails() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportGetRoleMappingsActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportGetRoleMappingsActionTests.java index cfe056c80f919..6e8698f095d32 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportGetRoleMappingsActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportGetRoleMappingsActionTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.action.rolemapping.GetRoleMappingsRequest; @@ -51,7 +52,7 @@ public void setupMocks() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingActionTests.java index 13fb17bddeca9..6f789a10a3a6c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingActionTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.action.rolemapping.PutRoleMappingRequest; @@ -48,7 +49,7 @@ public void setupMocks() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java index 3079cb450ee41..3364f3f085fa6 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java @@ -261,7 +261,7 @@ protected void final TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java index 42f267017b5a0..07dd115e3e563 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java @@ -221,7 +221,7 @@ public void setup() throws Exception { final TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportAuthenticateActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportAuthenticateActionTests.java index e77f8cb83b256..9089501802636 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportAuthenticateActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportAuthenticateActionTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.util.ArrayUtils; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.SecurityContext; @@ -62,7 +63,7 @@ public void testInternalUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -99,7 +100,7 @@ public void testNullUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -256,7 +257,7 @@ private TransportAuthenticateAction prepareAction(AnonymousUser anonymousUser, U TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordActionTests.java index da0eefb752177..ebd75c28fddea 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordActionTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.SecuritySettingsSourceField; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackSettings; @@ -61,7 +62,7 @@ public void testAnonymousUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -105,7 +106,7 @@ public void testInternalUsers() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -167,7 +168,7 @@ public void testValidUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -212,7 +213,7 @@ public void testIncorrectPasswordHashingAlgorithm() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -266,7 +267,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserActionTests.java index 8c2f964152ec1..1511c906ca1df 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserActionTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.action.user.DeleteUserRequest; @@ -51,7 +52,7 @@ public void testAnonymousUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -86,7 +87,7 @@ public void testInternalUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -134,7 +135,7 @@ public void testReservedUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -175,7 +176,7 @@ public void testValidUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -226,7 +227,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java index ed6de0ca20943..bbbafb1f83e0e 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java @@ -94,7 +94,7 @@ public void testAnonymousUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -141,7 +141,7 @@ public void testInternalUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -207,7 +207,7 @@ public void testReservedUsersOnly() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -268,7 +268,7 @@ public void testGetAllUsers() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -329,7 +329,7 @@ public void testGetStoreOnlyUsers() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -393,7 +393,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java index fb4def7261459..abc722bba4de2 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java @@ -64,7 +64,7 @@ public void testAnonymousUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -100,7 +100,7 @@ public void testSystemUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -157,7 +157,7 @@ public void testReservedUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -193,7 +193,7 @@ public void testValidUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -243,7 +243,7 @@ public void testInvalidUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -271,7 +271,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java index 30159e18b5057..280ebe2c6d246 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java @@ -70,7 +70,7 @@ public void testAnonymousUser() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -124,7 +124,7 @@ public void testInternalUser() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -198,7 +198,7 @@ public void testValidUser() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -267,7 +267,7 @@ public void testException() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -326,7 +326,7 @@ public void testUserModifyingThemselves() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, From 913e4a348dd7fc2f6472f42e1457d3e668c66798 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 26 Jan 2022 15:43:07 +0000 Subject: [PATCH 5/5] Use existing thread context where available --- .../action/search/MultiSearchActionTookTests.java | 2 +- .../support/replication/TransportWriteActionTests.java | 2 +- .../oidc/TransportOpenIdConnectLogoutActionTests.java | 2 +- .../TransportSamlInvalidateSessionActionTests.java | 2 +- .../action/saml/TransportSamlLogoutActionTests.java | 2 +- .../action/user/TransportPutUserActionTests.java | 2 +- .../action/user/TransportSetEnabledActionTests.java | 10 +++++----- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index ca430d521936f..01354a3702fbc 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -122,7 +122,7 @@ private TransportMultiSearchAction createTransportMultiSearchAction(boolean cont TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - mock(ThreadPool.class), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 987bf708f453a..24d95025c09d6 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -387,7 +387,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF new TransportService( Settings.EMPTY, mock(Transport.class), - mock(ThreadPool.class), + TransportWriteActionTests.threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java index e658a0129c457..0070dcff1950d 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java @@ -190,7 +190,7 @@ public void setup() throws Exception { final TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - mock(ThreadPool.class), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java index 3364f3f085fa6..0ba33bb720dfb 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java @@ -261,7 +261,7 @@ protected void final TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - mock(ThreadPool.class), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java index 07dd115e3e563..ee0deeabc7bb2 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java @@ -221,7 +221,7 @@ public void setup() throws Exception { final TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - mock(ThreadPool.class), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java index abc722bba4de2..69c7a7be85ad1 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java @@ -157,7 +157,7 @@ public void testReservedUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - mock(ThreadPool.class), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java index 280ebe2c6d246..2d94e6c37325c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java @@ -70,7 +70,7 @@ public void testAnonymousUser() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - mock(ThreadPool.class), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -124,7 +124,7 @@ public void testInternalUser() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - mock(ThreadPool.class), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -198,7 +198,7 @@ public void testValidUser() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - mock(ThreadPool.class), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -267,7 +267,7 @@ public void testException() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - mock(ThreadPool.class), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -326,7 +326,7 @@ public void testUserModifyingThemselves() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - mock(ThreadPool.class), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,