diff --git a/docs/changelog/83035.yaml b/docs/changelog/83035.yaml new file mode 100644 index 0000000000000..cfec033421bf9 --- /dev/null +++ b/docs/changelog/83035.yaml @@ -0,0 +1,5 @@ +pr: 83035 +summary: Correct context for `ClusterConnManager` listener +area: Network +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index dd8c86b67c7fe..95a1dddd94da6 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -10,11 +10,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -44,18 +46,20 @@ public class ClusterConnectionManager implements ConnectionManager { private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete); private final Transport transport; + private final ThreadContext threadContext; private final ConnectionProfile defaultProfile; private final AtomicBoolean closing = new AtomicBoolean(false); private final CountDownLatch closeLatch = new CountDownLatch(1); private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); - public ClusterConnectionManager(Settings settings, Transport transport) { - this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport); + public ClusterConnectionManager(Settings settings, Transport transport, ThreadContext threadContext) { + this(ConnectionProfile.buildDefaultConnectionProfile(settings), transport, threadContext); } - public ClusterConnectionManager(ConnectionProfile connectionProfile, Transport transport) { + public ClusterConnectionManager(ConnectionProfile connectionProfile, Transport transport, ThreadContext threadContext) { this.transport = transport; this.defaultProfile = connectionProfile; + this.threadContext = threadContext; } @Override @@ -91,7 +95,13 @@ public void connectToNode( ConnectionValidator connectionValidator, ActionListener listener ) throws ConnectTransportException { - connectToNodeOrRetry(node, connectionProfile, connectionValidator, 0, listener); + connectToNodeOrRetry( + node, + connectionProfile, + connectionValidator, + 0, + ContextPreservingActionListener.wrapPreservingContext(listener, threadContext) + ); } /** diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 3421545bb6451..ea2474d7c74e4 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -197,7 +197,7 @@ int getNumNodesConnected() { } private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) { - return new ClusterConnectionManager(connectionProfile, transportService.transport); + return new ClusterConnectionManager(connectionProfile, transportService.transport, transportService.threadPool.getThreadContext()); } ConnectionManager getConnectionManager() { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index e692bc4908df4..703f964fabb4d 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -211,7 +211,7 @@ public TransportService( localNodeFactory, clusterSettings, taskHeaders, - new ClusterConnectionManager(settings, transport) + new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()) ); } diff --git a/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java b/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java index 5db73fd0f8ee9..e23c358b9de26 100644 --- a/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/main/MainActionTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; @@ -77,7 +78,7 @@ public void testMainActionClusterAvailable() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index ae37a39620a8f..54cba57ffd763 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -122,7 +122,7 @@ private TransportMultiSearchAction createTransportMultiSearchAction(boolean cont TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 506715227c743..d61d5e13fb7ea 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -387,7 +387,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF new TransportService( Settings.EMPTY, mock(Transport.class), - null, + TransportWriteActionTests.threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index 0144898913656..ceec8c25c24ac 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest; import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ClusterConnectionManager; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportException; @@ -53,15 +54,16 @@ public void testJoinDeduplication() { DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(); CapturingTransport capturingTransport = new HandshakingCapturingTransport(); DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); + final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(); TransportService transportService = new TransportService( Settings.EMPTY, capturingTransport, - deterministicTaskQueue.getThreadPool(), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> localNode, null, Collections.emptySet(), - new ClusterConnectionManager(Settings.EMPTY, capturingTransport) + new ClusterConnectionManager(Settings.EMPTY, capturingTransport, threadPool.getThreadContext()) ); JoinHelper joinHelper = new JoinHelper( Settings.EMPTY, diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 593d6d7a7baf6..0d31e68d7a4d1 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest; import org.elasticsearch.test.transport.StubbableConnectionManager; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ClusterConnectionManager; import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.TransportException; @@ -210,7 +211,13 @@ public void setup() { localNode = newDiscoveryNode("local-node"); - ConnectionManager innerConnectionManager = new ClusterConnectionManager(settings, capturingTransport); + final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(); + + final ConnectionManager innerConnectionManager = new ClusterConnectionManager( + settings, + capturingTransport, + threadPool.getThreadContext() + ); StubbableConnectionManager connectionManager = new StubbableConnectionManager(innerConnectionManager); connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> { final boolean isConnected = connectedNodes.contains(discoveryNode); @@ -222,7 +229,7 @@ public void setup() { transportService = new TransportService( settings, capturingTransport, - deterministicTaskQueue.getThreadPool(), + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java index df0cd7a6a1fa9..41ff27deca615 100644 --- a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -45,6 +46,7 @@ import java.util.function.Supplier; import static org.elasticsearch.test.ActionListenerUtils.anyActionListener; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -63,7 +65,7 @@ public void createConnectionManager() { Settings settings = Settings.builder().put("node.name", ClusterConnectionManagerTests.class.getSimpleName()).build(); threadPool = new ThreadPool(settings); transport = mock(Transport.class); - connectionManager = new ClusterConnectionManager(settings, transport); + connectionManager = new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()); TimeValue oneSecond = new TimeValue(1000); TimeValue oneMinute = TimeValue.timeValueMinutes(1); connectionProfile = ConnectionProfile.buildSingleChannelProfile( @@ -254,6 +256,9 @@ public void testConcurrentConnects() throws Exception { int threadCount = between(1, 10); Releasable[] releasables = new Releasable[threadCount]; + final ThreadContext threadContext = threadPool.getThreadContext(); + final String contextHeader = "test-context-header"; + CyclicBarrier barrier = new CyclicBarrier(threadCount + 1); Semaphore pendingCloses = new Semaphore(threadCount); for (int i = 0; i < threadCount; i++) { @@ -265,27 +270,33 @@ public void testConcurrentConnects() throws Exception { throw new RuntimeException(e); } CountDownLatch latch = new CountDownLatch(1); - connectionManager.connectToNode(node, connectionProfile, validator, ActionListener.wrap(c -> { - assert connectionManager.nodeConnected(node); - - assertTrue(pendingCloses.tryAcquire()); - connectionManager.getConnection(node).addRemovedListener(ActionListener.wrap(pendingCloses::release)); - - if (randomBoolean()) { - releasables[threadIndex] = c; - nodeConnectedCount.incrementAndGet(); - } else { - Releasables.close(c); - nodeClosedCount.incrementAndGet(); - } - - assert latch.getCount() == 1; - latch.countDown(); - }, e -> { - nodeFailureCount.incrementAndGet(); - assert latch.getCount() == 1; - latch.countDown(); - })); + try (ThreadContext.StoredContext ignored = threadContext.stashContext()) { + final String contextValue = randomAlphaOfLength(10); + threadContext.putHeader(contextHeader, contextValue); + connectionManager.connectToNode(node, connectionProfile, validator, ActionListener.wrap(c -> { + assert connectionManager.nodeConnected(node); + assertThat(threadContext.getHeader(contextHeader), equalTo(contextValue)); + + assertTrue(pendingCloses.tryAcquire()); + connectionManager.getConnection(node).addRemovedListener(ActionListener.wrap(pendingCloses::release)); + + if (randomBoolean()) { + releasables[threadIndex] = c; + nodeConnectedCount.incrementAndGet(); + } else { + Releasables.close(c); + nodeClosedCount.incrementAndGet(); + } + + assert latch.getCount() == 1; + latch.countDown(); + }, e -> { + assertThat(threadContext.getHeader(contextHeader), equalTo(contextValue)); + nodeFailureCount.incrementAndGet(); + assert latch.getCount() == 1; + latch.countDown(); + })); + } try { latch.await(); } catch (InterruptedException e) { diff --git a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java index 41538c1bb20dc..b862c7c29a157 100644 --- a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java @@ -80,7 +80,11 @@ public void testProxyStrategyWillOpenExpectedNumberOfConnectionsToAddress() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); @@ -124,7 +128,11 @@ public void testProxyStrategyWillOpenNewConnectionsOnDisconnect() throws Excepti localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); AtomicBoolean useAddress1 = new AtomicBoolean(true); @@ -186,7 +194,11 @@ public void testConnectFailsWithIncompatibleNodes() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); @@ -226,7 +238,11 @@ public void testClusterNameValidationPreventConnectingToDifferentClusters() thro localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); AtomicBoolean useAddress1 = new AtomicBoolean(true); @@ -289,7 +305,11 @@ public void testProxyStrategyWillResolveAddressesEachConnect() throws Exception localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); @@ -324,7 +344,11 @@ public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesOrServe localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); @@ -429,7 +453,11 @@ public void testServerNameAttributes() { String address = "localhost:" + address1.getPort(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); int numOfConnections = randomIntBetween(4, 8); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java index b045376848038..031bafbaf78fe 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import java.net.InetAddress; @@ -35,7 +36,10 @@ public class RemoteConnectionManagerTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); transport = mock(Transport.class); - remoteConnectionManager = new RemoteConnectionManager("remote-cluster", new ClusterConnectionManager(Settings.EMPTY, transport)); + remoteConnectionManager = new RemoteConnectionManager( + "remote-cluster", + new ClusterConnectionManager(Settings.EMPTY, transport, new ThreadContext(Settings.EMPTY)) + ); } @SuppressWarnings("unchecked") diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 6f474ee078672..0cba083a5076e 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; @@ -17,8 +18,14 @@ public class RemoteConnectionStrategyTests extends ESTestCase { + private static final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + public void testStrategyChangeMeansThatStrategyMustBeRebuilt() { - ClusterConnectionManager connectionManager = new ClusterConnectionManager(Settings.EMPTY, mock(Transport.class)); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + Settings.EMPTY, + mock(Transport.class), + threadContext + ); RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); FakeConnectionStrategy first = new FakeConnectionStrategy( "cluster-alias", @@ -34,7 +41,11 @@ public void testStrategyChangeMeansThatStrategyMustBeRebuilt() { } public void testSameStrategyChangeMeansThatStrategyDoesNotNeedToBeRebuilt() { - ClusterConnectionManager connectionManager = new ClusterConnectionManager(Settings.EMPTY, mock(Transport.class)); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + Settings.EMPTY, + mock(Transport.class), + threadContext + ); RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager("cluster-alias", connectionManager); FakeConnectionStrategy first = new FakeConnectionStrategy( "cluster-alias", @@ -50,7 +61,11 @@ public void testSameStrategyChangeMeansThatStrategyDoesNotNeedToBeRebuilt() { } public void testChangeInConnectionProfileMeansTheStrategyMustBeRebuilt() { - ClusterConnectionManager connectionManager = new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class)); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + TestProfiles.LIGHT_PROFILE, + mock(Transport.class), + threadContext + ); assertEquals(TimeValue.MINUS_ONE, connectionManager.getConnectionProfile().getPingInterval()); assertEquals(Compression.Enabled.FALSE, connectionManager.getConnectionProfile().getCompressionEnabled()); assertEquals(Compression.Scheme.DEFLATE, connectionManager.getConnectionProfile().getCompressionScheme()); diff --git a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java index bd93993f90639..3f0b844e34ea6 100644 --- a/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SniffConnectionStrategyTests.java @@ -123,7 +123,11 @@ public void testSniffStrategyWillConnectToAndDiscoverNodes() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -172,7 +176,11 @@ public void testSniffStrategyWillResolveDiscoveryNodesEachConnect() throws Excep localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -220,7 +228,11 @@ public void testSniffStrategyWillConnectToMaxAllowedNodesAndOpenNewConnectionsOn localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -277,7 +289,11 @@ public void testDiscoverWithSingleIncompatibleSeedNode() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -316,7 +332,11 @@ public void testConnectFailsWithIncompatibleNodes() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -358,7 +378,11 @@ public void testFilterNodesWithNodePredicate() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -405,7 +429,11 @@ public void testConnectFailsIfNoConnectionsOpened() { localService.acceptIncomingRequests(); // Predicate excludes seed node as a possible connection - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -454,7 +482,11 @@ public void testClusterNameValidationPreventConnectingToDifferentClusters() thro localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -522,7 +554,11 @@ public void testMultipleCallsToConnectEnsuresConnection() { localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -611,7 +647,11 @@ public void testConfiguredProxyAddressModeWillReplaceNodeAddress() { List seedNodes = Collections.singletonList(accessibleNode.toString()); TransportAddress proxyAddress = accessibleNode.getAddress(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( @@ -659,7 +699,11 @@ public void testSniffStrategyWillNeedToBeRebuiltIfNumOfConnectionsOrSeedsOrProxy localService.start(); localService.acceptIncomingRequests(); - ClusterConnectionManager connectionManager = new ClusterConnectionManager(profile, localService.transport); + final ClusterConnectionManager connectionManager = new ClusterConnectionManager( + profile, + localService.transport, + threadPool.getThreadContext() + ); try ( RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SniffConnectionStrategy strategy = new SniffConnectionStrategy( diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java index 55d2e66bdcc0d..1a725392e0af4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransport.java @@ -57,7 +57,9 @@ public TransportService createTransportService( @Nullable ClusterSettings clusterSettings, Set taskHeaders ) { - StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ClusterConnectionManager(settings, this)); + final StubbableConnectionManager connectionManager = new StubbableConnectionManager( + new ClusterConnectionManager(settings, this, threadPool.getThreadContext()) + ); connectionManager.setDefaultNodeConnectedBehavior((cm, node) -> false); connectionManager.setDefaultGetConnectionBehavior((cm, discoveryNode) -> createConnection(discoveryNode)); return new TransportService( diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 2a8dde85eba1c..33727957869ac 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -223,7 +223,7 @@ private MockTransportService( localNodeFactory, clusterSettings, taskHeaders, - new StubbableConnectionManager(new ClusterConnectionManager(settings, transport)) + new StubbableConnectionManager(new ClusterConnectionManager(settings, transport, threadPool.getThreadContext())) ); this.original = transport.getDelegate(); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/TransportXPackInfoActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/TransportXPackInfoActionTests.java index c62b73f7cc944..24c520b95a292 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/TransportXPackInfoActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/TransportXPackInfoActionTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.protocol.xpack.license.LicenseStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackFeatureSet; @@ -124,7 +125,7 @@ private void checkAction( TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/identity-provider/src/test/java/org/elasticsearch/xpack/idp/action/TransportSamlInitiateSingleSignOnActionTests.java b/x-pack/plugin/identity-provider/src/test/java/org/elasticsearch/xpack/idp/action/TransportSamlInitiateSingleSignOnActionTests.java index 65e3162ddaee4..ee8cf4f6d7226 100644 --- a/x-pack/plugin/identity-provider/src/test/java/org/elasticsearch/xpack/idp/action/TransportSamlInitiateSingleSignOnActionTests.java +++ b/x-pack/plugin/identity-provider/src/test/java/org/elasticsearch/xpack/idp/action/TransportSamlInitiateSingleSignOnActionTests.java @@ -136,7 +136,7 @@ private TransportSamlInitiateSingleSignOnAction setupTransportAction(boolean wit final TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java index e23adb192c006..d0ef08bf20c43 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/oidc/TransportOpenIdConnectLogoutActionTests.java @@ -205,7 +205,7 @@ public void setup() throws Exception { final TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleActionTests.java index d2aaa56d98fc4..1dd1b1ffd7719 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportDeleteRoleActionTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.action.role.DeleteRoleRequest; @@ -45,7 +46,7 @@ public void testReservedRole() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, (x) -> null, null, @@ -82,7 +83,7 @@ public void testValidRole() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, (x) -> null, null, @@ -130,7 +131,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, (x) -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportGetRolesActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportGetRolesActionTests.java index 88365e24581e6..e5b776057d210 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportGetRolesActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportGetRolesActionTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.action.role.GetRolesRequest; @@ -50,7 +51,7 @@ public void testReservedRoles() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -110,7 +111,7 @@ public void testStoreRoles() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -176,7 +177,7 @@ public void testGetAllOrMix() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -255,7 +256,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleActionTests.java index 873619ed9b164..9038f26aa4b03 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/role/TransportPutRoleActionTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.join.query.HasParentQueryBuilder; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -87,7 +88,7 @@ public void testReservedRole() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -129,7 +130,7 @@ public void testValidRole() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -182,7 +183,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -232,7 +233,7 @@ public void testCreationOfRoleWithMalformedQueryJsonFails() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -287,7 +288,7 @@ public void testCreationOfRoleWithUnsupportedQueryFails() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportGetRoleMappingsActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportGetRoleMappingsActionTests.java index cfe056c80f919..6e8698f095d32 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportGetRoleMappingsActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportGetRoleMappingsActionTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.action.rolemapping.GetRoleMappingsRequest; @@ -51,7 +52,7 @@ public void setupMocks() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingActionTests.java index 13fb17bddeca9..6f789a10a3a6c 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingActionTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.action.rolemapping.PutRoleMappingRequest; @@ -48,7 +49,7 @@ public void setupMocks() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java index 45c1cf6a11c72..6700cc82c5afa 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlInvalidateSessionActionTests.java @@ -266,7 +266,7 @@ protected void final TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java index 2b6663119b233..a04e241aa082b 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/saml/TransportSamlLogoutActionTests.java @@ -241,7 +241,7 @@ public void setup() throws Exception { final TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportAuthenticateActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportAuthenticateActionTests.java index 75f25396c7539..d0306731d5cdf 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportAuthenticateActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportAuthenticateActionTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.core.List; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.SecurityContext; @@ -52,7 +53,7 @@ public void testInternalUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -89,7 +90,7 @@ public void testNullUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -136,7 +137,7 @@ public void testValidAuthentication() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordActionTests.java index da00818b77d63..d27dd5377cca0 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportChangePasswordActionTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.SecuritySettingsSourceField; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackSettings; @@ -61,7 +62,7 @@ public void testAnonymousUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -105,7 +106,7 @@ public void testInternalUsers() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -167,7 +168,7 @@ public void testValidUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -212,7 +213,7 @@ public void testIncorrectPasswordHashingAlgorithm() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -266,7 +267,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserActionTests.java index 8c2f964152ec1..1511c906ca1df 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportDeleteUserActionTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.security.action.user.DeleteUserRequest; @@ -51,7 +52,7 @@ public void testAnonymousUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -86,7 +87,7 @@ public void testInternalUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -134,7 +135,7 @@ public void testReservedUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -175,7 +176,7 @@ public void testValidUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -226,7 +227,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java index 204ed487cd08e..bf3a9bb837f8d 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java @@ -102,7 +102,7 @@ public void testAnonymousUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -149,7 +149,7 @@ public void testInternalUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -217,7 +217,7 @@ public void testReservedUsersOnly() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -279,7 +279,7 @@ public void testGetAllUsers() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -340,7 +340,7 @@ public void testGetStoreOnlyUsers() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -404,7 +404,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java index 9cde42f7f1391..4d01c6669b8fd 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportPutUserActionTests.java @@ -65,7 +65,7 @@ public void testAnonymousUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -101,7 +101,7 @@ public void testSystemUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -161,7 +161,7 @@ public void testReservedUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -197,7 +197,7 @@ public void testValidUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -247,7 +247,7 @@ public void testInvalidUser() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -275,7 +275,7 @@ public void testException() { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + mock(ThreadPool.class), TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java index 72a4a05355703..b6799d435d522 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportSetEnabledActionTests.java @@ -70,7 +70,7 @@ public void testAnonymousUser() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -124,7 +124,7 @@ public void testInternalUser() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -198,7 +198,7 @@ public void testValidUser() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -267,7 +267,7 @@ public void testException() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, @@ -326,7 +326,7 @@ public void testUserModifyingThemselves() throws Exception { TransportService transportService = new TransportService( Settings.EMPTY, mock(Transport.class), - null, + threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,