diff --git a/modules/reindex/build.gradle b/modules/reindex/build.gradle index 86eabc6e773d9..9e37aa1461240 100644 --- a/modules/reindex/build.gradle +++ b/modules/reindex/build.gradle @@ -44,6 +44,7 @@ dependencies { clusterModules project(':modules:lang-painless') clusterModules project(':modules:parent-join') clusterModules project(":modules:rest-root") + clusterModules project(':modules:reindex-management') } restResources { diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java index dcb4c92a0d4c9..65451a20e11e5 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java @@ -17,12 +17,15 @@ import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.reindex.BulkIndexByScrollResponseMatcher; import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.reindex.ReindexWithPointInTimeSearchTestFeatureSpecification; import org.elasticsearch.reindex.TransportReindexAction; import org.elasticsearch.rest.root.MainRestPlugin; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.telemetry.Measurement; import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESIntegTestCase; +import org.junit.After; +import org.junit.Before; import java.net.InetSocketAddress; import java.util.Arrays; @@ -45,6 +48,19 @@ @ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST) public class ReindexPluginMetricsIT extends ESIntegTestCase { + + @Before + public void setPitFlagForTest() { + if ("testReindexFromRemoteMetricsWithPit".equals(getTestName())) { + ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = true; + } + } + + @After + public void clearPitFlag() { + ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = false; + } + @Override protected Collection> nodePlugins() { return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class, MainRestPlugin.class); @@ -80,6 +96,14 @@ public static BulkIndexByScrollResponseMatcher matcher() { } public void testReindexFromRemoteMetrics() throws Exception { + reindexFromRemoteMetricsInternal(); + } + + public void testReindexFromRemoteMetricsWithPit() throws Exception { + reindexFromRemoteMetricsInternal(); + } + + private void reindexFromRemoteMetricsInternal() throws Exception { final String dataNodeName = internalCluster().startNode(); InetSocketAddress remoteAddress = randomFrom(cluster().httpAddresses()); @@ -132,7 +156,6 @@ public void testReindexFromRemoteMetrics() throws Exception { assertNull(completions.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE)); assertThat(completions.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE)); }); - } public void testReindexMetrics() throws Exception { diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexResumeIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexResumeIT.java index a580a517ffbf0..9ad5569493cba 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexResumeIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexResumeIT.java @@ -24,12 +24,16 @@ import org.elasticsearch.index.reindex.ResumeInfo.WorkerResult; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.reindex.ReindexWithPointInTimeSearchTestFeatureSpecification; import org.elasticsearch.reindex.TransportReindexAction; import org.elasticsearch.rest.root.MainRestPlugin; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.tasks.TaskResult; import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Rule; +import org.junit.rules.TestRule; +import org.junit.runners.model.Statement; import java.net.InetSocketAddress; import java.util.Collection; @@ -43,9 +47,32 @@ import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -@ESIntegTestCase.ClusterScope(numDataNodes = 1, numClientNodes = 0) +@ESIntegTestCase.ClusterScope(numDataNodes = 1, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST) public class ReindexResumeIT extends ESIntegTestCase { + /** + * Sets {@link ReindexWithPointInTimeSearchTestFeatureSpecification#REINDEX_PIT_SEARCH_FOR_TEST} + * so the cluster is built with {@link ReindexPlugin#REINDEX_PIT_SEARCH_FEATURE} + * enabled. Uses a {@link TestRule} because with {@code numDataNodes = 1} nodes created in + * {@code setupTestCluster()}, placing this code in {@code @Before} would run too late. + */ + @Rule + public TestRule setPitFlagForTestRule = (base, description) -> new Statement() { + @Override + public void evaluate() throws Throwable { + if ("testRemoteResumeReindexWithPit".equals(getTestName())) { + ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = true; + } + try { + base.evaluate(); + } finally { + if (description.getMethodName() != null && description.getMethodName().endsWith("WithPit")) { + ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = false; + } + } + } + }; + @Override protected Collection> nodePlugins() { return List.of(ReindexPlugin.class, MainRestPlugin.class); @@ -112,6 +139,14 @@ public void testLocalResumeReindexFromScroll() { } public void testRemoteResumeReindexFromScroll() { + remoteResumeReindexInternal(); + } + + public void testRemoteResumeReindexWithPit() { + remoteResumeReindexInternal(); + } + + private void remoteResumeReindexInternal() { String sourceIndex = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); String destIndex = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); int totalDocs = randomIntBetween(20, 100); diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexFromRemoteWithAuthTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexFromRemoteWithAuthTests.java index eee6d67d4667e..7d4606d511958 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexFromRemoteWithAuthTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexFromRemoteWithAuthTests.java @@ -9,6 +9,9 @@ package org.elasticsearch.reindex; +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchStatusException; @@ -53,8 +56,56 @@ import static org.hamcrest.Matchers.containsString; public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase { + + private final boolean withPit; private TransportAddress address; + public ReindexFromRemoteWithAuthTests(@Name("withPit") boolean withPit) { + this.withPit = withPit; + } + + @ParametersFactory(argumentFormatting = "withPit=%s") + public static Iterable parameters() { + return Arrays.asList(new Object[] { false }, new Object[] { true }); + } + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + /** + * The method {@code tearDown()} calls {@code startNode()}. This introduces a bug where the next tests node is instantiated + * with the current value of {@code withPit}, not the next one. {@code SKIP_START_FOR_NEXT_TEST} is a way to avoid this. + * When set, {@code startNode} skips starting so the next test's setUp starts with the correct {@code withPit}. + */ + private static final ThreadLocal SKIP_START_FOR_NEXT_TEST = ThreadLocal.withInitial(() -> false); + + @Override + public void tearDown() throws Exception { + SKIP_START_FOR_NEXT_TEST.set(true); + try { + super.tearDown(); + } finally { + SKIP_START_FOR_NEXT_TEST.remove(); + } + } + + @Override + protected void startNode(long seed) throws Exception { + if (SKIP_START_FOR_NEXT_TEST.get()) { + SKIP_START_FOR_NEXT_TEST.remove(); + return; + } + logger.info("withPit : {}", withPit); + ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = withPit; + try { + super.startNode(seed); + } finally { + ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = false; + } + } + @Override protected Collection> getPlugins() { return Arrays.asList( @@ -108,37 +159,37 @@ private RemoteInfo newRemoteInfo(String username, String password, Map request.get()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, request::get); assertEquals(RestStatus.BAD_REQUEST, e.status()); assertThat(e.getMessage(), containsString("Hurray! Sent the header!")); } - public void testReindexWithoutAuthenticationWhenRequired() throws Exception { + public void testReindexWithoutAuthenticationWhenRequired() { ReindexRequestBuilder request = new ReindexRequestBuilder(client()).source("source") .destination("dest") .setRemoteInfo(newRemoteInfo(null, null, emptyMap())); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, request::get); assertEquals(RestStatus.UNAUTHORIZED, e.status()); assertThat(e.getMessage(), containsString("\"reason\":\"Authentication required\"")); assertThat(e.getMessage(), containsString("\"WWW-Authenticate\":\"Basic realm=auth-realm\"")); } - public void testReindexWithBadAuthentication() throws Exception { + public void testReindexWithBadAuthentication() { ReindexRequestBuilder request = new ReindexRequestBuilder(client()).source("source") .destination("dest") .setRemoteInfo(newRemoteInfo("junk", "auth", emptyMap())); - ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get()); + ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, request::get); assertThat(e.getMessage(), containsString("\"reason\":\"Bad Authorization\"")); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexWithPointInTimeSearchTestFeatureSpecification.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexWithPointInTimeSearchTestFeatureSpecification.java new file mode 100644 index 0000000000000..bdeb3a6f7b693 --- /dev/null +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexWithPointInTimeSearchTestFeatureSpecification.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.reindex; + +import org.elasticsearch.features.FeatureSpecification; +import org.elasticsearch.features.NodeFeature; + +import java.util.Set; + +/** + * Test-only {@link FeatureSpecification} that conditionally registers {@link ReindexPlugin#REINDEX_PIT_SEARCH_FEATURE} + * based on whether the test is running with scroll or point-in-time searching. + */ +public class ReindexWithPointInTimeSearchTestFeatureSpecification implements FeatureSpecification { + + /** + * Set by the running test before node creation. When {@code true}, this spec + * registers {@link ReindexPlugin#REINDEX_PIT_SEARCH_FEATURE} so the cluster reports the feature. + * Volatile, so it is visible across threads (test thread sets it, node thread reads it). + */ + public static volatile boolean REINDEX_PIT_SEARCH_FOR_TEST = false; + + @Override + public Set getFeatures() { + if (REINDEX_PIT_SEARCH_FOR_TEST) { + return Set.of(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE); + } + return Set.of(); + } +} diff --git a/modules/reindex/src/test/java/org/elasticsearch/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/reindex/RetryTests.java index fa0e1d22f4556..bf82dbc0615cb 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/reindex/RetryTests.java @@ -36,9 +36,14 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.root.MainRestPlugin; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.netty4.Netty4Plugin; import org.junit.After; +import org.junit.Rule; +import org.junit.rules.TestRule; +import org.junit.runners.model.Statement; import java.util.ArrayList; import java.util.Arrays; @@ -57,12 +62,36 @@ * Integration test for bulk retry behavior. Useful because retrying relies on the way that the * rest of Elasticsearch throws exceptions and unit tests won't verify that. */ +@ClusterScope(scope = Scope.TEST) public class RetryTests extends ESIntegTestCase { private static final int DOC_COUNT = 20; private List blockedExecutors = new ArrayList<>(); + /** + * Sets {@link ReindexWithPointInTimeSearchTestFeatureSpecification#REINDEX_PIT_SEARCH_FOR_TEST} before + * {@code testReindexFromRemoteWithPit} runs so the cluster is built with {@link ReindexPlugin#REINDEX_PIT_SEARCH_FEATURE} + * enabled. Uses a {@link TestRule} because it must run before the cluster is created in {@code setupTestCluster()} which is + * a final method and cannot be overridden. + */ + @Rule + public TestRule setPitFlagForTestRule = (base, description) -> new Statement() { + @Override + public void evaluate() throws Throwable { + if ("testReindexFromRemoteWithPit".equals(description.getMethodName())) { + ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = true; + } + try { + base.evaluate(); + } finally { + if ("testReindexFromRemoteWithPit".equals(description.getMethodName())) { + ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = false; + } + } + } + }; + @After public void forceUnblockAllExecutors() { for (CyclicBarrier barrier : blockedExecutors) { @@ -104,6 +133,14 @@ public void testReindex() throws Exception { } public void testReindexFromRemote() throws Exception { + indexFromRemoteInternal(); + } + + public void testReindexFromRemoteWithPit() throws Exception { + indexFromRemoteInternal(); + } + + private void indexFromRemoteInternal() throws Exception { Function> function = client -> { /* * Use the master node for the reindex from remote because that node diff --git a/modules/reindex/src/test/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification b/modules/reindex/src/test/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification new file mode 100644 index 0000000000000..8ce031ed02b14 --- /dev/null +++ b/modules/reindex/src/test/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification @@ -0,0 +1 @@ +org.elasticsearch.reindex.ReindexWithPointInTimeSearchTestFeatureSpecification diff --git a/modules/reindex/src/yamlRestTest/java/org/elasticsearch/index/reindex/ReindexClientYamlTestSuiteWithPitIT.java b/modules/reindex/src/yamlRestTest/java/org/elasticsearch/index/reindex/ReindexClientYamlTestSuiteWithPitIT.java new file mode 100644 index 0000000000000..06658f6039d99 --- /dev/null +++ b/modules/reindex/src/yamlRestTest/java/org/elasticsearch/index/reindex/ReindexClientYamlTestSuiteWithPitIT.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.reindex; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; +import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.junit.ClassRule; + +public class ReindexClientYamlTestSuiteWithPitIT extends ESClientYamlSuiteTestCase { + + public ReindexClientYamlTestSuiteWithPitIT(@Name("yaml") ClientYamlTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws Exception { + return createParameters("reindex/90_remote", "reindex/95_parent_join"); + } + + @ClassRule + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .module("reindex") + .module("reindex-management") + .module("lang-painless") + .module("parent-join") + .module("rest-root") + .setting("reindex.remote.whitelist", "127.0.0.1:*") + .systemProperty("es.reindex_pit_search_feature_flag_enabled", "true") + .build(); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } +}