diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java index 1dc3b3f3ff4c4..d3ffc0e2e4de3 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java @@ -564,9 +564,8 @@ private void openRemotePitAndExecute( /** * Closes the RestClient on the generic thread pool (to avoid closing from the client's own thread), then runs the given action. - * Package-private for unit tests. */ - void closeRestClientAndRun(RestClient restClient, Runnable onCompletion) { + private void closeRestClientAndRun(RestClient restClient, Runnable onCompletion) { threadPool.generic().submit(() -> { try { restClient.close(); diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/BulkByPaginatedSearchParallelizationHelperTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/BulkByPaginatedSearchParallelizationHelperTests.java index 5b57acf7f324a..06b231709d372 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/BulkByPaginatedSearchParallelizationHelperTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/BulkByPaginatedSearchParallelizationHelperTests.java @@ -15,18 +15,13 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollTask; import org.elasticsearch.index.reindex.ReindexAction; import org.elasticsearch.index.reindex.ReindexRequest; -import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -43,7 +38,6 @@ import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest; import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchSourceBuilder; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -90,30 +84,6 @@ public void testSliceIntoSubRequests() { } } - /** - * Sliced sub-requests must keep the parent search's {@link PointInTimeBuilder} (id and keep-alive) - * so all slices search against the same PIT while applying distinct {@link SliceBuilder} settings. - */ - public void testSliceIntoSubRequestsPreservesPointInTimeBuilderOnEachSlice() { - BytesReference pitId = new BytesArray(randomAlphaOfLengthBetween(8, 24)); - TimeValue keepAlive = TimeValue.timeValueMinutes(randomIntBetween(1, 30)); - int times = randomIntBetween(2, 8); - SearchRequest request = new SearchRequest(); - request.source(new SearchSourceBuilder().pointInTimeBuilder(new PointInTimeBuilder(pitId).setKeepAlive(keepAlive))); - SearchRequest[] slices = sliceIntoSubRequests(request, IdFieldMapper.NAME, times); - assertThat(slices.length, equalTo(times)); - for (int i = 0; i < times; i++) { - PointInTimeBuilder pit = slices[i].source().pointInTimeBuilder(); - assertNotNull(pit); - assertThat(pit.getEncodedId(), equalTo(pitId)); - assertThat(pit.getKeepAlive(), equalTo(keepAlive)); - SliceBuilder slice = slices[i].source().slice(); - assertThat(slice.getField(), equalTo(IdFieldMapper.NAME)); - assertThat(slice.getId(), equalTo(i)); - assertThat(slice.getMax(), equalTo(times)); - } - } - /** * When the task is a worker, executeSlicedAction invokes the worker action with the given remote version. */ diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java index d7dad3b6ad9b9..3a49a2a763fbe 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java @@ -30,9 +30,7 @@ import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportOpenPointInTimeAction; import org.elasticsearch.action.search.TransportSearchAction; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.RestClient; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; @@ -56,7 +54,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollTask; import org.elasticsearch.index.reindex.PaginatedSearchFailure; -import org.elasticsearch.index.reindex.ReindexAction; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.index.reindex.ResumeBulkByScrollRequest; @@ -99,10 +96,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; import java.util.function.Consumer; @@ -120,7 +114,6 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -807,7 +800,6 @@ public ExecutorService executor(String name) { ); // Simulate a worker request: PIT already set by leader, slice info from leader - final int sliceMax = randomIntBetween(2, 10); final ReindexRequest request = new ReindexRequest(); request.setDestIndex("dest"); request.setSlices(1); @@ -815,10 +807,8 @@ public ExecutorService executor(String name) { request.getSearchRequest() .source( new SearchSourceBuilder().pointInTimeBuilder( - new PointInTimeBuilder(new BytesArray(randomAlphaOfLengthBetween(4, 16))).setKeepAlive( - TimeValue.timeValueMinutes(randomIntBetween(1, 15)) - ) - ).slice(new SliceBuilder(IdFieldMapper.NAME, 0, sliceMax)) + new PointInTimeBuilder(new BytesArray("pit-id")).setKeepAlive(TimeValue.timeValueMinutes(5)) + ).slice(new SliceBuilder(IdFieldMapper.NAME, 0, 5)) ); final BulkByScrollTask task = new BulkByScrollTask( @@ -1421,7 +1411,6 @@ public ExecutorService executor(String name) { ); // Simulate a worker request: PIT already set by leader, slice info from leader - final int sliceMax = randomIntBetween(2, 10); final ReindexRequest request = new ReindexRequest(); request.setDestIndex("dest"); request.setSlices(1); @@ -1429,10 +1418,8 @@ public ExecutorService executor(String name) { request.getSearchRequest() .source( new SearchSourceBuilder().pointInTimeBuilder( - new PointInTimeBuilder(new BytesArray(randomAlphaOfLengthBetween(4, 16))).setKeepAlive( - TimeValue.timeValueMinutes(randomIntBetween(1, 15)) - ) - ).slice(new SliceBuilder(IdFieldMapper.NAME, 0, sliceMax)) + new PointInTimeBuilder(new BytesArray("pit-id")).setKeepAlive(TimeValue.timeValueMinutes(5)) + ).slice(new SliceBuilder(IdFieldMapper.NAME, 0, 5)) ); final BulkByScrollTask task = new BulkByScrollTask( @@ -1690,644 +1677,6 @@ public ExecutorService executor(String name) { } } - /** - * When PIT search is enabled and reindex uses multiple slices, the leader opens a single point-in-time context and slice workers - * share it. Verifies there is exactly one {@code OpenPointInTime} and one {@code ClosePointInTime} request on the local client - */ - public void testLocalSlicedReindexOpensPitOnceAndClosesOnce() { - assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED); - - final TestThreadPool threadPool = new TestThreadPool(getTestName()) { - @Override - public ExecutorService executor(String name) { - return DIRECT_EXECUTOR_SERVICE; - } - }; - try { - final ClusterService clusterService = mock(ClusterService.class); - final DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build(); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); - when(clusterService.localNode()).thenReturn(localNode); - - final ProjectResolver projectResolver = mock(ProjectResolver.class); - when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID)); - - final FeatureService featureService = mock(FeatureService.class); - when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(true); - - final long leaderTaskId = randomLongBetween(1_000L, Long.MAX_VALUE / 4); - final long firstChildTaskId = leaderTaskId + randomIntBetween(1, 10_000); - final int numSlices = randomIntBetween(2, 5); - - final BulkByScrollTask leaderTask = new BulkByScrollTask( - new TaskId("local-node", leaderTaskId), - "reindex", - ReindexAction.NAME, - "test", - TaskId.EMPTY_TASK_ID, - Collections.emptyMap(), - false, - randomOrigin() - ); - - final TaskManager taskManager = mock(TaskManager.class); - when(taskManager.getCancellableTasks()).thenReturn(Map.of(leaderTaskId, leaderTask)); - final TransportService transportService = mock(TransportService.class); - when(transportService.getTaskManager()).thenReturn(taskManager); - - final AtomicReference reindexerRef = new AtomicReference<>(); - final SlicedLocalPitReindexClient client = new SlicedLocalPitReindexClient( - getTestName(), - clusterService, - reindexerRef, - firstChildTaskId - ); - try { - final Reindexer reindexer = new Reindexer( - clusterService, - projectResolver, - client, - threadPool, - mock(ScriptService.class), - mock(ReindexSslConfig.class), - null, - transportService, - mock(ReindexRelocationNodePicker.class), - featureService - ); - reindexerRef.set(reindexer); - - final ReindexRequest request = new ReindexRequest(); - request.setSourceIndices("source"); - request.setDestIndex("dest"); - request.setSlices(numSlices); - - final PlainActionFuture initFuture = new PlainActionFuture<>(); - reindexer.initTask( - leaderTask, - request, - initFuture.delegateFailure((l, v) -> reindexer.execute(leaderTask, request, client, l)) - ); - initFuture.actionGet(); - - assertThat("one shared PIT open for the leader", client.getOpenPitCount(), equalTo(1)); - assertThat("leader closes PIT once after all " + numSlices + " slices", client.getCloseCount(), equalTo(1)); - } finally { - client.shutdown(); - } - } finally { - terminate(threadPool); - } - } - - /** - * Remote clusters before PIT support (below 7.10) must use scroll-based pagination. Verifies the mock server receives no - * {@code POST} to open a point-in-time while a 7.9-style version response still allows reindex to complete using scroll. - */ - @SuppressForbidden(reason = "use http server for testing") - public void testRemoteReindexUsesScrollWhenVersionBeforePitSupport() throws Exception { - assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED); - - final AtomicInteger pitPostCount = new AtomicInteger(); - final String version79 = "{\"version\":{\"number\":\"7.9.0\"},\"tagline\":\"You Know, for Search\"}"; - final String scrollBody = "{\"_scroll_id\":\"fake-scroll\",\"timed_out\":false,\"hits\":{\"total\":0,\"hits\":[]}" - + ",\"_shards\":{\"total\":1,\"successful\":1,\"failed\":0}}"; - - HttpServer server = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); - server.createContext("/", exchange -> { - try { - String path = exchange.getRequestURI().getPath(); - String method = exchange.getRequestMethod(); - if (path.contains("_pit") && "POST".equals(method)) { - pitPostCount.incrementAndGet(); - respondJson(exchange, 500, "{\"error\":\"unexpected pit\"}"); - return; - } - if (path.equals("/") || path.isEmpty()) { - respondJson(exchange, 200, version79); - return; - } - if (path.contains("/_search") && "POST".equals(method) && path.contains("scroll") == false) { - respondJson(exchange, 200, scrollBody); - return; - } - if (path.contains("_search/scroll") && "POST".equals(method)) { - respondJson(exchange, 200, scrollBody); - return; - } - if (path.contains("_search/scroll") && "DELETE".equals(method)) { - exchange.sendResponseHeaders(200, -1); - return; - } - exchange.sendResponseHeaders(404, -1); - } finally { - exchange.close(); - } - }); - server.start(); - try { - runRemotePitTestWithMockServer(server, r -> {}, initFuture -> { - BulkByScrollResponse response = initFuture.actionGet(); - assertNotNull(response); - assertThat(pitPostCount.get(), equalTo(0)); - }); - } finally { - server.stop(0); - } - } - - /** - * Local PIT open uses the reindex request's scroll keep-alive as the PIT keep-alive when the user has set a scroll timeout. - */ - public void testLocalOpenPitUsesScrollTimeAsKeepAliveWhenSet() { - assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED); - - final OpenPitCapturingClient client = new OpenPitCapturingClient(getTestName()); - try { - final TestThreadPool threadPool = new TestThreadPool(getTestName()) { - @Override - public ExecutorService executor(String name) { - return DIRECT_EXECUTOR_SERVICE; - } - }; - try { - final ClusterService clusterService = mock(ClusterService.class); - final DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build(); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); - when(clusterService.localNode()).thenReturn(localNode); - - final ProjectResolver projectResolver = mock(ProjectResolver.class); - when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID)); - - final FeatureService featureService = mock(FeatureService.class); - when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(true); - - final Reindexer reindexer = new Reindexer( - clusterService, - projectResolver, - client, - threadPool, - mock(ScriptService.class), - mock(ReindexSslConfig.class), - null, - mock(TransportService.class), - mock(ReindexRelocationNodePicker.class), - featureService - ); - - final int scrollMinutes = randomIntBetween(3, 20); - final TimeValue scrollKeepAlive = TimeValue.timeValueMinutes(scrollMinutes); - final ReindexRequest request = new ReindexRequest(); - request.setSourceIndices("source"); - request.setDestIndex("dest"); - request.setSlices(1); - request.setScroll(scrollKeepAlive); - - final BulkByScrollTask task = new BulkByScrollTask( - randomTaskId(), - "reindex", - "reindex", - "test", - TaskId.EMPTY_TASK_ID, - Collections.emptyMap(), - false, - randomOrigin() - ); - - final PlainActionFuture initFuture = new PlainActionFuture<>(); - reindexer.initTask(task, request, initFuture.delegateFailure((l, v) -> reindexer.execute(task, request, client, l))); - initFuture.actionGet(); - - OpenPointInTimeRequest pitRequest = client.getCapturedPitRequest(); - assertNotNull(pitRequest); - assertThat(pitRequest.keepAlive(), equalTo(scrollKeepAlive)); - } finally { - terminate(threadPool); - } - } finally { - client.shutdown(); - } - } - - /** - * When the search request has no scroll keep-alive, local PIT open uses the default keep-alive (five minutes), matching - * {@link Reindexer}'s {@code pitKeepAlive} fallback. - */ - public void testLocalOpenPitUsesDefaultKeepAliveWhenScrollTimeUnset() { - assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED); - - final OpenPitCapturingClient client = new OpenPitCapturingClient(getTestName()); - try { - final TestThreadPool threadPool = new TestThreadPool(getTestName()) { - @Override - public ExecutorService executor(String name) { - return DIRECT_EXECUTOR_SERVICE; - } - }; - try { - final ClusterService clusterService = mock(ClusterService.class); - final DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build(); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); - when(clusterService.localNode()).thenReturn(localNode); - - final ProjectResolver projectResolver = mock(ProjectResolver.class); - when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID)); - - final FeatureService featureService = mock(FeatureService.class); - when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(true); - - final Reindexer reindexer = new Reindexer( - clusterService, - projectResolver, - client, - threadPool, - mock(ScriptService.class), - mock(ReindexSslConfig.class), - null, - mock(TransportService.class), - mock(ReindexRelocationNodePicker.class), - featureService - ); - - final ReindexRequest request = new ReindexRequest(); - request.setSourceIndices("source"); - request.setDestIndex("dest"); - request.setSlices(1); - request.getSearchRequest().scroll((TimeValue) null); - - final BulkByScrollTask task = new BulkByScrollTask( - randomTaskId(), - "reindex", - "reindex", - "test", - TaskId.EMPTY_TASK_ID, - Collections.emptyMap(), - false, - randomOrigin() - ); - - final PlainActionFuture initFuture = new PlainActionFuture<>(); - reindexer.initTask(task, request, initFuture.delegateFailure((l, v) -> reindexer.execute(task, request, client, l))); - initFuture.actionGet(); - - OpenPointInTimeRequest pitRequest = client.getCapturedPitRequest(); - assertNotNull(pitRequest); - assertThat(pitRequest.keepAlive(), equalTo(TimeValue.timeValueMinutes(5))); - } finally { - terminate(threadPool); - } - } finally { - client.shutdown(); - } - } - - /** - * {@link OpenPointInTimeRequest} must carry the same source index names and - * {@link IndicesOptions} as the reindex search request so PIT resolution matches the user's source selection. - */ - public void testLocalOpenPitRequestCopiesIndicesAndIndicesOptions() { - assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED); - - final OpenPitCapturingClient client = new OpenPitCapturingClient(getTestName()); - try { - final TestThreadPool threadPool = new TestThreadPool(getTestName()) { - @Override - public ExecutorService executor(String name) { - return DIRECT_EXECUTOR_SERVICE; - } - }; - try { - final ClusterService clusterService = mock(ClusterService.class); - final DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build(); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); - when(clusterService.localNode()).thenReturn(localNode); - - final ProjectResolver projectResolver = mock(ProjectResolver.class); - when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID)); - - final FeatureService featureService = mock(FeatureService.class); - when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(true); - - final Reindexer reindexer = new Reindexer( - clusterService, - projectResolver, - client, - threadPool, - mock(ScriptService.class), - mock(ReindexSslConfig.class), - null, - mock(TransportService.class), - mock(ReindexRelocationNodePicker.class), - featureService - ); - - final IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen(); - final ReindexRequest request = new ReindexRequest(); - request.setDestIndex("dest"); - request.setSlices(1); - request.getSearchRequest().indices("idx-a", "idx-b"); - request.getSearchRequest().indicesOptions(indicesOptions); - - final BulkByScrollTask task = new BulkByScrollTask( - randomTaskId(), - "reindex", - "reindex", - "test", - TaskId.EMPTY_TASK_ID, - Collections.emptyMap(), - false, - randomOrigin() - ); - - final PlainActionFuture initFuture = new PlainActionFuture<>(); - reindexer.initTask(task, request, initFuture.delegateFailure((l, v) -> reindexer.execute(task, request, client, l))); - initFuture.actionGet(); - - OpenPointInTimeRequest pitRequest = client.getCapturedPitRequest(); - assertNotNull(pitRequest); - assertThat(pitRequest.indices(), equalTo(new String[] { "idx-a", "idx-b" })); - assertThat(pitRequest.indicesOptions(), equalTo(indicesOptions)); - } finally { - terminate(threadPool); - } - } finally { - client.shutdown(); - } - } - - /** - * If the cluster does not advertise {@link ReindexPlugin#REINDEX_PIT_SEARCH_FEATURE}, reindex stays on the scroll path and - * must not invoke local open-PIT transport actions. - */ - public void testLocalReindexDoesNotOpenPitWhenClusterFeatureDisabled() { - final OpenPitCapturingClient client = new OpenPitCapturingClient(getTestName()); - try { - final TestThreadPool threadPool = new TestThreadPool(getTestName()) { - @Override - public ExecutorService executor(String name) { - return DIRECT_EXECUTOR_SERVICE; - } - }; - try { - final ClusterService clusterService = mock(ClusterService.class); - final DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build(); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); - when(clusterService.localNode()).thenReturn(localNode); - - final ProjectResolver projectResolver = mock(ProjectResolver.class); - when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID)); - - final FeatureService featureService = mock(FeatureService.class); - when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(false); - - final Reindexer reindexer = new Reindexer( - clusterService, - projectResolver, - client, - threadPool, - mock(ScriptService.class), - mock(ReindexSslConfig.class), - null, - mock(TransportService.class), - mock(ReindexRelocationNodePicker.class), - featureService - ); - - final ReindexRequest request = new ReindexRequest(); - request.setSourceIndices("source"); - request.setDestIndex("dest"); - request.setSlices(1); - - final BulkByScrollTask task = new BulkByScrollTask( - randomTaskId(), - "reindex", - "reindex", - "test", - TaskId.EMPTY_TASK_ID, - Collections.emptyMap(), - false, - randomOrigin() - ); - - final PlainActionFuture initFuture = new PlainActionFuture<>(); - reindexer.initTask(task, request, initFuture.delegateFailure((l, v) -> reindexer.execute(task, request, client, l))); - initFuture.actionGet(); - - assertThat(client.getOpenPitCount(), equalTo(0)); - assertNull(client.getCapturedPitRequest()); - } finally { - terminate(threadPool); - } - } finally { - client.shutdown(); - } - } - - /** - * After version negotiation, if the search request already contains a point-in-time id (e.g. resume or internal injection), - * remote reindex must not send an extra {@code POST} to open PIT on the remote cluster. - */ - @SuppressForbidden(reason = "use http server for testing") - public void testRemoteReindexSkipsOpenPitWhenPointInTimeAlreadySet() throws Exception { - assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED); - - final AtomicInteger pitOpenPostCount = new AtomicInteger(); - HttpServer server = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); - server.createContext("/", exchange -> { - try { - String path = exchange.getRequestURI().getPath(); - String method = exchange.getRequestMethod(); - if (path.contains("_pit") && "POST".equals(method)) { - pitOpenPostCount.incrementAndGet(); - respondJson(exchange, 200, REMOTE_PIT_OPEN_RESPONSE); - return; - } - if (path.equals("/") || path.isEmpty()) { - respondJson(exchange, 200, REMOTE_PIT_TEST_VERSION_JSON); - return; - } - if (path.contains("_search") && "POST".equals(method)) { - respondJson(exchange, 200, REMOTE_PIT_EMPTY_SEARCH_RESPONSE); - return; - } - if (path.contains("_pit") && "DELETE".equals(method)) { - exchange.sendResponseHeaders(200, -1); - return; - } - exchange.sendResponseHeaders(404, -1); - } finally { - exchange.close(); - } - }); - server.start(); - try { - BytesArray matchAll = new BytesArray("{\"match_all\":{}}"); - RemoteInfo remoteInfo = new RemoteInfo( - "http", - server.getAddress().getHostString(), - server.getAddress().getPort(), - null, - matchAll, - null, - null, - emptyMap(), - TimeValue.timeValueSeconds(5), - TimeValue.timeValueSeconds(5) - ); - - ClusterService clusterService = mock(ClusterService.class); - DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build(); - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); - when(clusterService.localNode()).thenReturn(localNode); - - ProjectResolver projectResolver = mock(ProjectResolver.class); - when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID)); - - TestThreadPool threadPool = new TestThreadPool(getTestName()) { - @Override - public ExecutorService executor(String name) { - return DIRECT_EXECUTOR_SERVICE; - } - }; - try { - Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build()); - ReindexSslConfig sslConfig = new ReindexSslConfig(environment.settings(), environment, mock(ResourceWatcherService.class)); - - FeatureService featureService = mock(FeatureService.class); - when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(true); - - Reindexer reindexer = new Reindexer( - clusterService, - projectResolver, - mock(Client.class), - threadPool, - mock(ScriptService.class), - sslConfig, - null, - mock(TransportService.class), - mock(ReindexRelocationNodePicker.class), - featureService - ); - - ReindexRequest request = new ReindexRequest(); - request.setDestIndex("dest"); - request.setRemoteInfo(remoteInfo); - request.setSlices(1); - request.getSearchRequest().indices(Strings.EMPTY_ARRAY); - request.getSearchRequest() - .source( - new SearchSourceBuilder().pointInTimeBuilder( - new PointInTimeBuilder(new BytesArray("c29tZXBpdGlk")).setKeepAlive(TimeValue.timeValueMinutes(5)) - ) - ); - // ReindexRequest applies default scroll; PIT and scroll cannot both be set. - request.getSearchRequest().scroll(null); - assertNull(request.validate()); - - BulkByScrollTask task = new BulkByScrollTask( - randomTaskId(), - "reindex", - "reindex", - "test", - TaskId.EMPTY_TASK_ID, - Collections.emptyMap(), - false, - randomOrigin() - ); - - PlainActionFuture initFuture = new PlainActionFuture<>(); - reindexer.initTask( - task, - request, - initFuture.delegateFailure((l, v) -> reindexer.execute(task, request, mock(Client.class), l)) - ); - assertNotNull(initFuture.actionGet()); - assertThat("remote open PIT must be skipped when id already on request", pitOpenPostCount.get(), equalTo(0)); - } finally { - terminate(threadPool); - } - } finally { - server.stop(0); - } - } - - /** - * When the remote cluster returns HTTP 429 (too many requests) for open PIT, the reindex request fails with that status - * after retries are exhausted. - */ - @SuppressForbidden(reason = "use http server for testing") - public void testRemoteReindexingFailsWhenOpenPitIsRejected() throws Exception { - assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED); - - AtomicInteger requestSeq = new AtomicInteger(); - HttpServer server = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); - server.createContext("/", exchange -> { - try { - String path = exchange.getRequestURI().getPath(); - String method = exchange.getRequestMethod(); - int seq = requestSeq.getAndIncrement(); - if (path.equals("/") || path.isEmpty()) { - respondJson(exchange, 200, REMOTE_PIT_TEST_VERSION_JSON); - return; - } - if (path.contains("_pit") && "POST".equals(method)) { - if (seq >= 1) { - exchange.sendResponseHeaders(TOO_MANY_REQUESTS.getStatus(), -1); - return; - } - } - exchange.sendResponseHeaders(404, -1); - } finally { - exchange.close(); - } - }); - server.start(); - try { - runRemotePitTestWithMockServer(server, r -> r.setMaxRetries(0), initFuture -> { - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, initFuture::actionGet); - assertThat(e.status(), equalTo(TOO_MANY_REQUESTS)); - }); - } finally { - server.stop(0); - } - } - - /** - * If {@link RestClient#close()} throws while shutting down after remote version or reindex work, the failure is logged at WARN - * and the completion runnable still runs, so callers are not left hanging. - */ - public void testCloseRestClientAndRunLogsWhenCloseThrows() throws IOException { - ThreadPool threadPool = mock(ThreadPool.class); - when(threadPool.generic()).thenReturn(DIRECT_EXECUTOR_SERVICE); - Reindexer reindexer = new Reindexer( - mock(ClusterService.class), - mock(ProjectResolver.class), - mock(Client.class), - threadPool, - mock(ScriptService.class), - mock(ReindexSslConfig.class), - null, - mock(TransportService.class), - mock(ReindexRelocationNodePicker.class), - mock(FeatureService.class) - ); - RestClient restClient = mock(RestClient.class); - IOException ioException = new IOException("simulated close failure"); - doThrow(ioException).when(restClient).close(); - AtomicBoolean completion = new AtomicBoolean(false); - MockLog.awaitLogger( - () -> reindexer.closeRestClientAndRun(restClient, () -> completion.set(true)), - Reindexer.class, - new MockLog.SeenEventExpectation( - "Failed to close RestClient after version lookup should be logged", - Reindexer.class.getCanonicalName(), - Level.WARN, - "Failed to close RestClient after version lookup" - ) - ); - assertTrue(completion.get()); - } - /** * Client that succeeds on OpenPointInTime and Search (empty results) but fails on ClosePointInTime. * Used to verify that PIT close failures are logged but don't propagate to the main listener. @@ -2409,18 +1758,13 @@ void shutdown() { * Client that captures the OpenPointInTimeRequest when received and returns success. * Counts ClosePointInTimeRequest invocations. Used to verify PIT close behavior. */ - private static class OpenPitCapturingClient extends NoOpClient { + private static final class OpenPitCapturingClient extends NoOpClient { private final AtomicInteger closeCount = new AtomicInteger(0); - private final AtomicInteger openPitCount = new AtomicInteger(0); int getCloseCount() { return closeCount.get(); } - int getOpenPitCount() { - return openPitCount.get(); - } - private final TestThreadPool threadPool; private volatile OpenPointInTimeRequest capturedPitRequest; @@ -2441,7 +1785,6 @@ protected void ActionListener listener ) { if (action == TransportOpenPointInTimeAction.TYPE && request instanceof OpenPointInTimeRequest pitRequest) { - openPitCount.incrementAndGet(); capturedPitRequest = pitRequest; OpenPointInTimeResponse response = new OpenPointInTimeResponse(new BytesArray("pit-id"), 1, 1, 0, 0); listener.onResponse((Response) response); @@ -2468,60 +1811,6 @@ void shutdown() { } } - /** - * Runs slice child {@link ReindexAction} requests by delegating to the same {@link Reindexer} (see {@code reindexerRef}). - */ - private static final class SlicedLocalPitReindexClient extends OpenPitCapturingClient { - private final ClusterService clusterService; - private final AtomicReference reindexerRef; - private final AtomicLong nextChildTaskId; - - SlicedLocalPitReindexClient( - String threadPoolName, - ClusterService clusterService, - AtomicReference reindexerRef, - long firstChildTaskId - ) { - super(threadPoolName); - this.clusterService = clusterService; - this.reindexerRef = reindexerRef; - this.nextChildTaskId = new AtomicLong(firstChildTaskId); - } - - @Override - @SuppressWarnings("unchecked") - protected void doExecute( - ActionType action, - Request request, - ActionListener listener - ) { - if (action == ReindexAction.INSTANCE && request instanceof ReindexRequest rr && rr.getParentTask().isSet()) { - Reindexer reindexer = reindexerRef.get(); - assertNotNull("Reindexer must be set before slice execution", reindexer); - long childId = nextChildTaskId.getAndIncrement(); - TaskId workerTaskId = new TaskId(clusterService.localNode().getId(), childId); - BulkByScrollTask workerTask = new BulkByScrollTask( - workerTaskId, - "reindex", - ReindexAction.NAME, - rr.getDescription(), - rr.getParentTask(), - Collections.emptyMap(), - false, - new ResumeInfo.RelocationOrigin(workerTaskId, System.currentTimeMillis()) - ); - ActionListener bulkListener = (ActionListener) listener; - reindexer.initTask( - workerTask, - rr, - ActionListener.wrap(v -> reindexer.execute(workerTask, rr, this, bulkListener), bulkListener::onFailure) - ); - return; - } - super.doExecute(action, request, listener); - } - } - // --- helpers --- private static final String REMOTE_PIT_TEST_VERSION_JSON = "{\"version\":{\"number\":\"7.10.0\"},\"tagline\":\"You Know, for Search\"}";