From 6fdf9083e80f5bf560d4ddb1c500c458c4bf1d0c Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Mon, 30 Jun 2025 22:56:08 +0100 Subject: [PATCH 1/7] Force reconnect to remote clusters with a short timeout for CPS --- .../action/search/TransportSearchAction.java | 201 +++++++++++++----- .../search/TransportSearchActionTests.java | 45 ++-- 2 files changed, 177 insertions(+), 69 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 69260bcac105c..5b32e76d93fac 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; @@ -167,6 +168,8 @@ public class TransportSearchAction extends HandledTransportAction buildPerIndexOriginalIndices( @@ -445,7 +450,9 @@ void executeRequest( projectState, clusters, searchPhaseProvider.apply(l) - ) + ), + transportService, + alwaysEstablishConnection ); } else { final SearchContextId searchContext = resolvedIndices.getSearchContextId(); @@ -505,7 +512,8 @@ void executeRequest( clusters, searchPhaseProvider.apply(finalDelegate) ); - }) + }), + alwaysEstablishConnection ); } } @@ -633,6 +641,40 @@ public static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) { || source.collapse().getInnerHits().isEmpty(); } + /** + * Return a subscribable listener with optional timeout depending on force reconnect setting is registered or + * not. 
+ * @param alwaysEstablishConnection If we're running in a context where we always need to re-connect. + * This value determines if we need to add a short timeout to avoid waiting + * for long durations to reconnect. + * @param threadPool The thread pool that'll be used for the timeout. + * @param timeoutExecutor The executor that should be used for the timeout. + * @return SubscribableListener A listener with optionally added timeout. + */ + private static SubscribableListener getListenerWithOptionalTimeout( + boolean alwaysEstablishConnection, + ThreadPool threadPool, + Executor timeoutExecutor + ) { + var subscribableListener = new SubscribableListener(); + if (alwaysEstablishConnection) { + subscribableListener.addTimeout(TimeValue.timeValueSeconds(FORCE_CONNECT_CLUSTER_DEFAULT_TIMEOUT), threadPool, timeoutExecutor); + } + + return subscribableListener; + } + + /** + * The default disconnected strategy for Elasticsearch is RECONNECT_UNLESS_SKIP_UNAVAILABLE. So we either force + * connect if required (like in CPS) or when skip unavailable is false for a cluster. + * @param alwaysEstablishConnection If we're running in a context where we always need to re-connect. + * @param skipUnavailable The usual skip unavailable setting. + * @return boolean If we should always force reconnect. 
+ */ + private static boolean shouldEstablishConnection(boolean alwaysEstablishConnection, boolean skipUnavailable) { + return alwaysEstablishConnection || skipUnavailable == false; + } + /** * Handles ccs_minimize_roundtrips=true */ @@ -647,7 +689,9 @@ static void ccsRemoteReduce( RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener listener, - BiConsumer> localSearchConsumer + BiConsumer> localSearchConsumer, + TransportService transportService, + boolean shouldForceReconnectCluster ) { final var remoteClientResponseExecutor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION); if (resolvedIndices.getLocalIndices() == null && resolvedIndices.getRemoteClusterIndices().size() == 1) { @@ -665,12 +709,9 @@ static void ccsRemoteReduce( timeProvider.absoluteStartMillis(), true ); - var remoteClusterClient = remoteClusterService.getRemoteClusterClient( - clusterAlias, - remoteClientResponseExecutor, - RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE - ); - remoteClusterClient.execute(TransportSearchAction.REMOTE_TYPE, ccsSearchRequest, new ActionListener<>() { + + var connectionListener = getListenerWithOptionalTimeout(shouldForceReconnectCluster, threadPool, remoteClientResponseExecutor); + var searchListener = new ActionListener() { @Override public void onResponse(SearchResponse searchResponse) { // overwrite the existing cluster entry with the updated one @@ -713,7 +754,25 @@ public void onFailure(Exception e) { listener.onFailure(wrapRemoteClusterFailure(clusterAlias, e)); } } - }); + }; + + connectionListener.addListener( + searchListener.delegateFailure( + (responseListener, connection) -> transportService.sendRequest( + connection, + TransportSearchAction.TYPE.name(), + ccsSearchRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(responseListener, SearchResponse::new, remoteClientResponseExecutor) + ) + ) + ); + + remoteClusterService.maybeEnsureConnectedAndGetConnection( + 
clusterAlias, + shouldEstablishConnection(shouldForceReconnectCluster, skipUnavailable), + connectionListener + ); } else { SearchResponseMerger searchResponseMerger = createSearchResponseMerger( searchRequest.source(), @@ -748,12 +807,30 @@ public void onFailure(Exception e) { task.getProgressListener(), listener ); - final var remoteClusterClient = remoteClusterService.getRemoteClusterClient( + + SubscribableListener connectionListener = getListenerWithOptionalTimeout( + shouldForceReconnectCluster, + threadPool, + remoteClientResponseExecutor + ); + + connectionListener.addListener( + ccsListener.delegateFailure( + (responseListener, connection) -> transportService.sendRequest( + connection, + TransportSearchAction.REMOTE_TYPE.name(), + ccsSearchRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(responseListener, SearchResponse::new, remoteClientResponseExecutor) + ) + ) + ); + + remoteClusterService.maybeEnsureConnectedAndGetConnection( clusterAlias, - remoteClientResponseExecutor, - RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE + shouldEstablishConnection(shouldForceReconnectCluster, skipUnavailable), + connectionListener ); - remoteClusterClient.execute(TransportSearchAction.REMOTE_TYPE, ccsSearchRequest, ccsListener); } if (resolvedIndices.getLocalIndices() != null) { ActionListener ccsListener = createCCSListener( @@ -819,7 +896,8 @@ static void collectSearchShards( SearchResponse.Clusters clusters, SearchTimeProvider timeProvider, TransportService transportService, - ActionListener> listener + ActionListener> listener, + boolean shouldForceReconnectCluster ) { RemoteClusterService remoteClusterService = transportService.getRemoteClusterService(); final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); @@ -848,49 +926,59 @@ Map createFinalResponse() { return searchShardsResponses; } }; + + var threadPool = transportService.getThreadPool(); + var connectionListener = 
getListenerWithOptionalTimeout( + shouldForceReconnectCluster, + threadPool, + threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION) + ); + + connectionListener.addListener(singleListener.delegateFailure((responseListener, connection) -> { + final String[] indices = entry.getValue().indices(); + final Executor responseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION); + // TODO: support point-in-time + if (searchContext == null && connection.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) { + SearchShardsRequest searchShardsRequest = new SearchShardsRequest( + indices, + indicesOptions, + query, + routing, + preference, + allowPartialResults, + clusterAlias + ); + transportService.sendRequest( + connection, + TransportSearchShardsAction.TYPE.name(), + searchShardsRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(responseListener, SearchShardsResponse::new, responseExecutor) + ); + } else { + // does not do a can-match + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest( + MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, + indices + ).indicesOptions(indicesOptions).local(true).preference(preference).routing(routing); + transportService.sendRequest( + connection, + TransportClusterSearchShardsAction.TYPE.name(), + searchShardsRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>( + singleListener.map(SearchShardsResponse::fromLegacyResponse), + ClusterSearchShardsResponse::new, + responseExecutor + ) + ); + } + })); + remoteClusterService.maybeEnsureConnectedAndGetConnection( clusterAlias, - skipUnavailable == false, - singleListener.delegateFailureAndWrap((delegate, connection) -> { - final String[] indices = entry.getValue().indices(); - final Executor responseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION); - // TODO: support point-in-time - if (searchContext == null && 
connection.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) { - SearchShardsRequest searchShardsRequest = new SearchShardsRequest( - indices, - indicesOptions, - query, - routing, - preference, - allowPartialResults, - clusterAlias - ); - transportService.sendRequest( - connection, - TransportSearchShardsAction.TYPE.name(), - searchShardsRequest, - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(delegate, SearchShardsResponse::new, responseExecutor) - ); - } else { - // does not do a can-match - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest( - MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, - indices - ).indicesOptions(indicesOptions).local(true).preference(preference).routing(routing); - transportService.sendRequest( - connection, - TransportClusterSearchShardsAction.TYPE.name(), - searchShardsRequest, - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>( - delegate.map(SearchShardsResponse::fromLegacyResponse), - ClusterSearchShardsResponse::new, - responseExecutor - ) - ); - } - }) + shouldEstablishConnection(shouldForceReconnectCluster, skipUnavailable), + connectionListener ); } } @@ -1990,6 +2078,7 @@ private void recordTelemetry() { /** * Extract telemetry data from the search response. + * * @param searchResponse The final response from the search. 
*/ private void extractCCSTelemetry(SearchResponse searchResponse) { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 4346351c1576c..82d136d0c22ac 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -546,7 +546,9 @@ public void testCCSRemoteReduceMergeFails() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + false ); if (localIndices == null) { assertNull(setOnce.get()); @@ -621,7 +623,9 @@ public void testCCSRemoteReduce() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + false ); if (localIndices == null) { assertNull(setOnce.get()); @@ -677,7 +681,9 @@ public void testCCSRemoteReduce() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + false ); if (localIndices == null) { assertNull(setOnce.get()); @@ -765,7 +771,9 @@ public void testCCSRemoteReduceWhereRemoteClustersFail() throws Exception { remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + false ); if (localIndices == null) { assertNull(setOnce.get()); @@ -864,7 +872,9 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + false ); if (localIndices == null) { assertNull(setOnce.get()); @@ -878,7 +888,7 @@ public void 
onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce assertNotNull(failure.get()); assertThat(failure.get(), instanceOf(RemoteTransportException.class)); assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); - assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); + // assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); } // setting skip_unavailable to true for all the disconnected clusters will make the request succeed again @@ -915,7 +925,9 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + false ); if (localIndices == null) { assertNull(setOnce.get()); @@ -988,7 +1000,9 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce remoteClusterService, threadPool, listener, - (r, l) -> setOnce.set(Tuple.tuple(r, l)) + (r, l) -> setOnce.set(Tuple.tuple(r, l)), + service, + false ); if (localIndices == null) { assertNull(setOnce.get()); @@ -1083,7 +1097,8 @@ public void testCollectSearchShards() throws Exception { clusters, timeProvider, service, - new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch) + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch), + false ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertNotNull(response.get()); @@ -1112,7 +1127,8 @@ public void testCollectSearchShards() throws Exception { clusters, timeProvider, service, - new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch) + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch), + false ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertEquals(numClusters, 
clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)); @@ -1160,7 +1176,8 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce clusters, timeProvider, service, - new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch) + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch), + false ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertEquals(numDisconnectedClusters, clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)); @@ -1190,7 +1207,8 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce clusters, timeProvider, service, - new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch) + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch), + false ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertNotNull(response.get()); @@ -1236,7 +1254,8 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce clusters, timeProvider, service, - new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch) + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch), + false ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertEquals(0, clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)); From d2c727ab9aeb47792516d66b606957674486ec9f Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Wed, 2 Jul 2025 14:51:17 +0100 Subject: [PATCH 2/7] Remove unnecessarily introduced new line --- .../org/elasticsearch/action/search/TransportSearchAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 5b32e76d93fac..4fcd038d08d80 100644 --- 
a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -2078,7 +2078,6 @@ private void recordTelemetry() { /** * Extract telemetry data from the search response. - * * @param searchResponse The final response from the search. */ private void extractCCSTelemetry(SearchResponse searchResponse) { From da8bd52fa7468d76608a32ac36756445f7721814 Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Thu, 10 Jul 2025 11:06:38 +0100 Subject: [PATCH 3/7] Use a user-configurable timeout rather than a boolean --- .../action/search/TransportSearchAction.java | 45 +++++++++---------- .../search/TransportSearchActionTests.java | 26 +++++------ 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 4fcd038d08d80..9e6ac9653ae76 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -168,8 +168,7 @@ public class TransportSearchAction extends HandledTransportAction buildPerIndexOriginalIndices( @@ -452,7 +451,7 @@ void executeRequest( searchPhaseProvider.apply(l) ), transportService, - alwaysEstablishConnection + forceConnectTimeoutSecs ); } else { final SearchContextId searchContext = resolvedIndices.getSearchContextId(); @@ -513,7 +512,7 @@ void executeRequest( searchPhaseProvider.apply(finalDelegate) ); }), - alwaysEstablishConnection + forceConnectTimeoutSecs ); } } @@ -644,21 +643,20 @@ public static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) { /** * Return a subscribable listener with optional timeout depending on force reconnect setting is registered or * not.
- * @param alwaysEstablishConnection If we're running in a context where we always need to re-connect. - * This value determines if we need to add a short timeout to avoid waiting - * for long durations to reconnect. + * @param forceConnectTimeoutSecs Timeout in seconds that determines how long we'll wait to establish a connection + * to a remote. * @param threadPool The thread pool that'll be used for the timeout. * @param timeoutExecutor The executor that should be used for the timeout. * @return SubscribableListener A listener with optionally added timeout. */ private static SubscribableListener getListenerWithOptionalTimeout( - boolean alwaysEstablishConnection, + TimeValue forceConnectTimeoutSecs, ThreadPool threadPool, Executor timeoutExecutor ) { var subscribableListener = new SubscribableListener(); - if (alwaysEstablishConnection) { - subscribableListener.addTimeout(TimeValue.timeValueSeconds(FORCE_CONNECT_CLUSTER_DEFAULT_TIMEOUT), threadPool, timeoutExecutor); + if (forceConnectTimeoutSecs != null) { + subscribableListener.addTimeout(forceConnectTimeoutSecs, threadPool, timeoutExecutor); } return subscribableListener; @@ -667,12 +665,13 @@ private static SubscribableListener getListenerWithOptiona /** * The default disconnected strategy for Elasticsearch is RECONNECT_UNLESS_SKIP_UNAVAILABLE. So we either force * connect if required (like in CPS) or when skip unavailable is false for a cluster. - * @param alwaysEstablishConnection If we're running in a context where we always need to re-connect. + * @param forceConnectTimeoutSecs The timeout value from the force connect setting. + * If it is set, use it as it takes precedence. * @param skipUnavailable The usual skip unavailable setting. * @return boolean If we should always force reconnect. 
*/ - private static boolean shouldEstablishConnection(boolean alwaysEstablishConnection, boolean skipUnavailable) { - return alwaysEstablishConnection || skipUnavailable == false; + private static boolean shouldEstablishConnection(TimeValue forceConnectTimeoutSecs, boolean skipUnavailable) { + return forceConnectTimeoutSecs != null || skipUnavailable == false; } /** @@ -691,7 +690,7 @@ static void ccsRemoteReduce( ActionListener listener, BiConsumer> localSearchConsumer, TransportService transportService, - boolean shouldForceReconnectCluster + TimeValue forceConnectTimeoutSecs ) { final var remoteClientResponseExecutor = threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION); if (resolvedIndices.getLocalIndices() == null && resolvedIndices.getRemoteClusterIndices().size() == 1) { @@ -710,7 +709,7 @@ static void ccsRemoteReduce( true ); - var connectionListener = getListenerWithOptionalTimeout(shouldForceReconnectCluster, threadPool, remoteClientResponseExecutor); + var connectionListener = getListenerWithOptionalTimeout(forceConnectTimeoutSecs, threadPool, remoteClientResponseExecutor); var searchListener = new ActionListener() { @Override public void onResponse(SearchResponse searchResponse) { @@ -770,7 +769,7 @@ public void onFailure(Exception e) { remoteClusterService.maybeEnsureConnectedAndGetConnection( clusterAlias, - shouldEstablishConnection(shouldForceReconnectCluster, skipUnavailable), + shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable), connectionListener ); } else { @@ -809,7 +808,7 @@ public void onFailure(Exception e) { ); SubscribableListener connectionListener = getListenerWithOptionalTimeout( - shouldForceReconnectCluster, + forceConnectTimeoutSecs, threadPool, remoteClientResponseExecutor ); @@ -828,7 +827,7 @@ public void onFailure(Exception e) { remoteClusterService.maybeEnsureConnectedAndGetConnection( clusterAlias, - shouldEstablishConnection(shouldForceReconnectCluster, skipUnavailable), + 
shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable), connectionListener ); } @@ -897,7 +896,7 @@ static void collectSearchShards( SearchTimeProvider timeProvider, TransportService transportService, ActionListener> listener, - boolean shouldForceReconnectCluster + TimeValue forceConnectTimeoutSecs ) { RemoteClusterService remoteClusterService = transportService.getRemoteClusterService(); final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); @@ -929,7 +928,7 @@ Map createFinalResponse() { var threadPool = transportService.getThreadPool(); var connectionListener = getListenerWithOptionalTimeout( - shouldForceReconnectCluster, + forceConnectTimeoutSecs, threadPool, threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION) ); @@ -977,7 +976,7 @@ Map createFinalResponse() { remoteClusterService.maybeEnsureConnectedAndGetConnection( clusterAlias, - shouldEstablishConnection(shouldForceReconnectCluster, skipUnavailable), + shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable), connectionListener ); } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 82d136d0c22ac..8ca76fdf07799 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -548,7 +548,7 @@ public void testCCSRemoteReduceMergeFails() throws Exception { listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)), service, - false + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -625,7 +625,7 @@ public void testCCSRemoteReduce() throws Exception { listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)), service, - false + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -683,7 +683,7 @@ public void testCCSRemoteReduce() throws Exception { listener, (r, l) -> 
setOnce.set(Tuple.tuple(r, l)), service, - false + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -773,7 +773,7 @@ public void testCCSRemoteReduceWhereRemoteClustersFail() throws Exception { listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)), service, - false + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -874,7 +874,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)), service, - false + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -888,7 +888,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce assertNotNull(failure.get()); assertThat(failure.get(), instanceOf(RemoteTransportException.class)); assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); - // assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); + assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); } // setting skip_unavailable to true for all the disconnected clusters will make the request succeed again @@ -927,7 +927,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)), service, - false + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -1002,7 +1002,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)), service, - false + null ); if (localIndices == null) { assertNull(setOnce.get()); @@ -1098,7 +1098,7 @@ public void testCollectSearchShards() throws Exception { timeProvider, service, new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch), - false + null ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertNotNull(response.get()); @@ -1128,7 +1128,7 @@ public void testCollectSearchShards() throws 
Exception { timeProvider, service, new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch), - false + null ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertEquals(numClusters, clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)); @@ -1177,7 +1177,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce timeProvider, service, new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch), - false + null ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertEquals(numDisconnectedClusters, clusters.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)); @@ -1208,7 +1208,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce timeProvider, service, new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch), - false + null ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertNotNull(response.get()); @@ -1255,7 +1255,7 @@ public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeExce timeProvider, service, new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(response::set), latch), - false + null ); awaitLatch(latch, 5, TimeUnit.SECONDS); assertEquals(0, clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)); From 3a2bbd4160637feeddd3da09e6c6f525aad8bc2a Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Mon, 14 Jul 2025 12:00:04 +0100 Subject: [PATCH 4/7] Add IT --- .../RemoteSearchForceConnectTimeoutIT.java | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteSearchForceConnectTimeoutIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteSearchForceConnectTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteSearchForceConnectTimeoutIT.java new file mode 
100644 index 0000000000000..bcacb01568d3b --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteSearchForceConnectTimeoutIT.java @@ -0,0 +1,125 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.indices.cluster; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.TransportSearchAction; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.plugins.ClusterPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; +import org.hamcrest.Matchers; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +public class RemoteSearchForceConnectTimeoutIT extends AbstractMultiClustersTestCase { + private static final String REMOTE_CLUSTER_1 = "cluster-a"; + + public static class ForceConnectTimeoutPlugin extends Plugin implements ClusterPlugin { + @Override + public List<Setting<?>> getSettings() { + return List.of(FORCE_CONNECT_TIMEOUT_SETTING); + } + } + + private static final Setting<String> FORCE_CONNECT_TIMEOUT_SETTING = Setting.simpleString( + "search.ccs.force_connect_timeout", + Setting.Property.NodeScope + ); +
+ @Override + protected List<String> remoteClusterAlias() { + return List.of(REMOTE_CLUSTER_1); + } + + @Override + protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) { + return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), ForceConnectTimeoutPlugin.class); + } + + @Override + protected Settings nodeSettings() { + /* + * This is the setting that controls how long TransportSearchAction will wait to establish a connection + * with a remote. We set it to a low value of 1s so that the test does not stall for too long -- this is + * consistent with what we've done in other tests. + */ + return Settings.builder().put(super.nodeSettings()).put("search.ccs.force_connect_timeout", "1s").build(); + } + + @Override + protected Map<String, Boolean> skipUnavailableForRemoteClusters() { + return Map.of(REMOTE_CLUSTER_1, true); + } + + public void testTimeoutSetting() { + var latch = new CountDownLatch(1); + for (String nodeName : cluster(LOCAL_CLUSTER).getNodeNames()) { + MockTransportService mts = (MockTransportService) cluster(LOCAL_CLUSTER).getInstance(TransportService.class, nodeName); + + mts.addConnectBehavior( + cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, randomFrom(cluster(REMOTE_CLUSTER_1).getNodeNames())), + ((transport, discoveryNode, profile, listener) -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + + transport.openConnection(discoveryNode, profile, listener); + }) + ); + } + + // Add some dummy data to prove we are communicating fine with the remote.
+ assertAcked(client(REMOTE_CLUSTER_1).admin().indices().prepareCreate("test-index")); + client(REMOTE_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get(); + client(REMOTE_CLUSTER_1).admin().indices().prepareRefresh("test-index").get(); + + /* + * Do a full restart so that our custom connect behaviour takes effect since it does not apply to + * pre-existing connections -- they're already established by the time this test runs. + */ + try { + cluster(REMOTE_CLUSTER_1).fullRestart(); + } catch (Exception e) { + throw new AssertionError(e); + } finally { + var searchRequest = new SearchRequest("*", "*:*"); + searchRequest.allowPartialSearchResults(false); + var result = safeGet(client().execute(TransportSearchAction.TYPE, searchRequest)); + + // The remote cluster should've failed. + var failures = result.getClusters().getCluster(REMOTE_CLUSTER_1).getFailures(); + assertThat(failures.size(), Matchers.equalTo(1)); + + /* + * Reason should be a timed out exception. The timeout should be equal to what we've set and there should + * be a reference to the subscribable listener -- which is what we use to listen for a valid connection.
+ */ + var failureReason = failures.getFirst().reason(); + assertThat( + failureReason, + Matchers.containsString("org.elasticsearch.ElasticsearchTimeoutException: timed out after [1s/1000ms]") + ); + assertThat(failureReason, Matchers.containsString("SubscribableListener")); + latch.countDown(); + } + } +} From d251fe1adf02514e53edddbb4f71a5992286465e Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Mon, 14 Jul 2025 12:41:42 +0100 Subject: [PATCH 5/7] try: cleanup resources at the end of test --- .../indices/cluster/RemoteSearchForceConnectTimeoutIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteSearchForceConnectTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteSearchForceConnectTimeoutIT.java index bcacb01568d3b..4856f20d07353 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteSearchForceConnectTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteSearchForceConnectTimeoutIT.java @@ -120,6 +120,7 @@ public void testTimeoutSetting() { ); assertThat(failureReason, Matchers.containsString("SubscribableListener")); latch.countDown(); + result.decRef(); } } } From 3354ff795075bf68ea83a9ba0d803d180f09c878 Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Fri, 18 Jul 2025 12:10:06 +0100 Subject: [PATCH 6/7] Drop timeout logging --- .../org/elasticsearch/action/search/TransportSearchAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 9e6ac9653ae76..00b065fad4f04 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -218,7 +218,6 @@ public TransportSearchAction(
this.client = client; this.usageService = usageService; forceConnectTimeoutSecs = settings.getAsTime("search.ccs.force_connect_timeout", null); - logger.info("Should force reconnect cluster: {}", forceConnectTimeoutSecs); } private Map buildPerIndexOriginalIndices( From a4cd905b09c0290c7d638a90a45c04c0bb7b9b4c Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Fri, 18 Jul 2025 15:01:50 +0100 Subject: [PATCH 7/7] Update docs/changelog/130463.yaml --- docs/changelog/130463.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/130463.yaml diff --git a/docs/changelog/130463.yaml b/docs/changelog/130463.yaml new file mode 100644 index 0000000000000..e1a38ae6c96e6 --- /dev/null +++ b/docs/changelog/130463.yaml @@ -0,0 +1,5 @@ +pr: 130463 +summary: Refresh potentially lost connections at query start for `_search` +area: Search +type: enhancement +issues: []