diff --git a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexCancelIT.java b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexCancelIT.java
index 58ae056ecf736..f6f0b1adb3fa8 100644
--- a/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexCancelIT.java
+++ b/modules/reindex-management/src/internalClusterTest/java/org/elasticsearch/reindex/management/ReindexCancelIT.java
@@ -91,6 +91,8 @@ public void setup() {
* We test synchronous (?wait_for_completion=true) invocation of the _cancel endpoint in this test.
*/
public void testCancelEndpointEndToEndSynchronously() throws Exception {
+ assumeFalse("scroll-based reindex uses a different code path", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
final TaskId parentTaskId = startAsyncThrottledReindex();
final TaskInfo running = getRunningTask(parentTaskId);
@@ -139,6 +141,8 @@ public void testCancelEndpointEndToEndSynchronously() throws Exception {
/** Same test as above but calling _cancel asynchronously and wrapping assertions after cancellation in assertBusy. */
public void testCancelEndpointEndToEndAsynchronously() throws Exception {
+ assumeFalse("scroll-based reindex uses a different code path", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
final TaskId parentTaskId = startAsyncThrottledReindex();
final TaskInfo running = getRunningTask(parentTaskId);
diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java
index 653afed6f096b..97ccf1477b42d 100644
--- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java
+++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java
@@ -10,13 +10,16 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.reindex.BulkIndexByScrollResponseMatcher;
import org.elasticsearch.reindex.ReindexPlugin;
+import org.elasticsearch.reindex.Reindexer;
import org.elasticsearch.reindex.TransportReindexAction;
import org.elasticsearch.rest.root.MainRestPlugin;
import org.elasticsearch.search.slice.SliceBuilder;
@@ -44,6 +47,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.notNullValue;
@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class ReindexPluginMetricsIT extends ESIntegTestCase {
@@ -329,6 +333,124 @@ public void testReindexMetricsWithAutoSlices() throws Exception {
});
}
+ /**
+ * Verifies that remote reindex metrics record failures when the remote version lookup fails
+ * (e.g. connection refused, host unreachable).
+ */
+ public void testRemoteReindexVersionLookupFailureMetrics() throws Exception {
+ assumeTrue("PIT search must be enabled for remote version lookup path", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ final String dataNodeName = internalCluster().startNode();
+
+ // Use an invalid host so version lookup fails during connection
+ RemoteInfo invalidRemote = new RemoteInfo(
+ "http",
+ "invalid.invalid",
+ 9200,
+ null,
+ new BytesArray("{\"match_all\":{}}"),
+ null,
+ null,
+ Map.of(),
+ TimeValue.timeValueMillis(100),
+ TimeValue.timeValueMillis(100)
+ );
+
+ final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
+ .filterPlugins(TestTelemetryPlugin.class)
+ .findFirst()
+ .orElseThrow();
+
+ expectThrows(Exception.class, () -> reindex().source("source").setRemoteInfo(invalidRemote).destination("dest").get());
+
+ assertBusy(() -> {
+ testTelemetryPlugin.collect();
+ assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
+ List completions = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER);
+ assertThat(completions.size(), equalTo(1));
+ assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE), notNullValue());
+ assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE));
+ });
+ }
+
+ /**
+ * Verifies that no reindex metrics are recorded when validation fails before the {@link Reindexer} runs
+ * (e.g. source index does not exist).
+ */
+ public void testLocalReindexValidationFailureNoMetrics() {
+ final String dataNodeName = internalCluster().startNode();
+
+ final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
+ .filterPlugins(TestTelemetryPlugin.class)
+ .findFirst()
+ .orElseThrow();
+
+ expectThrows(Exception.class, () -> reindex().source("non_existent_index").destination("dest").get());
+
+ testTelemetryPlugin.collect();
+ assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(0));
+ assertThat(testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).size(), equalTo(0));
+ }
+
+ /**
+ * Verifies that local reindex metrics record failures when PIT open fails (e.g. source index is closed).
+ */
+ public void testLocalReindexPitOpenFailureMetrics() throws Exception {
+ assumeTrue("PIT search must be enabled for local PIT path", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ final String dataNodeName = internalCluster().startNode();
+
+ // Create and close the source index so PIT open fails (validation passes because index exists)
+ indexRandom(true, prepareIndex("source").setId("1").setSource("foo", "a"));
+ indicesAdmin().prepareClose("source").get();
+
+ final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
+ .filterPlugins(TestTelemetryPlugin.class)
+ .findFirst()
+ .orElseThrow();
+
+ // Use STRICT_EXPAND_OPEN_CLOSED so validation resolves the closed index; PIT open will still fail
+ ReindexRequestBuilder builder = reindex().source("source").destination("dest");
+ builder.source().setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_CLOSED);
+ expectThrows(Exception.class, () -> builder.get());
+
+ assertBusy(() -> {
+ testTelemetryPlugin.collect();
+ assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
+ List completions = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER);
+ assertThat(completions.size(), equalTo(1));
+ assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE), notNullValue());
+ assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL));
+ });
+ }
+
+ /**
+ * Verifies reindex metrics for a successful local reindex.
+ * Uses the scroll path when PIT is disabled, or the PIT path when PIT is enabled.
+ */
+ public void testLocalReindexMetrics() throws Exception {
+ final String dataNodeName = internalCluster().startNode();
+
+ indexRandom(true, prepareIndex("source").setId("1").setSource("foo", "a"), prepareIndex("source").setId("2").setSource("foo", "b"));
+ assertHitCount(prepareSearch("source").setSize(0), 2);
+
+ final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
+ .filterPlugins(TestTelemetryPlugin.class)
+ .findFirst()
+ .orElseThrow();
+
+ reindex().source("source").destination("dest").get();
+
+ assertBusy(() -> {
+ testTelemetryPlugin.collect();
+ assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
+ List completions = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER);
+ assertThat(completions.size(), equalTo(1));
+ assertNull(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE));
+ assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL));
+ });
+ }
+
public void testDeleteByQueryMetrics() throws Exception {
final String dataNodeName = internalCluster().startNode();
diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java
index 91b436025eb6d..cae082e8ddaac 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java
@@ -25,7 +25,11 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.ClosePointInTimeRequest;
+import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.TransportClosePointInTimeAction;
+import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.internal.Client;
@@ -45,6 +49,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
@@ -98,6 +103,8 @@
import static org.elasticsearch.common.BackoffPolicy.exponentialBackoff;
import static org.elasticsearch.index.VersionType.INTERNAL;
import static org.elasticsearch.reindex.ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE;
+import static org.elasticsearch.reindex.remote.RemoteReindexingUtils.closePit;
+import static org.elasticsearch.reindex.remote.RemoteReindexingUtils.openPit;
public class Reindexer {
@@ -163,7 +170,38 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl
startTime
);
- Consumer workerAction = remoteVersion -> {
+ final boolean isRemote = request.getRemoteInfo() != null;
+ Consumer workerAction = createWorkerAction(task, request, bulkClient, responseListener);
+
+ // Point-in-time searching is disabled, so default to scroll
+ if (featureService.clusterHasFeature(clusterService.state(), REINDEX_PIT_SEARCH_FEATURE) == false) {
+ executePaginatedSearch(task, request, responseListener, workerAction, null);
+ }
+ /**
+ * Point-in-time searching is enabled
+ * As this is a request to reindex from remote, we need to determine the remote version prior to execution
+ * NB {@link ReindexRequest} forbids remote requests and slices > 1, so we're guaranteed to be running on the only slice
+ */
+ else if (isRemote) {
+ lookupRemoteVersionAndExecute(task, request, bulkClient, responseListener, workerAction);
+ }
+ // Point-in-time searching is enabled, and this is a local request
+ else {
+ openPitAndExecute(task, request, bulkClient, responseListener);
+ }
+ }
+
+ /**
+ * Creates the worker action that runs the reindex.
+ * When PIT is used, the listener should include runAfter logic to close the PIT.
+ */
+ private Consumer createWorkerAction(
+ BulkByScrollTask task,
+ ReindexRequest request,
+ Client bulkClient,
+ ActionListener listener
+ ) {
+ return remoteVersion -> {
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
ParentTaskAssigningClient assigningBulkClient = new ParentTaskAssigningClient(bulkClient, clusterService.localNode(), task);
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(
@@ -176,40 +214,94 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl
projectResolver.getProjectState(clusterService.state()),
reindexSslConfig,
request,
- responseListener,
+ listener,
remoteVersion
);
searchAction.start();
};
+ }
- /**
- * If this is a request to reindex from remote, then we need to determine the remote version prior to execution
- * NB {@link ReindexRequest} forbids remote requests and slices > 1, so we're guaranteed to be running on the only slice
- */
- if (featureService.clusterHasFeature(clusterService.state(), REINDEX_PIT_SEARCH_FEATURE) && request.getRemoteInfo() != null) {
- lookupRemoteVersionAndExecute(task, request, responseListener, workerAction);
- } else {
- BulkByPaginatedSearchParallelizationHelper.executeSlicedAction(
- task,
- request,
- ReindexAction.INSTANCE,
- responseListener,
- client,
- clusterService.localNode(),
- null,
- workerAction
+ /**
+ * Runs the sliced action
+ */
+ private void executePaginatedSearch(
+ BulkByScrollTask task,
+ ReindexRequest request,
+ ActionListener listener,
+ Consumer workerAction,
+ @Nullable Version remoteVersion
+ ) {
+ BulkByPaginatedSearchParallelizationHelper.executeSlicedAction(
+ task,
+ request,
+ ReindexAction.INSTANCE,
+ listener,
+ client,
+ clusterService.localNode(),
+ remoteVersion,
+ workerAction
+ );
+ }
+
+ /**
+ * Returns the keep-alive duration for PIT. Uses the request's scroll time when set, otherwise defaults to 5 minutes.
+ * TODO - https://github.com/elastic/elasticsearch-team/issues/2334
+ */
+ private static TimeValue pitKeepAlive(ReindexRequest request) {
+ return request.getScrollTime() != null ? request.getScrollTime() : TimeValue.timeValueMinutes(5);
+ }
+
+ /**
+ * Opens a PIT on the local cluster, runs the sliced action, and closes the PIT when done.
+ */
+ private void openPitAndExecute(
+ BulkByScrollTask task,
+ ReindexRequest request,
+ Client bulkClient,
+ ActionListener listener
+ ) {
+ SearchRequest searchRequest = request.getSearchRequest();
+ String[] indices = searchRequest.indices();
+
+ // The routing and preference parameters can be set for a PIT request. However, scroll currently does not use these,
+ // so for parity we assert here in case that changes
+ assert searchRequest.routing() == null : "Routing is set in the search request, but is not being used when opening the PIT.";
+ assert searchRequest.preference() == null : "Preference is set in the search request, but is not being used when opening the PIT.";
+ assert searchRequest.allowPartialSearchResults() == null || searchRequest.allowPartialSearchResults() == false
+ : "allow_partial_search_results must be false when opening a PIT to match scroll search behavior";
+
+ // TODO - Do we need to set the IndexFilter field here? https://github.com/elastic/elasticsearch-team/issues/2392
+ OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(indices).indicesOptions(searchRequest.indicesOptions())
+ .keepAlive(pitKeepAlive(request))
+ .allowPartialSearchResults(false);
+
+ // NB this is a local request, so we call the TransportAction rather than issuing a REST call
+ client.execute(TransportOpenPointInTimeAction.TYPE, pitRequest, listener.delegateFailureAndWrap((l, pitResponse) -> {
+ BytesReference pitId = pitResponse.getPointInTimeId();
+ ActionListener listenerWithClosePit = ActionListener.runAfter(
+ l,
+ () -> client.execute(
+ TransportClosePointInTimeAction.TYPE,
+ new ClosePointInTimeRequest(pitId),
+ ActionListener.wrap(r -> {}, e -> logger.warn("Failed to close local PIT", e))
+ )
);
- }
+ Consumer workerActionWithClosePit = createWorkerAction(task, request, bulkClient, listenerWithClosePit);
+ // TODO - Pass the point-in-time ID into the BulkByPaginatedSearchParallelizationHelper to be used
+ executePaginatedSearch(task, request, listenerWithClosePit, workerActionWithClosePit, null);
+ }));
}
/**
- * Looks up the remote cluster version when reindexing from a remote source, then runs the sliced action with that version.
- * The RestClient used for the lookup is closed after the callback; closing must happen on a thread other than the
- * RestClient's own thread pool to avoid shutdown failures.
+ * Looks up the remote cluster version when reindexing from a remote source.
+ * If the remote supports PIT (7.10.0+), this opens a PIT, runs the sliced action, and closes the PIT when done.
+ * Otherwise, the default search method is scroll.
+ * The RestClient used for lookup (and PIT open/close when applicable) is closed after completion.
*/
private void lookupRemoteVersionAndExecute(
BulkByScrollTask task,
ReindexRequest request,
+ Client bulkClient,
ActionListener listener,
Consumer workerAction
) {
@@ -219,19 +311,14 @@ private void lookupRemoteVersionAndExecute(
RejectAwareActionListener rejectAwareListener = new RejectAwareActionListener<>() {
@Override
public void onResponse(Version version) {
- closeRestClientAndRun(
- restClient,
- () -> BulkByPaginatedSearchParallelizationHelper.executeSlicedAction(
- task,
- request,
- ReindexAction.INSTANCE,
- listener,
- client,
- clusterService.localNode(),
- version,
- workerAction
- )
- );
+ boolean canUsePit = version.onOrAfter(Version.V_7_10_0);
+ if (canUsePit) {
+ openRemotePitAndExecute(task, request, bulkClient, listener, restClient, version);
+ }
+ // Default to scroll-based search
+ else {
+ closeRestClientAndRun(restClient, () -> executePaginatedSearch(task, request, listener, workerAction, version));
+ }
}
@Override
@@ -244,6 +331,7 @@ public void onRejection(Exception e) {
closeRestClientAndRun(restClient, () -> listener.onFailure(e));
}
};
+
RemoteReindexingUtils.lookupRemoteVersionWithRetries(
logger,
exponentialBackoff(request.getRetryBackoffInitialTime(), request.getMaxRetries()),
@@ -255,6 +343,41 @@ public void onRejection(Exception e) {
);
}
+ /**
+ * Opens a PIT on the remote cluster, runs the sliced action, and closes the PIT when done.
+ * The RestClient is closed after the PIT is closed.
+ */
+ private void openRemotePitAndExecute(
+ BulkByScrollTask task,
+ ReindexRequest request,
+ Client bulkClient,
+ ActionListener listenerWithRelocations,
+ RestClient restClient,
+ Version remoteVersion
+ ) {
+ SearchRequest searchRequest = request.getSearchRequest();
+ String[] indices = searchRequest.indices();
+ // Sends a REST request to the remote node to open a PIT
+ openPit(searchRequest, indices, pitKeepAlive(request), RejectAwareActionListener.wrap(pitId -> {
+ ActionListener listenerWithClosePit = ActionListener.runAfter(
+ listenerWithRelocations,
+ () -> closePit(pitId, RejectAwareActionListener.wrap(v -> closeRestClientAndRun(restClient, () -> {}), e -> {
+ logger.warn("Failed to close remote PIT", e);
+ closeRestClientAndRun(restClient, () -> {});
+ }, e -> {
+ logger.warn("Failed to close remote PIT (rejected)", e);
+ closeRestClientAndRun(restClient, () -> {});
+ }), threadPool, restClient)
+ );
+ Consumer workerActionWithClosePit = createWorkerAction(task, request, bulkClient, listenerWithClosePit);
+ // TODO - Pass the point-in-time ID into the BulkByPaginatedSearchParallelizationHelper to be used
+ executePaginatedSearch(task, request, listenerWithClosePit, workerActionWithClosePit, remoteVersion);
+ },
+ e -> closeRestClientAndRun(restClient, () -> listenerWithRelocations.onFailure(e)),
+ e -> closeRestClientAndRun(restClient, () -> listenerWithRelocations.onFailure(e))
+ ), threadPool, restClient);
+ }
+
/**
* Closes the RestClient on the generic thread pool (to avoid closing from the client's own thread), then runs the given action.
*/
diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteReindexingUtils.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteReindexingUtils.java
index d5c63436f5345..70e6b22965caf 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteReindexingUtils.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteReindexingUtils.java
@@ -17,15 +17,18 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
+import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.BackoffPolicy;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.reindex.RejectAwareActionListener;
import org.elasticsearch.index.reindex.RetryListener;
import org.elasticsearch.rest.RestStatus;
@@ -41,6 +44,7 @@
import java.util.function.Supplier;
import static org.elasticsearch.reindex.remote.RemoteResponseParsers.MAIN_ACTION_PARSER;
+import static org.elasticsearch.reindex.remote.RemoteResponseParsers.OPEN_PIT_PARSER;
/**
* Utility methods for reindexing from remote Elasticsearch clusters.
@@ -59,6 +63,53 @@ public static void lookupRemoteVersion(RejectAwareActionListener listen
execute(new Request("GET", "/"), MAIN_ACTION_PARSER, listener, threadPool, client);
}
+ /**
+ * Opens a point-in-time on the remote cluster. Requires remote version 7.10.0 or later.
+ *
+ * @param indices indices to open PIT on
+ * @param keepAlive PIT keep alive duration
+ * @param listener receives the PIT id on success, or failure/rejection on error
+ * @param threadPool thread pool for preserving thread context
+ * @param client REST client for the remote cluster
+ */
+ public static void openPit(
+ SearchRequest request,
+ String[] indices,
+ TimeValue keepAlive,
+ RejectAwareActionListener listener,
+ ThreadPool threadPool,
+ RestClient client
+ ) {
+ // The routing and preference parameters can be set for a PIT request. However, scroll currently does not use these,
+ // so for parity we assert here in case that changes
+ assert request.routing() == null : "Routing is set in the search request, but is not being used when opening the PIT.";
+ assert request.preference() == null : "Preference is set in the search request, but is not being used when opening the PIT.";
+ assert request.allowPartialSearchResults() == null || request.allowPartialSearchResults() == false
+ : "allow_partial_search_results must be false when opening a PIT to match scroll search behavior";
+ execute(RemoteRequestBuilders.openPit(indices, keepAlive), OPEN_PIT_PARSER, listener, threadPool, client);
+ }
+
+ /**
+ * Closes a point-in-time on the remote cluster.
+ *
+ * @param pitId the PIT id to close
+ * @param listener receives on success, or failure on error
+ * @param threadPool thread pool for preserving thread context
+ * @param client REST client for the remote cluster
+ */
+ public static void closePit(BytesReference pitId, RejectAwareActionListener listener, ThreadPool threadPool, RestClient client) {
+ execute(RemoteRequestBuilders.closePit(pitId), (p, xContentType) -> {
+ try {
+ if (p.nextToken() != null) {
+ p.skipChildren();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }, RejectAwareActionListener.withResponseHandler(listener, v -> listener.onResponse(null)), threadPool, client);
+ }
+
/**
* Looks up the remote cluster version with retries on rejection (e.g. 429 Too Many Requests).
* Matches the retry behavior used by {@link RemoteScrollablePaginatedHitSource} when it looks up the version.
@@ -76,9 +127,13 @@ public static void lookupRemoteVersionWithRetries(
RestClient client,
RejectAwareActionListener delegate
) {
- RetryListener retryListener = new RetryListener<>(logger, threadPool, backoffPolicy, listener -> {
- lookupRemoteVersion(listener, threadPool, client);
- }, delegate);
+ RetryListener retryListener = new RetryListener<>(
+ logger,
+ threadPool,
+ backoffPolicy,
+ listener -> lookupRemoteVersion(listener, threadPool, client),
+ delegate
+ );
lookupRemoteVersion(retryListener, threadPool, client);
}
diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteRequestBuilders.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteRequestBuilders.java
index ce9403b0df8ea..6f6789050286c 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteRequestBuilders.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteRequestBuilders.java
@@ -182,6 +182,28 @@ static Request initialSearch(SearchRequest searchRequest, BytesReference query,
return request;
}
+ // TODO - Do we need to set the IndexFilter field here? https://github.com/elastic/elasticsearch-team/issues/2392
+ static Request openPit(String[] indices, TimeValue keepAlive) {
+ StringBuilder path = new StringBuilder("/");
+ addIndices(path, indices);
+ path.append("_pit");
+ Request request = new Request("POST", path.toString());
+ request.addParameter("keep_alive", keepAlive.getStringRep());
+ request.addParameter("allow_partial_search_results", "false");
+ return request;
+ }
+
+ static Request closePit(BytesReference pitId) {
+ Request request = new Request("DELETE", "/_pit");
+ try (XContentBuilder entity = JsonXContent.contentBuilder()) {
+ entity.startObject().field("id", java.util.Base64.getUrlEncoder().encodeToString(BytesReference.toBytes(pitId))).endObject();
+ request.setJsonEntity(Strings.toString(entity));
+ } catch (IOException e) {
+ throw new ElasticsearchException("failed to build close pit entity", e);
+ }
+ return request;
+ }
+
private static void addIndices(StringBuilder path, String[] indices) {
if (indices == null || indices.length == 0) {
return;
diff --git a/modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteResponseParsers.java b/modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteResponseParsers.java
index 818aa7670e0cb..b03d6c9e59b06 100644
--- a/modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteResponseParsers.java
+++ b/modules/reindex/src/main/java/org/elasticsearch/reindex/remote/RemoteResponseParsers.java
@@ -12,6 +12,7 @@
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParsingException;
+import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Tuple;
@@ -30,6 +31,7 @@
import org.elasticsearch.xcontent.XContentType;
import java.io.IOException;
+import java.util.Base64;
import java.util.List;
import java.util.function.BiFunction;
@@ -269,6 +271,24 @@ public void setCausedBy(Throwable causedBy) {
}
}
+ /**
+ * Parser for the open point-in-time response. Returns the PIT id as {@link BytesReference}.
+ */
+ public static final ConstructingObjectParser OPEN_PIT_PARSER = new ConstructingObjectParser<>(
+ "open_pit_response",
+ true,
+ a -> {
+ String id = (String) a[0];
+ if (id == null || id.isEmpty()) {
+ throw new IllegalArgumentException("open point-in-time response must contain [id] field");
+ }
+ return new BytesArray(Base64.getUrlDecoder().decode(id));
+ }
+ );
+ static {
+ OPEN_PIT_PARSER.declareString(optionalConstructorArg(), new ParseField("id"));
+ }
+
/**
* Parses the main action to return just the {@linkplain Version} that it returns. We throw everything else out.
*/
diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/BulkByPaginatedSearchParallelizationHelperTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/BulkByPaginatedSearchParallelizationHelperTests.java
index dad4e9885e0e4..06b231709d372 100644
--- a/modules/reindex/src/test/java/org/elasticsearch/reindex/BulkByPaginatedSearchParallelizationHelperTests.java
+++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/BulkByPaginatedSearchParallelizationHelperTests.java
@@ -29,7 +29,6 @@
import org.junit.After;
import org.junit.Before;
-import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
@@ -58,7 +57,7 @@ public void tearDownTaskManager() {
terminate(threadPool);
}
- public void testSliceIntoSubRequests() throws IOException {
+ public void testSliceIntoSubRequests() {
SearchRequest searchRequest = randomSearchRequest(
() -> randomSearchSourceBuilder(() -> null, () -> null, () -> null, Collections::emptyList, () -> null, () -> null)
);
diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java
index 3fd64ba3ed12f..7bb88afadd4d4 100644
--- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java
+++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java
@@ -9,42 +9,89 @@
package org.elasticsearch.reindex;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpServer;
+
+import org.apache.logging.log4j.Level;
+import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.search.ClosePointInTimeRequest;
+import org.elasticsearch.action.search.ClosePointInTimeResponse;
+import org.elasticsearch.action.search.OpenPointInTimeRequest;
+import org.elasticsearch.action.search.OpenPointInTimeResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.TransportClosePointInTimeAction;
+import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
+import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectResolver;
+import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.PaginatedHitSource;
import org.elasticsearch.index.reindex.ReindexRequest;
+import org.elasticsearch.index.reindex.RemoteInfo;
import org.elasticsearch.index.reindex.ResumeBulkByScrollRequest;
import org.elasticsearch.index.reindex.ResumeBulkByScrollResponse;
import org.elasticsearch.index.reindex.ResumeInfo;
import org.elasticsearch.index.reindex.ResumeReindexAction;
+import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.SearchResponseUtils;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockLog;
+import org.elasticsearch.test.client.NoOpClient;
+import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.watcher.ResourceWatcherService;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiPredicate;
+import java.util.function.Consumer;
+import static java.util.Collections.emptyMap;
+import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE;
import static org.elasticsearch.core.TimeValue.timeValueMillis;
+import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
+import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -58,6 +105,7 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+@SuppressForbidden(reason = "use a http server")
public class ReindexerTests extends ESTestCase {
// --- wrapWithMetrics tests ---
@@ -283,8 +331,869 @@ public void testListenerWithRelocationsTriggersRelocationWhenResumeInfoPresent()
assertThat(exception.getMetadata("es.relocated_task_id"), equalTo(List.of("target-node:123")));
}
+ /**
+ * When the remote version lookup fails in lookupRemoteVersionAndExecute
+ * (e.g. server returns 500), the failure propagates to the listener.
+ * Uses MockHttpServer instead of a non-connectable host to avoid unreliable connection timeouts.
+ */
+ @SuppressForbidden(reason = "use http server for testing")
+ public void testRemoteReindexingRequestFailsWhenVersionLookupFails() throws Exception {
+ assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ HttpServer server = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
+ server.createContext("/", exchange -> {
+ exchange.sendResponseHeaders(INTERNAL_SERVER_ERROR.getStatus(), -1);
+ exchange.close();
+ });
+ server.start();
+ try {
+ runRemotePitTestWithMockServer(server, request -> request.setMaxRetries(0), initFuture -> {
+ ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, initFuture::actionGet);
+ assertThat(e.status(), equalTo(INTERNAL_SERVER_ERROR));
+ });
+ } finally {
+ server.stop(0);
+ }
+ }
+
+ /**
+ * When the remote version lookup is rejected (429), the failure propagates to the listener
+ * after retries are exhausted.
+ */
+ @SuppressForbidden(reason = "use http server for testing")
+ public void testRemoteReindexingRequestFailsWhenVersionLookupRejected() throws Exception {
+ assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ HttpServer server = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
+ server.createContext("/", exchange -> {
+ exchange.sendResponseHeaders(TOO_MANY_REQUESTS.getStatus(), -1);
+ exchange.close();
+ });
+ server.start();
+ try {
+ runRemotePitTestWithMockServer(server, request -> request.setMaxRetries(0), initFuture -> {
+ ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, initFuture::actionGet);
+ assertThat(e.status(), equalTo(TOO_MANY_REQUESTS));
+ });
+ } finally {
+ server.stop(0);
+ }
+ }
+
+ /**
+ * When opening the remote PIT fails in openRemotePitAndExecute, the failure propagates to the listener.
+ */
+ @SuppressForbidden(reason = "use http server for testing")
+ public void testRemoteReindexingRequestFailsToOpenPit() throws Exception {
+ assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ AtomicInteger requestCount = new AtomicInteger(0);
+ HttpServer server = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
+ server.createContext("/", exchange -> {
+ int count = requestCount.getAndIncrement();
+ if (count == 0) {
+ respondJson(exchange, 200, REMOTE_PIT_TEST_VERSION_JSON);
+ } else {
+ exchange.sendResponseHeaders(500, -1);
+ }
+ exchange.close();
+ });
+ server.start();
+ try {
+ runRemotePitTestWithMockServer(server, request -> {}, initFuture -> {
+ ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, initFuture::actionGet);
+ assertThat(e.status(), equalTo(INTERNAL_SERVER_ERROR));
+ });
+ } finally {
+ server.stop(0);
+ }
+ }
+
+ /**
+ * When closing the remote PIT fails in openRemotePitAndExecute, the failure is logged
+ * but the main listener still receives success.
+ */
+ @SuppressForbidden(reason = "use http server for testing")
+ public void testRemoteReindexingRequestFailsToClosePit() throws Exception {
+ assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ HttpServer server = createRemotePitMockServer((path, method) -> path.contains("_pit") && "DELETE".equals(method), exchange -> {
+ try {
+ exchange.sendResponseHeaders(500, -1);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ server.start();
+ try {
+ MockLog.awaitLogger(() -> {
+ try {
+ runRemotePitTestWithMockServer(server, request -> {}, initFuture -> {
+ BulkByScrollResponse response = initFuture.actionGet();
+ assertNotNull(response);
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ },
+ Reindexer.class,
+ new MockLog.SeenEventExpectation(
+ "Failed to close remote PIT should be logged",
+ Reindexer.class.getCanonicalName(),
+ Level.WARN,
+ "Failed to close remote PIT"
+ )
+ );
+ } finally {
+ server.stop(0);
+ }
+ }
+
+ /**
+ * When closing the remote PIT is rejected (429) in openRemotePitAndExecute,
+ * the rejection is logged but the main listener still receives success.
+ */
+ @SuppressForbidden(reason = "use http server for testing")
+ public void testRemoteReindexingRequestFailsWhenClosePitIsRejected() throws Exception {
+ assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ HttpServer server = createRemotePitMockServer((path, method) -> path.contains("_pit") && "DELETE".equals(method), exchange -> {
+ try {
+ exchange.sendResponseHeaders(TOO_MANY_REQUESTS.getStatus(), -1);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ server.start();
+ try {
+ MockLog.awaitLogger(() -> {
+ try {
+ runRemotePitTestWithMockServer(server, request -> {}, initFuture -> {
+ BulkByScrollResponse response = initFuture.actionGet();
+ assertNotNull(response);
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ },
+ Reindexer.class,
+ new MockLog.SeenEventExpectation(
+ "Failed to close remote PIT (rejected) should be logged",
+ Reindexer.class.getCanonicalName(),
+ Level.WARN,
+ "Failed to close remote PIT (rejected)"
+ )
+ );
+ } finally {
+ server.stop(0);
+ }
+ }
+
+ /**
+ * When TransportOpenPointInTimeAction fails in openPitAndExecute, the failure propagates to the listener.
+ * We use a custom Client that fails on OpenPointInTimeRequest; the listener receives that failure.
+ */
+ public void testLocalReindexingRequestFailsToOpenPit() {
+ assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ final String expectedMessage = "open-pit-failure-" + randomAlphaOfLength(8);
+ final OpenPitFailingClient client = new OpenPitFailingClient(getTestName(), expectedMessage);
+ try {
+ final ThreadPool threadPool = mock(ThreadPool.class);
+ when(threadPool.generic()).thenReturn(DIRECT_EXECUTOR_SERVICE);
+
+ final ClusterService clusterService = mock(ClusterService.class);
+ final DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build();
+ when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
+ when(clusterService.localNode()).thenReturn(localNode);
+
+ final ProjectResolver projectResolver = mock(ProjectResolver.class);
+ when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID));
+
+ FeatureService featureService = mock(FeatureService.class);
+ when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(true);
+
+ final Reindexer reindexer = new Reindexer(
+ clusterService,
+ projectResolver,
+ client,
+ threadPool,
+ mock(ScriptService.class),
+ mock(ReindexSslConfig.class),
+ null,
+ mock(TransportService.class),
+ mock(ReindexRelocationNodePicker.class),
+ featureService
+ );
+
+ final ReindexRequest request = new ReindexRequest();
+ request.setSourceIndices("source");
+ request.setDestIndex("dest");
+ request.setSlices(1);
+
+ final BulkByScrollTask task = new BulkByScrollTask(
+ randomLong(),
+ "reindex",
+ "reindex",
+ "test",
+ TaskId.EMPTY_TASK_ID,
+ Collections.emptyMap(),
+ false
+ );
+
+ final PlainActionFuture initFuture = new PlainActionFuture<>();
+ reindexer.initTask(task, request, initFuture.delegateFailure((l, v) -> reindexer.execute(task, request, client, l)));
+ initFuture.actionGet();
+
+ fail("expected listener to receive failure");
+ } catch (Exception e) {
+ assertThat(ExceptionsHelper.unwrapCause(e).getMessage(), containsString(expectedMessage));
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * When PIT search is enabled and the local PIT close fails, the failure is logged but the main listener
+ * still receives success. This verifies that close failures are handled gracefully and don't propagate.
+ */
+ public void testLocalReindexingRequestFailsToClosePit() {
+ assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ final String closeFailureMessage = "close-pit-failure-" + randomAlphaOfLength(8);
+ final ClosePitFailingClient client = new ClosePitFailingClient(getTestName(), closeFailureMessage);
+ try {
+ final TestThreadPool threadPool = new TestThreadPool(getTestName()) {
+ @Override
+ public ExecutorService executor(String name) {
+ return DIRECT_EXECUTOR_SERVICE;
+ }
+ };
+ try {
+ final ClusterService clusterService = mock(ClusterService.class);
+ final DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build();
+ when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
+ when(clusterService.localNode()).thenReturn(localNode);
+
+ final ProjectResolver projectResolver = mock(ProjectResolver.class);
+ when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID));
+
+ FeatureService featureService = mock(FeatureService.class);
+ when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(true);
+
+ final Reindexer reindexer = new Reindexer(
+ clusterService,
+ projectResolver,
+ client,
+ threadPool,
+ mock(ScriptService.class),
+ mock(ReindexSslConfig.class),
+ null,
+ mock(TransportService.class),
+ mock(ReindexRelocationNodePicker.class),
+ featureService
+ );
+
+ final ReindexRequest request = new ReindexRequest();
+ request.setSourceIndices("source");
+ request.setDestIndex("dest");
+ request.setSlices(1);
+
+ final BulkByScrollTask task = new BulkByScrollTask(
+ randomLong(),
+ "reindex",
+ "reindex",
+ "test",
+ TaskId.EMPTY_TASK_ID,
+ Collections.emptyMap(),
+ false
+ );
+
+ MockLog.awaitLogger(() -> {
+ final PlainActionFuture initFuture = new PlainActionFuture<>();
+ reindexer.initTask(task, request, initFuture.delegateFailure((l, v) -> reindexer.execute(task, request, client, l)));
+ final BulkByScrollResponse response = initFuture.actionGet();
+ assertNotNull(response);
+ },
+ Reindexer.class,
+ new MockLog.SeenEventExpectation(
+ "Failed to close local PIT should be logged",
+ Reindexer.class.getCanonicalName(),
+ Level.WARN,
+ "Failed to close local PIT"
+ )
+ );
+ } finally {
+ terminate(threadPool);
+ }
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Verifies that the OpenPointInTimeRequest built in openPitAndExecute has routing and preference unset,
+ * and allowPartialSearchResults explicitly set to false.
+ */
+ public void testLocalOpenPitRequestHasExpectedProperties() {
+ assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ final OpenPitCapturingClient client = new OpenPitCapturingClient(getTestName());
+ try {
+ final TestThreadPool threadPool = new TestThreadPool(getTestName()) {
+ @Override
+ public ExecutorService executor(String name) {
+ return DIRECT_EXECUTOR_SERVICE;
+ }
+ };
+ try {
+ final ClusterService clusterService = mock(ClusterService.class);
+ final DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build();
+ when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
+ when(clusterService.localNode()).thenReturn(localNode);
+
+ final ProjectResolver projectResolver = mock(ProjectResolver.class);
+ when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID));
+
+ FeatureService featureService = mock(FeatureService.class);
+ when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(true);
+
+ final Reindexer reindexer = new Reindexer(
+ clusterService,
+ projectResolver,
+ client,
+ threadPool,
+ mock(ScriptService.class),
+ mock(ReindexSslConfig.class),
+ null,
+ mock(TransportService.class),
+ mock(ReindexRelocationNodePicker.class),
+ featureService
+ );
+
+ final ReindexRequest request = new ReindexRequest();
+ request.setSourceIndices("source");
+ request.setDestIndex("dest");
+ request.setSlices(1);
+
+ final BulkByScrollTask task = new BulkByScrollTask(
+ randomLong(),
+ "reindex",
+ "reindex",
+ "test",
+ TaskId.EMPTY_TASK_ID,
+ Collections.emptyMap(),
+ false
+ );
+
+ final PlainActionFuture initFuture = new PlainActionFuture<>();
+ reindexer.initTask(task, request, initFuture.delegateFailure((l, v) -> reindexer.execute(task, request, client, l)));
+ initFuture.actionGet();
+
+ OpenPointInTimeRequest pitRequest = client.getCapturedPitRequest();
+ assertNotNull("Expected OpenPointInTimeRequest to have been captured", pitRequest);
+ assertNull("routing should not be set", pitRequest.routing());
+ assertNull("preference should not be set", pitRequest.preference());
+ assertFalse("allowPartialSearchResults should be false", pitRequest.allowPartialSearchResults());
+ } finally {
+ terminate(threadPool);
+ }
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Verifies that openPitAndExecute throws AssertionError when the SearchRequest has routing set.
+ */
+ public void testLocalOpenPitFailsWhenRoutingSet() {
+ assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ final OpenPitCapturingClient client = new OpenPitCapturingClient(getTestName());
+ try {
+ final TestThreadPool threadPool = new TestThreadPool(getTestName()) {
+ @Override
+ public ExecutorService executor(String name) {
+ return DIRECT_EXECUTOR_SERVICE;
+ }
+ };
+ try {
+ final ClusterService clusterService = mock(ClusterService.class);
+ final DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build();
+ when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
+ when(clusterService.localNode()).thenReturn(localNode);
+
+ final ProjectResolver projectResolver = mock(ProjectResolver.class);
+ when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID));
+
+ FeatureService featureService = mock(FeatureService.class);
+ when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(true);
+
+ final Reindexer reindexer = new Reindexer(
+ clusterService,
+ projectResolver,
+ client,
+ threadPool,
+ mock(ScriptService.class),
+ mock(ReindexSslConfig.class),
+ null,
+ mock(TransportService.class),
+ mock(ReindexRelocationNodePicker.class),
+ featureService
+ );
+
+ final ReindexRequest request = new ReindexRequest();
+ request.setSourceIndices("source");
+ request.setDestIndex("dest");
+ request.setSlices(1);
+ request.getSearchRequest().routing("r1");
+
+ final BulkByScrollTask task = new BulkByScrollTask(
+ randomLong(),
+ "reindex",
+ "reindex",
+ "test",
+ TaskId.EMPTY_TASK_ID,
+ Collections.emptyMap(),
+ false
+ );
+
+ final PlainActionFuture initFuture = new PlainActionFuture<>();
+ Throwable e = expectThrows(
+ Throwable.class,
+ () -> reindexer.initTask(
+ task,
+ request,
+ initFuture.delegateFailure((l, v) -> reindexer.execute(task, request, client, l))
+ )
+ );
+ assertThat(ExceptionsHelper.unwrapCause(e).getMessage(), containsString("Routing is set in the search request"));
+ } finally {
+ terminate(threadPool);
+ }
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Verifies that openPitAndExecute throws AssertionError when the SearchRequest has preference set.
+ */
+ public void testLocalOpenPitFailsWhenPreferenceSet() {
+ assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ final OpenPitCapturingClient client = new OpenPitCapturingClient(getTestName());
+ try {
+ final TestThreadPool threadPool = new TestThreadPool(getTestName()) {
+ @Override
+ public ExecutorService executor(String name) {
+ return DIRECT_EXECUTOR_SERVICE;
+ }
+ };
+ try {
+ final ClusterService clusterService = mock(ClusterService.class);
+ final DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build();
+ when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
+ when(clusterService.localNode()).thenReturn(localNode);
+
+ final ProjectResolver projectResolver = mock(ProjectResolver.class);
+ when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID));
+
+ FeatureService featureService = mock(FeatureService.class);
+ when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(true);
+
+ final Reindexer reindexer = new Reindexer(
+ clusterService,
+ projectResolver,
+ client,
+ threadPool,
+ mock(ScriptService.class),
+ mock(ReindexSslConfig.class),
+ null,
+ mock(TransportService.class),
+ mock(ReindexRelocationNodePicker.class),
+ featureService
+ );
+
+ final ReindexRequest request = new ReindexRequest();
+ request.setSourceIndices("source");
+ request.setDestIndex("dest");
+ request.setSlices(1);
+ request.getSearchRequest().preference("_local");
+
+ final BulkByScrollTask task = new BulkByScrollTask(
+ randomLong(),
+ "reindex",
+ "reindex",
+ "test",
+ TaskId.EMPTY_TASK_ID,
+ Collections.emptyMap(),
+ false
+ );
+
+ final PlainActionFuture initFuture = new PlainActionFuture<>();
+ Throwable e = expectThrows(
+ Throwable.class,
+ () -> reindexer.initTask(
+ task,
+ request,
+ initFuture.delegateFailure((l, v) -> reindexer.execute(task, request, client, l))
+ )
+ );
+ assertThat(ExceptionsHelper.unwrapCause(e).getMessage(), containsString("Preference is set in the search request"));
+ } finally {
+ terminate(threadPool);
+ }
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Verifies that openPitAndExecute throws AssertionError when the SearchRequest has allowPartialSearchResults set to true.
+ */
+ public void testLocalOpenPitFailsWhenAllowPartialSearchResultsTrue() {
+ assumeTrue("PIT search must be enabled", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
+
+ final OpenPitCapturingClient client = new OpenPitCapturingClient(getTestName());
+ try {
+ final TestThreadPool threadPool = new TestThreadPool(getTestName()) {
+ @Override
+ public ExecutorService executor(String name) {
+ return DIRECT_EXECUTOR_SERVICE;
+ }
+ };
+ try {
+ final ClusterService clusterService = mock(ClusterService.class);
+ final DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build();
+ when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
+ when(clusterService.localNode()).thenReturn(localNode);
+
+ final ProjectResolver projectResolver = mock(ProjectResolver.class);
+ when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID));
+
+ FeatureService featureService = mock(FeatureService.class);
+ when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(true);
+
+ final Reindexer reindexer = new Reindexer(
+ clusterService,
+ projectResolver,
+ client,
+ threadPool,
+ mock(ScriptService.class),
+ mock(ReindexSslConfig.class),
+ null,
+ mock(TransportService.class),
+ mock(ReindexRelocationNodePicker.class),
+ featureService
+ );
+
+ final ReindexRequest request = new ReindexRequest();
+ request.setSourceIndices("source");
+ request.setDestIndex("dest");
+ request.setSlices(1);
+ request.getSearchRequest().allowPartialSearchResults(true);
+
+ final BulkByScrollTask task = new BulkByScrollTask(
+ randomLong(),
+ "reindex",
+ "reindex",
+ "test",
+ TaskId.EMPTY_TASK_ID,
+ Collections.emptyMap(),
+ false
+ );
+
+ final PlainActionFuture initFuture = new PlainActionFuture<>();
+ Throwable e = expectThrows(
+ Throwable.class,
+ () -> reindexer.initTask(
+ task,
+ request,
+ initFuture.delegateFailure((l, v) -> reindexer.execute(task, request, client, l))
+ )
+ );
+ assertThat(
+ ExceptionsHelper.unwrapCause(e).getMessage(),
+ containsString("allow_partial_search_results must be false when opening a PIT")
+ );
+ } finally {
+ terminate(threadPool);
+ }
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Client that succeeds on OpenPointInTime and Search (empty results) but fails on ClosePointInTime.
+ * Used to verify that PIT close failures are logged but don't propagate to the main listener.
+ */
+ private static final class ClosePitFailingClient extends NoOpClient {
+ private final String closeFailureMessage;
+ private final TestThreadPool threadPool;
+
+ ClosePitFailingClient(String threadPoolName, String closeFailureMessage) {
+ super(new TestThreadPool(threadPoolName), TestProjectResolvers.DEFAULT_PROJECT_ONLY);
+ this.threadPool = (TestThreadPool) super.threadPool();
+ this.closeFailureMessage = closeFailureMessage;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void doExecute(
+ ActionType action,
+ Request request,
+ ActionListener listener
+ ) {
+ if (action == TransportOpenPointInTimeAction.TYPE && request instanceof OpenPointInTimeRequest) {
+ OpenPointInTimeResponse response = new OpenPointInTimeResponse(new BytesArray("pit-id"), 1, 1, 0, 0);
+ listener.onResponse((Response) response);
+ return;
+ }
+ if (action == TransportSearchAction.TYPE && request instanceof SearchRequest) {
+ SearchResponse response = SearchResponseUtils.successfulResponse(
+ SearchHits.empty(new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0)
+ );
+ listener.onResponse((Response) response);
+ response.decRef();
+ return;
+ }
+ if (action == TransportClosePointInTimeAction.TYPE && request instanceof ClosePointInTimeRequest) {
+ listener.onFailure(new RuntimeException(closeFailureMessage));
+ return;
+ }
+ super.doExecute(action, request, listener);
+ }
+
+ void shutdown() {
+ terminate(threadPool);
+ }
+ }
+
+ /**
+ * Client that fails when it receives an OpenPointInTimeRequest. Used to verify the local PIT path is taken.
+ */
+ private static final class OpenPitFailingClient extends NoOpClient {
+ private final String failureMessage;
+ private final TestThreadPool threadPool;
+
+ OpenPitFailingClient(String threadPoolName, String failureMessage) {
+ super(new TestThreadPool(threadPoolName), TestProjectResolvers.DEFAULT_PROJECT_ONLY);
+ this.threadPool = (TestThreadPool) super.threadPool();
+ this.failureMessage = failureMessage;
+ }
+
+ @Override
+ protected void doExecute(
+ ActionType action,
+ Request request,
+ ActionListener listener
+ ) {
+ if (action == TransportOpenPointInTimeAction.TYPE && request instanceof OpenPointInTimeRequest) {
+ listener.onFailure(new RuntimeException(failureMessage));
+ } else {
+ super.doExecute(action, request, listener);
+ }
+ }
+
+ void shutdown() {
+ terminate(threadPool);
+ }
+ }
+
+ /**
+ * Client that captures the OpenPointInTimeRequest when received and returns success.
+ * Used to verify the local PIT request has the expected properties.
+ */
+ private static final class OpenPitCapturingClient extends NoOpClient {
+ private final TestThreadPool threadPool;
+ private volatile OpenPointInTimeRequest capturedPitRequest;
+
+ OpenPitCapturingClient(String threadPoolName) {
+ super(new TestThreadPool(threadPoolName), TestProjectResolvers.DEFAULT_PROJECT_ONLY);
+ this.threadPool = (TestThreadPool) super.threadPool();
+ }
+
+ OpenPointInTimeRequest getCapturedPitRequest() {
+ return capturedPitRequest;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void doExecute(
+ ActionType action,
+ Request request,
+ ActionListener listener
+ ) {
+ if (action == TransportOpenPointInTimeAction.TYPE && request instanceof OpenPointInTimeRequest pitRequest) {
+ capturedPitRequest = pitRequest;
+ OpenPointInTimeResponse response = new OpenPointInTimeResponse(new BytesArray("pit-id"), 1, 1, 0, 0);
+ listener.onResponse((Response) response);
+ return;
+ }
+ if (action == TransportSearchAction.TYPE && request instanceof SearchRequest) {
+ SearchResponse response = SearchResponseUtils.successfulResponse(
+ SearchHits.empty(new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0)
+ );
+ listener.onResponse((Response) response);
+ response.decRef();
+ return;
+ }
+ if (action == TransportClosePointInTimeAction.TYPE && request instanceof ClosePointInTimeRequest) {
+ listener.onResponse((Response) new ClosePointInTimeResponse(true, 1));
+ return;
+ }
+ super.doExecute(action, request, listener);
+ }
+
+ void shutdown() {
+ terminate(threadPool);
+ }
+ }
+
// --- helpers ---
+ private static final String REMOTE_PIT_TEST_VERSION_JSON = "{\"version\":{\"number\":\"7.10.0\"},\"tagline\":\"You Know, for Search\"}";
+ private static final String REMOTE_PIT_OPEN_RESPONSE = "{\"id\":\"c29tZXBpdGlk\"}";
+ private static final String REMOTE_PIT_EMPTY_SEARCH_RESPONSE = "{"
+ + "\"_scroll_id\":\"scroll1\","
+ + "\"timed_out\":false,"
+ + "\"hits\":{"
+ + "\"total\":0,"
+ + "\"hits\":[]"
+ + "},"
+ + "\"_shards\":{"
+ + "\"total\":1,"
+ + "\"successful\":1,"
+ + "\"failed\":0"
+ + "}"
+ + "}";
+
+ /**
+ * Creates a MockHttpServer that handles the full remote PIT flow (version, open PIT, search, close PIT).
+ * For requests matching the predicate, the customHandler is used; otherwise standard success responses are returned.
+ */
+ @SuppressForbidden(reason = "use http server for testing")
+ private HttpServer createRemotePitMockServer(BiPredicate useCustomHandler, Consumer customHandler)
+ throws IOException {
+ HttpServer server = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
+ server.createContext("/", exchange -> {
+ String path = exchange.getRequestURI().getPath();
+ String method = exchange.getRequestMethod();
+ if (useCustomHandler.test(path, method)) {
+ customHandler.accept(exchange);
+ } else if (path.equals("/") || path.isEmpty()) {
+ respondJson(exchange, 200, REMOTE_PIT_TEST_VERSION_JSON);
+ } else if (path.contains("_pit") && "POST".equals(method)) {
+ respondJson(exchange, 200, REMOTE_PIT_OPEN_RESPONSE);
+ } else if (path.contains("_search") && "POST".equals(method)) {
+ respondJson(exchange, 200, REMOTE_PIT_EMPTY_SEARCH_RESPONSE);
+ } else if (path.contains("_search/scroll") && "DELETE".equals(method)) {
+ exchange.sendResponseHeaders(200, -1);
+ } else {
+ exchange.sendResponseHeaders(404, -1);
+ }
+ exchange.close();
+ });
+ return server;
+ }
+
+ private static void respondJson(HttpExchange exchange, int status, String json) throws IOException {
+ byte[] body = json.getBytes(StandardCharsets.UTF_8);
+ exchange.getResponseHeaders().set("Content-Type", "application/json");
+ exchange.sendResponseHeaders(status, body.length);
+ try (OutputStream out = exchange.getResponseBody()) {
+ out.write(body);
+ }
+ }
+
+ /**
+ * Runs a remote PIT reindex test against a MockHttpServer. The server must already be started.
+ */
+ @SuppressForbidden(reason = "use http server for testing")
+ private void runRemotePitTestWithMockServer(
+ HttpServer server,
+ Consumer requestConfigurer,
+ Consumer> assertions
+ ) {
+ BytesArray matchAll = new BytesArray("{\"match_all\":{}}");
+ RemoteInfo remoteInfo = new RemoteInfo(
+ "http",
+ server.getAddress().getHostString(),
+ server.getAddress().getPort(),
+ null,
+ matchAll,
+ null,
+ null,
+ emptyMap(),
+ TimeValue.timeValueSeconds(5),
+ TimeValue.timeValueSeconds(5)
+ );
+
+ ReindexRequest request = new ReindexRequest();
+ request.setSourceIndices("source");
+ request.setDestIndex("dest");
+ request.setRemoteInfo(remoteInfo);
+ request.setSlices(1);
+ requestConfigurer.accept(request);
+
+ ClusterService clusterService = mock(ClusterService.class);
+ DiscoveryNode localNode = DiscoveryNodeUtils.builder("local-node").build();
+ when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
+ when(clusterService.localNode()).thenReturn(localNode);
+
+ ProjectResolver projectResolver = mock(ProjectResolver.class);
+ when(projectResolver.getProjectState(any())).thenReturn(ClusterState.EMPTY_STATE.projectState(Metadata.DEFAULT_PROJECT_ID));
+
+ TestThreadPool threadPool = new TestThreadPool(getTestName()) {
+ @Override
+ public ExecutorService executor(String name) {
+ return DIRECT_EXECUTOR_SERVICE;
+ }
+ };
+ try {
+ Environment environment = TestEnvironment.newEnvironment(Settings.builder().put("path.home", createTempDir()).build());
+ ReindexSslConfig sslConfig = new ReindexSslConfig(environment.settings(), environment, mock(ResourceWatcherService.class));
+
+ FeatureService featureService = mock(FeatureService.class);
+ when(featureService.clusterHasFeature(any(), eq(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE))).thenReturn(true);
+
+ Reindexer reindexer = new Reindexer(
+ clusterService,
+ projectResolver,
+ mock(Client.class),
+ threadPool,
+ mock(ScriptService.class),
+ sslConfig,
+ null,
+ mock(TransportService.class),
+ mock(ReindexRelocationNodePicker.class),
+ featureService
+ );
+
+ BulkByScrollTask task = new BulkByScrollTask(
+ randomLong(),
+ "reindex",
+ "reindex",
+ "test",
+ TaskId.EMPTY_TASK_ID,
+ Collections.emptyMap(),
+ false
+ );
+
+ PlainActionFuture initFuture = new PlainActionFuture<>();
+ reindexer.initTask(
+ task,
+ request,
+ initFuture.delegateFailure((l, v) -> reindexer.execute(task, request, mock(Client.class), l))
+ );
+ assertions.accept(initFuture);
+ } finally {
+ terminate(threadPool);
+ }
+ }
+
private BulkByScrollResponse reindexResponseWithBulkAndSearchFailures(
final List bulkFailures,
List searchFailures
diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteReindexingUtilsTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteReindexingUtilsTests.java
index 9969a01c99d07..c850090edfc05 100644
--- a/modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteReindexingUtilsTests.java
+++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteReindexingUtilsTests.java
@@ -21,11 +21,14 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
+import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.BackoffPolicy;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
@@ -40,6 +43,7 @@
import java.io.IOException;
import java.net.URL;
+import java.util.Base64;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -58,6 +62,17 @@ public class RemoteReindexingUtilsTests extends ESTestCase {
private static final Logger logger = LogManager.getLogger(RemoteReindexingUtilsTests.class);
+ private static String getAllExceptionMessages(Throwable t) {
+ StringBuilder sb = new StringBuilder();
+ while (t != null) {
+ if (t.getMessage() != null) {
+ sb.append(t.getMessage()).append(" ");
+ }
+ t = t.getCause();
+ }
+ return sb.toString();
+ }
+
private ThreadPool threadPool;
private RestClient client;
@@ -351,7 +366,7 @@ public void testLookupRemoteVersionWithRetriesSucceedsOnRetry() throws Exception
/**
* Verifies that lookupRemoteVersionWithRetries propagates failure when retries are exhausted.
*/
- public void testLookupRemoteVersionWithRetriesExhaustedPropagatesFailure() throws Exception {
+ public void testLookupRemoteVersionWithRetriesExhaustedPropagatesFailure() {
Response rejectionResponse = rejectionResponse429();
doAnswer(inv -> {
((ResponseListener) inv.getArgument(1)).onFailure(new ResponseException(rejectionResponse));
@@ -429,6 +444,304 @@ public void testLookupRemoteVersionWithRetriesSucceedsOnFirstCall() throws Excep
verify(client, times(1)).performRequestAsync(any(), any());
}
+ /**
+ * Verifies that openPit parses a valid open PIT response and invokes onResponse with the decoded PIT id.
+ */
+ public void testOpenPitSuccess() {
+ byte[] pitIdBytes = randomByteArrayOfLength(between(1, 64));
+ String base64Id = Base64.getUrlEncoder().encodeToString(pitIdBytes);
+ String json = "{\"id\":\"" + base64Id + "\"}";
+ Response response = mock(Response.class);
+ when(response.getEntity()).thenReturn(new StringEntity(json, ContentType.APPLICATION_JSON));
+ mockSuccess(response);
+
+ String index = randomAlphaOfLength(between(1, 10));
+ SearchRequest searchRequest = new SearchRequest().indices(index);
+ AtomicBoolean success = new AtomicBoolean(false);
+ BytesReference[] capturedPitId = new BytesReference[1];
+ RemoteReindexingUtils.openPit(
+ searchRequest,
+ new String[] { index },
+ TimeValue.timeValueMillis(between(1, 60000)),
+ RejectAwareActionListener.wrap(pitId -> {
+ capturedPitId[0] = pitId;
+ success.set(true);
+ }, e -> fail("unexpected failure"), e -> fail("unexpected rejection")),
+ threadPool,
+ client
+ );
+ assertTrue("listener should have received success", success.get());
+ assertArrayEquals(pitIdBytes, BytesReference.toBytes(capturedPitId[0]));
+ }
+
+ /**
+ * Verifies that openPit invokes onRejection when the remote returns HTTP 429.
+ */
+ public void testOpenPitTooManyRequestsTriggersRejection() throws Exception {
+ mockFailure(new ResponseException(rejectionResponse429()));
+
+ String index = randomAlphaOfLength(between(1, 10));
+ SearchRequest searchRequest = new SearchRequest().indices(index);
+ AtomicBoolean rejected = new AtomicBoolean(false);
+ RemoteReindexingUtils.openPit(
+ searchRequest,
+ new String[] { index },
+ randomPositiveTimeValue(),
+ RejectAwareActionListener.wrap(v -> fail("unexpected success"), e -> fail("unexpected failure"), e -> rejected.set(true)),
+ threadPool,
+ client
+ );
+ assertTrue("onRejection should have been called", rejected.get());
+ }
+
+ /**
+ * Verifies that openPit invokes onFailure when the remote returns a non-429 HTTP error.
+ */
+ public void testOpenPitHttpErrorTriggersFailure() throws Exception {
+ int statusCode = randomFrom(RestStatus.BAD_REQUEST, RestStatus.NOT_FOUND, RestStatus.INTERNAL_SERVER_ERROR).getStatus();
+ StatusLine statusLine = mock(StatusLine.class);
+ when(statusLine.getStatusCode()).thenReturn(statusCode);
+ Response response = mock(Response.class);
+ when(response.getStatusLine()).thenReturn(statusLine);
+ when(response.getEntity()).thenReturn(new StringEntity(randomAlphaOfLength(between(1, 20)), ContentType.TEXT_PLAIN));
+ RequestLine requestLine = mock(RequestLine.class);
+ when(requestLine.getMethod()).thenReturn("POST");
+ when(response.getRequestLine()).thenReturn(requestLine);
+ mockFailure(new ResponseException(response));
+
+ String index = randomAlphaOfLength(between(1, 10));
+ SearchRequest searchRequest = new SearchRequest().indices(index);
+ AtomicBoolean failed = new AtomicBoolean(false);
+ RemoteReindexingUtils.openPit(
+ searchRequest,
+ new String[] { index },
+ randomPositiveTimeValue(),
+ RejectAwareActionListener.wrap(v -> fail("unexpected success"), e -> {
+ assertTrue(e instanceof ElasticsearchStatusException);
+ assertEquals(statusCode, ((ElasticsearchStatusException) e).status().getStatus());
+ failed.set(true);
+ }, e -> fail("unexpected rejection")),
+ threadPool,
+ client
+ );
+ assertTrue("onFailure should have been called", failed.get());
+ }
+
+ /**
+ * Verifies that openPit invokes onFailure when the response body is invalid JSON.
+ */
+ public void testOpenPitInvalidJsonTriggersFailure() {
+ String invalidJson = randomAlphaOfLength(between(5, 20)) + "!!!";
+ Response response = mock(Response.class);
+ when(response.getEntity()).thenReturn(new StringEntity(invalidJson, ContentType.APPLICATION_JSON));
+ mockSuccess(response);
+
+ String index = randomAlphaOfLength(between(1, 10));
+ SearchRequest searchRequest = new SearchRequest().indices(index);
+ AtomicBoolean failed = new AtomicBoolean(false);
+ RemoteReindexingUtils.openPit(
+ searchRequest,
+ new String[] { index },
+ randomPositiveTimeValue(),
+ RejectAwareActionListener.wrap(v -> fail("unexpected success"), e -> {
+ assertTrue(e instanceof ElasticsearchException);
+ assertThat(e.getMessage(), containsString("remote is likely not an Elasticsearch instance"));
+ failed.set(true);
+ }, e -> fail("unexpected rejection")),
+ threadPool,
+ client
+ );
+ assertTrue("onFailure should have been called", failed.get());
+ }
+
+ /**
+ * Verifies that openPit invokes onFailure when the response is valid JSON but missing the required id field.
+ */
+ public void testOpenPitMissingIdFieldTriggersFailure() {
+ String json = "{\"other\":\"" + randomAlphaOfLength(between(1, 10)) + "\"}";
+ Response response = mock(Response.class);
+ when(response.getEntity()).thenReturn(new StringEntity(json, ContentType.APPLICATION_JSON));
+ mockSuccess(response);
+
+ String index = randomAlphaOfLength(between(1, 10));
+ SearchRequest searchRequest = new SearchRequest().indices(index);
+ AtomicBoolean failed = new AtomicBoolean(false);
+ RemoteReindexingUtils.openPit(
+ searchRequest,
+ new String[] { index },
+ randomPositiveTimeValue(),
+ RejectAwareActionListener.wrap(v -> fail("unexpected success"), e -> {
+ assertTrue(e instanceof ElasticsearchException);
+ assertThat(getAllExceptionMessages(e), containsString("open point-in-time response must contain [id] field"));
+ failed.set(true);
+ }, e -> fail("unexpected rejection")),
+ threadPool,
+ client
+ );
+ assertTrue("onFailure should have been called", failed.get());
+ }
+
+ /**
+ * Verifies that openPit throws AssertionError when the SearchRequest has routing set
+ */
+ public void testOpenPitFailsWhenRoutingSet() {
+ SearchRequest searchRequest = new SearchRequest().indices("index");
+ searchRequest.routing("some-routing");
+ AssertionError e = expectThrows(
+ AssertionError.class,
+ () -> RemoteReindexingUtils.openPit(
+ searchRequest,
+ new String[] { "index" },
+ randomPositiveTimeValue(),
+ RejectAwareActionListener.wrap(
+ v -> fail("unexpected success"),
+ err -> fail("unexpected failure"),
+ err -> fail("unexpected rejection")
+ ),
+ threadPool,
+ client
+ )
+ );
+ assertThat(e.getMessage(), containsString("Routing is set"));
+ }
+
+ /**
+ * Verifies that openPit throws AssertionError when the SearchRequest has preference set
+ */
+ public void testOpenPitFailsWhenPreferenceSet() {
+ SearchRequest searchRequest = new SearchRequest().indices("index");
+ searchRequest.preference("_local");
+ AssertionError e = expectThrows(
+ AssertionError.class,
+ () -> RemoteReindexingUtils.openPit(
+ searchRequest,
+ new String[] { "index" },
+ randomPositiveTimeValue(),
+ RejectAwareActionListener.wrap(
+ v -> fail("unexpected success"),
+ err -> fail("unexpected failure"),
+ err -> fail("unexpected rejection")
+ ),
+ threadPool,
+ client
+ )
+ );
+ assertThat(e.getMessage(), containsString("Preference is set"));
+ }
+
+ /**
+ * Verifies that openPit throws AssertionError when the SearchRequest has allowPartialSearchResults set to true
+ * since scroll search defaults to false
+ */
+ public void testOpenPitFailsWhenAllowPartialSearchResultsTrue() {
+ SearchRequest searchRequest = new SearchRequest().indices("index");
+ searchRequest.allowPartialSearchResults(true);
+ AssertionError e = expectThrows(
+ AssertionError.class,
+ () -> RemoteReindexingUtils.openPit(
+ searchRequest,
+ new String[] { "index" },
+ randomPositiveTimeValue(),
+ RejectAwareActionListener.wrap(
+ v -> fail("unexpected success"),
+ err -> fail("unexpected failure"),
+ err -> fail("unexpected rejection")
+ ),
+ threadPool,
+ client
+ )
+ );
+ assertThat(e.getMessage(), containsString("allow_partial_search_results"));
+ }
+
+ /**
+ * Verifies that closePit invokes onResponse when the remote returns a successful close PIT response.
+ */
+ public void testClosePitSuccess() {
+ String json = "{\"succeeded\":" + randomBoolean() + "}";
+ Response response = mock(Response.class);
+ when(response.getEntity()).thenReturn(new StringEntity(json, ContentType.APPLICATION_JSON));
+ mockSuccess(response);
+
+ AtomicBoolean success = new AtomicBoolean(false);
+ BytesReference pitId = new BytesArray(randomByteArrayOfLength(between(1, 32)));
+ RemoteReindexingUtils.closePit(
+ pitId,
+ RejectAwareActionListener.wrap(v -> success.set(true), e -> fail("unexpected failure"), e -> fail("unexpected rejection")),
+ threadPool,
+ client
+ );
+ assertTrue("listener should have received success", success.get());
+ }
+
+ /**
+ * Verifies that closePit invokes onRejection when the remote returns HTTP 429.
+ */
+ public void testClosePitTooManyRequestsTriggersRejection() throws Exception {
+ mockFailure(new ResponseException(rejectionResponse429()));
+
+ AtomicBoolean rejected = new AtomicBoolean(false);
+ RemoteReindexingUtils.closePit(
+ new BytesArray(randomByteArrayOfLength(between(1, 32))),
+ RejectAwareActionListener.wrap(v -> fail("unexpected success"), e -> fail("unexpected failure"), e -> rejected.set(true)),
+ threadPool,
+ client
+ );
+ assertTrue("onRejection should have been called", rejected.get());
+ }
+
+ /**
+ * Verifies that closePit invokes onFailure when the remote returns a non-429 HTTP error.
+ */
+ public void testClosePitHttpErrorTriggersFailure() throws Exception {
+ int statusCode = randomFrom(RestStatus.BAD_REQUEST, RestStatus.NOT_FOUND, RestStatus.INTERNAL_SERVER_ERROR).getStatus();
+ StatusLine statusLine = mock(StatusLine.class);
+ when(statusLine.getStatusCode()).thenReturn(statusCode);
+ Response response = mock(Response.class);
+ when(response.getStatusLine()).thenReturn(statusLine);
+ when(response.getEntity()).thenReturn(new StringEntity(randomAlphaOfLength(between(1, 20)), ContentType.TEXT_PLAIN));
+ RequestLine requestLine = mock(RequestLine.class);
+ when(requestLine.getMethod()).thenReturn("DELETE");
+ when(response.getRequestLine()).thenReturn(requestLine);
+ mockFailure(new ResponseException(response));
+
+ AtomicBoolean failed = new AtomicBoolean(false);
+ RemoteReindexingUtils.closePit(
+ new BytesArray(randomByteArrayOfLength(between(1, 32))),
+ RejectAwareActionListener.wrap(v -> fail("unexpected success"), e -> {
+ assertTrue(e instanceof ElasticsearchStatusException);
+ assertEquals(statusCode, ((ElasticsearchStatusException) e).status().getStatus());
+ failed.set(true);
+ }, e -> fail("unexpected rejection")),
+ threadPool,
+ client
+ );
+ assertTrue("onFailure should have been called", failed.get());
+ }
+
+ /**
+ * Verifies that closePit invokes onFailure when the response body is invalid JSON.
+ */
+ public void testClosePitInvalidJsonTriggersFailure() {
+ String invalidJson = randomAlphaOfLength(between(5, 20)) + "!!!";
+ Response response = mock(Response.class);
+ when(response.getEntity()).thenReturn(new StringEntity(invalidJson, ContentType.APPLICATION_JSON));
+ mockSuccess(response);
+
+ AtomicBoolean failed = new AtomicBoolean(false);
+ RemoteReindexingUtils.closePit(
+ new BytesArray(randomByteArrayOfLength(between(1, 32))),
+ RejectAwareActionListener.wrap(v -> fail("unexpected success"), e -> {
+ assertTrue(e instanceof ElasticsearchException);
+ assertThat(e.getMessage(), containsString("remote is likely not an Elasticsearch instance"));
+ failed.set(true);
+ }, e -> fail("unexpected rejection")),
+ threadPool,
+ client
+ );
+ assertTrue("onFailure should have been called", failed.get());
+ }
+
private Response successResponse(String resource) throws Exception {
URL url = Thread.currentThread().getContextClassLoader().getResource("responses/" + resource);
assertNotNull("missing test resource [" + resource + "]", url);
diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteRequestBuildersTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteRequestBuildersTests.java
index 723d4582ff198..19e0e249e5dd6 100644
--- a/modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteRequestBuildersTests.java
+++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteRequestBuildersTests.java
@@ -26,11 +26,14 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
+import java.util.Base64;
import java.util.Map;
import static org.elasticsearch.core.TimeValue.timeValueMillis;
import static org.elasticsearch.reindex.remote.RemoteRequestBuilders.clearScroll;
+import static org.elasticsearch.reindex.remote.RemoteRequestBuilders.closePit;
import static org.elasticsearch.reindex.remote.RemoteRequestBuilders.initialSearch;
+import static org.elasticsearch.reindex.remote.RemoteRequestBuilders.openPit;
import static org.elasticsearch.reindex.remote.RemoteRequestBuilders.scroll;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
@@ -40,6 +43,8 @@
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.startsWith;
/**
* Tests for {@link RemoteRequestBuilders} which builds requests for remote version of
@@ -330,4 +335,134 @@ public void testClearScroll() throws IOException {
assertEquals(scroll, Streams.copyToString(new InputStreamReader(request.getEntity().getContent(), StandardCharsets.UTF_8)));
assertThat(request.getParameters().keySet(), empty());
}
+
+ /**
+ * Verifies that openPit builds a POST request to the correct path for a single index.
+ */
+ public void testOpenPitSingleIndex() {
+ String index = randomAlphaOfLength(between(1, 20));
+ TimeValue keepAlive = randomPositiveTimeValue();
+ Request request = openPit(new String[] { index }, keepAlive);
+ assertEquals("POST", request.getMethod());
+ assertEquals("/" + index + "/_pit", request.getEndpoint());
+ assertThat(request.getParameters(), hasEntry("keep_alive", keepAlive.getStringRep()));
+ assertThat(request.getParameters(), hasEntry("allow_partial_search_results", "false"));
+ }
+
+ /**
+ * Verifies that openPit builds a POST request to the correct path for multiple indices.
+ */
+ public void testOpenPitMultipleIndices() {
+ int numIndices = between(2, 10);
+ String[] indices = new String[numIndices];
+ StringBuilder expectedPath = new StringBuilder("/");
+ for (int i = 0; i < numIndices; i++) {
+ indices[i] = randomAlphaOfLength(between(1, 10));
+ if (i > 0) {
+ expectedPath.append(",");
+ }
+ expectedPath.append(indices[i]);
+ }
+ expectedPath.append("/_pit");
+ TimeValue keepAlive = randomPositiveTimeValue();
+ Request request = openPit(indices, keepAlive);
+ assertEquals("POST", request.getMethod());
+ assertEquals(expectedPath.toString(), request.getEndpoint());
+ assertThat(request.getParameters(), hasEntry("keep_alive", keepAlive.getStringRep()));
+ assertThat(request.getParameters(), hasEntry("allow_partial_search_results", "false"));
+ }
+
+ /**
+ * Verifies that openPit uses /_pit when indices are null or empty, matching addIndices behavior.
+ */
+ public void testOpenPitNullOrEmptyIndices() {
+ TimeValue keepAlive = randomPositiveTimeValue();
+ Request nullRequest = openPit(null, keepAlive);
+ assertEquals("POST", nullRequest.getMethod());
+ assertEquals("/_pit", nullRequest.getEndpoint());
+ assertThat(nullRequest.getParameters(), hasEntry("keep_alive", keepAlive.getStringRep()));
+ assertThat(nullRequest.getParameters(), hasEntry("allow_partial_search_results", "false"));
+
+ Request emptyRequest = openPit(new String[] {}, keepAlive);
+ assertEquals("POST", emptyRequest.getMethod());
+ assertEquals("/_pit", emptyRequest.getEndpoint());
+ assertThat(emptyRequest.getParameters(), hasEntry("keep_alive", keepAlive.getStringRep()));
+ assertThat(emptyRequest.getParameters(), hasEntry("allow_partial_search_results", "false"));
+ }
+
+ /**
+ * Verifies that openPit URL-encodes index names containing special characters (comma, slash).
+ */
+ public void testOpenPitEncodesSpecialCharactersInIndices() {
+ String prefix1 = randomAlphaOfLength(between(1, 5));
+ String prefix2 = randomAlphaOfLength(between(1, 5));
+ Request request = openPit(new String[] { prefix1 + ",", prefix2 + "/" }, randomPositiveTimeValue());
+ assertEquals("POST", request.getMethod());
+ assertEquals("/" + prefix1 + "%2C," + prefix2 + "%2F/_pit", request.getEndpoint());
+ assertThat(request.getParameters(), hasEntry("allow_partial_search_results", "false"));
+ }
+
+ /**
+ * Verifies that openPit passes through various TimeValue formats for keep_alive.
+ */
+ public void testOpenPitKeepAliveParameter() {
+ String index = randomAlphaOfLength(between(1, 10));
+ long millis = between(1, 100000);
+ var params = openPit(new String[] { index }, timeValueMillis(millis)).getParameters();
+ assertThat(params, hasEntry("allow_partial_search_results", "false"));
+ assertThat(params, hasEntry("keep_alive", TimeValue.timeValueMillis(millis).getStringRep()));
+ int minutes = between(1, 60);
+ assertThat(
+ openPit(new String[] { index }, TimeValue.timeValueMinutes(minutes)).getParameters(),
+ hasEntry("keep_alive", TimeValue.timeValueMinutes(minutes).getStringRep())
+ );
+ int hours = between(1, 24);
+ assertThat(
+ openPit(new String[] { index }, TimeValue.timeValueHours(hours)).getParameters(),
+ hasEntry("keep_alive", TimeValue.timeValueHours(hours).getStringRep())
+ );
+ }
+
+ /**
+ * Verifies that closePit builds a DELETE request to /_pit with a JSON body containing the base64-encoded PIT id.
+ */
+ public void testClosePitRequestStructure() throws IOException {
+ byte[] pitIdBytes = randomByteArrayOfLength(between(1, 64));
+ BytesReference pitId = new BytesArray(pitIdBytes);
+ Request request = closePit(pitId);
+ assertEquals("DELETE", request.getMethod());
+ assertEquals("/_pit", request.getEndpoint());
+ assertThat(request.getEntity(), not(nullValue()));
+ assertEquals(ContentType.APPLICATION_JSON.toString(), request.getEntity().getContentType().getValue());
+ String body = Streams.copyToString(new InputStreamReader(request.getEntity().getContent(), StandardCharsets.UTF_8));
+ String expectedId = Base64.getUrlEncoder().encodeToString(pitIdBytes);
+ assertThat(body, containsString("\"id\":\"" + expectedId + "\""));
+ }
+
+ /**
+ * Verifies that closePit correctly encodes the PIT id for binary-like content.
+ */
+ public void testClosePitEncodesBinaryPitId() throws IOException {
+ byte[] pitIdBytes = randomByteArrayOfLength(between(1, 32));
+ BytesReference pitId = new BytesArray(pitIdBytes);
+ Request request = closePit(pitId);
+ String body = Streams.copyToString(new InputStreamReader(request.getEntity().getContent(), StandardCharsets.UTF_8));
+ String expectedId = Base64.getUrlEncoder().encodeToString(pitIdBytes);
+ assertThat(body, containsString("\"id\":\"" + expectedId + "\""));
+ }
+
+ /**
+ * Verifies that closePit produces valid JSON with an id field containing the base64-encoded PIT id.
+ */
+ public void testClosePitProducesValidJson() throws IOException {
+ String pitIdStr = randomAlphaOfLength(between(1, 50));
+ BytesReference pitId = new BytesArray(pitIdStr.getBytes(StandardCharsets.UTF_8));
+ Request request = closePit(pitId);
+ String body = Streams.copyToString(new InputStreamReader(request.getEntity().getContent(), StandardCharsets.UTF_8));
+ String expectedId = Base64.getUrlEncoder().encodeToString(pitIdStr.getBytes(StandardCharsets.UTF_8));
+ assertThat(body, containsString("\"id\""));
+ assertThat(body, containsString(expectedId));
+ assertThat(body.trim(), startsWith("{"));
+ assertThat(body.trim(), endsWith("}"));
+ }
}
diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteResponseParsersTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteResponseParsersTests.java
index 60b5a70e18225..05cba4c635e28 100644
--- a/modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteResponseParsersTests.java
+++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteResponseParsersTests.java
@@ -9,18 +9,25 @@
package org.elasticsearch.reindex.remote;
+import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.reindex.PaginatedHitSource;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentParseException;
import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentType;
import org.hamcrest.Matchers;
import java.io.IOException;
+import java.util.Base64;
+import static org.elasticsearch.reindex.remote.RemoteResponseParsers.OPEN_PIT_PARSER;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
+import static org.junit.Assert.assertArrayEquals;
public class RemoteResponseParsersTests extends ESTestCase {
@@ -38,4 +45,115 @@ public void testFailureWithoutIndex() throws IOException {
assertThat(parsed.getReason(), Matchers.instanceOf(EsRejectedExecutionException.class));
}
}
+
+ /**
+ * Verifies that OPEN_PIT_PARSER extracts and base64-url-decodes the id field from a valid open PIT response,
+ * regardless of field order.
+ */
+ public void testOpenPitParserValidResponse() throws IOException {
+ byte[] pitIdBytes = randomByteArrayOfLength(between(1, 64));
+ String base64Id = Base64.getUrlEncoder().encodeToString(pitIdBytes);
+ int fieldsBefore = between(0, 3);
+ int fieldsAfter = between(0, 3);
+
+ XContentBuilder builder = jsonBuilder().startObject();
+ // Randomly generates some fields to come before the ID
+ for (int i = 0; i < fieldsBefore; i++) {
+ builder.field("before_" + i + "_" + randomAlphaOfLength(between(1, 5)), randomAlphaOfLength(between(1, 10)));
+ }
+ builder.field("id", base64Id);
+ // Randomly generates some fields to come after the ID
+ for (int i = 0; i < fieldsAfter; i++) {
+ builder.field("after_" + i + "_" + randomAlphaOfLength(between(1, 5)), randomAlphaOfLength(between(1, 10)));
+ }
+ builder.endObject();
+ try (XContentParser parser = createParser(builder)) {
+ BytesReference result = OPEN_PIT_PARSER.apply(parser, XContentType.JSON);
+ assertArrayEquals(pitIdBytes, BytesReference.toBytes(result));
+ }
+ }
+
+ /**
+ * Verifies that OPEN_PIT_PARSER throws when the response is an empty object with no id field.
+ */
+ public void testOpenPitParserMissingId() throws IOException {
+ XContentBuilder builder = jsonBuilder().startObject().endObject();
+ try (XContentParser parser = createParser(builder)) {
+ Exception e = expectThrows(Exception.class, () -> OPEN_PIT_PARSER.apply(parser, XContentType.JSON));
+ assertThat(
+ ExceptionsHelper.unwrapCause(e).getMessage(),
+ Matchers.containsString("Failed to build [open_pit_response] after last required field arrived")
+ );
+ }
+ }
+
+ /**
+ * Verifies that OPEN_PIT_PARSER throws when the id field is present but empty.
+ */
+ public void testOpenPitParserEmptyId() throws IOException {
+ XContentBuilder builder = jsonBuilder().startObject().field("id", "").endObject();
+ try (XContentParser parser = createParser(builder)) {
+ Exception e = expectThrows(Exception.class, () -> OPEN_PIT_PARSER.apply(parser, XContentType.JSON));
+ assertThat(ExceptionsHelper.unwrapCause(e).getMessage(), Matchers.containsString("failed to parse field [id]"));
+ }
+ }
+
+ /**
+ * Verifies that OPEN_PIT_PARSER throws when the response is not a JSON object (e.g. an array).
+ */
+ public void testOpenPitParserNotAnObject() throws IOException {
+ XContentBuilder builder = jsonBuilder().startArray().value("a").endArray();
+ try (XContentParser parser = createParser(builder)) {
+ XContentParseException e = expectThrows(XContentParseException.class, () -> OPEN_PIT_PARSER.apply(parser, XContentType.JSON));
+ assertThat(e.getMessage(), Matchers.containsString("Expected START_OBJECT"));
+ }
+ }
+
+ /**
+ * Verifies that OPEN_PIT_PARSER throws when the response is a primitive value rather than an object.
+ */
+ public void testOpenPitParserNotAnObjectWithPrimitive() throws IOException {
+ XContentBuilder builder = randomFrom(jsonBuilder().value("bare_string"), jsonBuilder().value(42), jsonBuilder().value(true));
+ try (XContentParser parser = createParser(builder)) {
+ XContentParseException e = expectThrows(XContentParseException.class, () -> OPEN_PIT_PARSER.apply(parser, XContentType.JSON));
+ assertThat(e.getMessage(), Matchers.containsString("Expected START_OBJECT"));
+ }
+ }
+
+ /**
+ * Verifies that OPEN_PIT_PARSER throws when the id field has the wrong type (e.g. number instead of string).
+ */
+ public void testOpenPitParserIdWrongType() throws IOException {
+ XContentBuilder builder = jsonBuilder().startObject().field("id", 12345).endObject();
+ try (XContentParser parser = createParser(builder)) {
+ XContentParseException e = expectThrows(XContentParseException.class, () -> OPEN_PIT_PARSER.apply(parser, XContentType.JSON));
+ assertThat(e.getMessage(), Matchers.anyOf(Matchers.containsString("id doesn't support values of type")));
+ }
+ }
+
+ /**
+ * Verifies that OPEN_PIT_PARSER throws when the id field is explicitly null.
+ */
+ public void testOpenPitParserNullId() throws IOException {
+ XContentBuilder builder = jsonBuilder().startObject().nullField("id").endObject();
+ try (XContentParser parser = createParser(builder)) {
+ Exception e = expectThrows(Exception.class, () -> OPEN_PIT_PARSER.apply(parser, XContentType.JSON));
+ assertThat(
+ ExceptionsHelper.unwrapCause(e).getMessage(),
+ Matchers.containsString("id doesn't support values of type: VALUE_NULL")
+ );
+ }
+ }
+
+ /**
+ * Verifies that OPEN_PIT_PARSER throws when the id field contains invalid base64-url data.
+ */
+ public void testOpenPitParserInvalidBase64() throws IOException {
+ String invalidBase64 = randomAlphaOfLength(between(1, 20)) + "!!!";
+ XContentBuilder builder = jsonBuilder().startObject().field("id", invalidBase64).endObject();
+ try (XContentParser parser = createParser(builder)) {
+ XContentParseException e = expectThrows(XContentParseException.class, () -> OPEN_PIT_PARSER.apply(parser, XContentType.JSON));
+ assertThat(e.getMessage(), Matchers.containsString("failed to parse field [id]"));
+ }
+ }
}
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java
index f5ef508a4d8c6..8803b4feac101 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java
@@ -26,6 +26,8 @@
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.downsample.DownsampleAction;
import org.elasticsearch.action.index.TransportIndexAction;
+import org.elasticsearch.action.search.TransportClosePointInTimeAction;
+import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.index.reindex.ReindexAction;
@@ -217,6 +219,7 @@ public class InternalUsers {
RoleDescriptor.IndicesPrivileges.builder()
.indices("*")
.privileges(
+ "read",
GetDataStreamAction.NAME,
RolloverAction.NAME,
IndicesStatsAction.NAME,
@@ -233,6 +236,8 @@ public class InternalUsers {
TransportUpdateSettingsAction.TYPE.name(),
RefreshAction.NAME,
ReindexAction.NAME,
+ TransportClosePointInTimeAction.TYPE.name(),
+ TransportOpenPointInTimeAction.TYPE.name(),
TransportSearchAction.NAME,
TransportBulkAction.NAME,
TransportIndexAction.NAME,
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/user/InternalUsersTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/user/InternalUsersTests.java
index fa034c1a807ad..aa8998c1b4879 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/user/InternalUsersTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/user/InternalUsersTests.java
@@ -31,6 +31,8 @@
import org.elasticsearch.action.downsample.DownsampleAction;
import org.elasticsearch.action.get.TransportGetAction;
import org.elasticsearch.action.index.TransportIndexAction;
+import org.elasticsearch.action.search.TransportClosePointInTimeAction;
+import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.cluster.metadata.DataStream;
@@ -344,6 +346,8 @@ public void testReindexDataStreamUser() {
TransportUpdateSettingsAction.TYPE.name(),
RefreshAction.NAME,
ReindexAction.NAME,
+ TransportClosePointInTimeAction.TYPE.name(),
+ TransportOpenPointInTimeAction.TYPE.name(),
TransportSearchAction.NAME,
TransportBulkAction.NAME,
TransportIndexAction.NAME,
diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java
index 40fc248fc9deb..4a3e23a148c3a 100644
--- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java
+++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataStreamsUpgradeIT.java
@@ -24,6 +24,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Strings;
+import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.hamcrest.Matchers;
@@ -189,6 +190,8 @@ public void testDataStreamValidationDoesNotBreakUpgrade() throws Exception {
}
public void testUpgradeDataStream() throws Exception {
+ // TODO - https://github.com/elastic/elasticsearch-team/issues/2410
+ assumeFalse("PIT search cannot be used on closed indices", ReindexPlugin.REINDEX_PIT_SEARCH_ENABLED);
/*
* This test tests upgrading a "normal" data stream (dataStreamName), and upgrading a data stream that was originally just an
* ordinary index that was converted to a data stream (dataStreamFromNonDataStreamIndices).