Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/reindex/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dependencies {
clusterModules project(':modules:lang-painless')
clusterModules project(':modules:parent-join')
clusterModules project(":modules:rest-root")
clusterModules project(':modules:reindex-management')
}

restResources {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.reindex.BulkIndexByScrollResponseMatcher;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.reindex.ReindexWithPointInTimeSearchTestFeatureSpecification;
import org.elasticsearch.reindex.TransportReindexAction;
import org.elasticsearch.rest.root.MainRestPlugin;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.net.InetSocketAddress;
import java.util.Arrays;
Expand All @@ -45,6 +48,19 @@

@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class ReindexPluginMetricsIT extends ESIntegTestCase {

@Before
public void setPitFlagForTest() {
if ("testReindexFromRemoteMetricsWithPit".equals(getTestName())) {
ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = true;
}
}

@After
public void clearPitFlag() {
ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class, MainRestPlugin.class);
Expand Down Expand Up @@ -80,6 +96,14 @@ public static BulkIndexByScrollResponseMatcher matcher() {
}

public void testReindexFromRemoteMetrics() throws Exception {
reindexFromRemoteMetricsInternal();
}

public void testReindexFromRemoteMetricsWithPit() throws Exception {
reindexFromRemoteMetricsInternal();
}

private void reindexFromRemoteMetricsInternal() throws Exception {
final String dataNodeName = internalCluster().startNode();

InetSocketAddress remoteAddress = randomFrom(cluster().httpAddresses());
Expand Down Expand Up @@ -132,7 +156,6 @@ public void testReindexFromRemoteMetrics() throws Exception {
assertNull(completions.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE));
assertThat(completions.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE));
});

}

public void testReindexMetrics() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@
import org.elasticsearch.index.reindex.ResumeInfo.WorkerResult;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.reindex.ReindexWithPointInTimeSearchTestFeatureSpecification;
import org.elasticsearch.reindex.TransportReindexAction;
import org.elasticsearch.rest.root.MainRestPlugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.junit.runners.model.Statement;

import java.net.InetSocketAddress;
import java.util.Collection;
Expand All @@ -43,9 +47,32 @@
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;

@ESIntegTestCase.ClusterScope(numDataNodes = 1, numClientNodes = 0)
@ESIntegTestCase.ClusterScope(numDataNodes = 1, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class ReindexResumeIT extends ESIntegTestCase {

/**
* Sets {@link ReindexWithPointInTimeSearchTestFeatureSpecification#REINDEX_PIT_SEARCH_FOR_TEST}
* so the cluster is built with {@link ReindexPlugin#REINDEX_PIT_SEARCH_FEATURE}
* enabled. Uses a {@link TestRule} because with {@code numDataNodes = 1} nodes created in
* {@code setupTestCluster()}, placing this code in {@code @Before} would run too late.
*/
@Rule
public TestRule setPitFlagForTestRule = (base, description) -> new Statement() {
@Override
public void evaluate() throws Throwable {
if ("testRemoteResumeReindexWithPit".equals(getTestName())) {
ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = true;
}
try {
base.evaluate();
} finally {
if (description.getMethodName() != null && description.getMethodName().endsWith("WithPit")) {
ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = false;
}
}
}
};

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(ReindexPlugin.class, MainRestPlugin.class);
Expand Down Expand Up @@ -112,6 +139,14 @@ public void testLocalResumeReindexFromScroll() {
}

public void testRemoteResumeReindexFromScroll() {
remoteResumeReindexInternal();
}

public void testRemoteResumeReindexWithPit() {
remoteResumeReindexInternal();
}

private void remoteResumeReindexInternal() {
String sourceIndex = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT);
String destIndex = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT);
int totalDocs = randomIntBetween(20, 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

package org.elasticsearch.reindex;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchStatusException;
Expand Down Expand Up @@ -53,8 +56,56 @@
import static org.hamcrest.Matchers.containsString;

public class ReindexFromRemoteWithAuthTests extends ESSingleNodeTestCase {

private final boolean withPit;
private TransportAddress address;

public ReindexFromRemoteWithAuthTests(@Name("withPit") boolean withPit) {
this.withPit = withPit;
}

@ParametersFactory(argumentFormatting = "withPit=%s")
public static Iterable<Object[]> parameters() {
return Arrays.asList(new Object[] { false }, new Object[] { true });
}

@Override
protected boolean resetNodeAfterTest() {
return true;
}

/**
* The method {@code tearDown()} calls {@code startNode()}. This introduces a bug where the next tests node is instantiated
* with the current value of {@code withPit}, not the next one. {@code SKIP_START_FOR_NEXT_TEST} is a way to avoid this.
* When set, {@code startNode} skips starting so the next test's setUp starts with the correct {@code withPit}.
*/
private static final ThreadLocal<Boolean> SKIP_START_FOR_NEXT_TEST = ThreadLocal.withInitial(() -> false);

@Override
public void tearDown() throws Exception {
SKIP_START_FOR_NEXT_TEST.set(true);
try {
super.tearDown();
} finally {
SKIP_START_FOR_NEXT_TEST.remove();
}
}

@Override
protected void startNode(long seed) throws Exception {
if (SKIP_START_FOR_NEXT_TEST.get()) {
SKIP_START_FOR_NEXT_TEST.remove();
return;
}
logger.info("withPit : {}", withPit);
ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = withPit;
try {
super.startNode(seed);
} finally {
ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = false;
}
}

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Arrays.asList(
Expand Down Expand Up @@ -108,37 +159,37 @@ private RemoteInfo newRemoteInfo(String username, String password, Map<String, S
);
}

public void testReindexFromRemoteWithAuthentication() throws Exception {
public void testReindexFromRemoteWithAuthentication() {
ReindexRequestBuilder request = new ReindexRequestBuilder(client()).source("source")
.destination("dest")
.setRemoteInfo(newRemoteInfo("Aladdin", "open sesame", emptyMap()));
assertThat(request.get(), matcher().created(1));
}

public void testReindexSendsHeaders() throws Exception {
public void testReindexSendsHeaders() {
ReindexRequestBuilder request = new ReindexRequestBuilder(client()).source("source")
.destination("dest")
.setRemoteInfo(newRemoteInfo(null, null, singletonMap(TestFilter.EXAMPLE_HEADER, "doesn't matter")));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, request::get);
assertEquals(RestStatus.BAD_REQUEST, e.status());
assertThat(e.getMessage(), containsString("Hurray! Sent the header!"));
}

public void testReindexWithoutAuthenticationWhenRequired() throws Exception {
public void testReindexWithoutAuthenticationWhenRequired() {
ReindexRequestBuilder request = new ReindexRequestBuilder(client()).source("source")
.destination("dest")
.setRemoteInfo(newRemoteInfo(null, null, emptyMap()));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, request::get);
assertEquals(RestStatus.UNAUTHORIZED, e.status());
assertThat(e.getMessage(), containsString("\"reason\":\"Authentication required\""));
assertThat(e.getMessage(), containsString("\"WWW-Authenticate\":\"Basic realm=auth-realm\""));
}

public void testReindexWithBadAuthentication() throws Exception {
public void testReindexWithBadAuthentication() {
ReindexRequestBuilder request = new ReindexRequestBuilder(client()).source("source")
.destination("dest")
.setRemoteInfo(newRemoteInfo("junk", "auth", emptyMap()));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> request.get());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, request::get);
assertThat(e.getMessage(), containsString("\"reason\":\"Bad Authorization\""));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.reindex;

import org.elasticsearch.features.FeatureSpecification;
import org.elasticsearch.features.NodeFeature;

import java.util.Set;

/**
* Test-only {@link FeatureSpecification} that conditionally registers {@link ReindexPlugin#REINDEX_PIT_SEARCH_FEATURE}
* based on whether the test is running with scroll or point-in-time searching.
*/
public class ReindexWithPointInTimeSearchTestFeatureSpecification implements FeatureSpecification {

/**
* Set by the running test before node creation. When {@code true}, this spec
* registers {@link ReindexPlugin#REINDEX_PIT_SEARCH_FEATURE} so the cluster reports the feature.
* Volatile, so it is visible across threads (test thread sets it, node thread reads it).
*/
public static volatile boolean REINDEX_PIT_SEARCH_FOR_TEST = false;

@Override
public Set<NodeFeature> getFeatures() {
if (REINDEX_PIT_SEARCH_FOR_TEST) {
return Set.of(ReindexPlugin.REINDEX_PIT_SEARCH_FEATURE);
}
return Set.of();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,14 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.root.MainRestPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4Plugin;
import org.junit.After;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.junit.runners.model.Statement;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -57,12 +62,36 @@
* Integration test for bulk retry behavior. Useful because retrying relies on the way that the
* rest of Elasticsearch throws exceptions and unit tests won't verify that.
*/
@ClusterScope(scope = Scope.TEST)
public class RetryTests extends ESIntegTestCase {

private static final int DOC_COUNT = 20;

private List<CyclicBarrier> blockedExecutors = new ArrayList<>();

/**
* Sets {@link ReindexWithPointInTimeSearchTestFeatureSpecification#REINDEX_PIT_SEARCH_FOR_TEST} before
* {@code testReindexFromRemoteWithPit} runs so the cluster is built with {@link ReindexPlugin#REINDEX_PIT_SEARCH_FEATURE}
* enabled. Uses a {@link TestRule} because it must run before the cluster is created in {@code setupTestCluster()} which is
* a final method and cannot be overridden.
*/
@Rule
public TestRule setPitFlagForTestRule = (base, description) -> new Statement() {
@Override
public void evaluate() throws Throwable {
if ("testReindexFromRemoteWithPit".equals(description.getMethodName())) {
ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = true;
}
try {
base.evaluate();
} finally {
if ("testReindexFromRemoteWithPit".equals(description.getMethodName())) {
ReindexWithPointInTimeSearchTestFeatureSpecification.REINDEX_PIT_SEARCH_FOR_TEST = false;
}
}
}
};

@After
public void forceUnblockAllExecutors() {
for (CyclicBarrier barrier : blockedExecutors) {
Expand Down Expand Up @@ -104,6 +133,14 @@ public void testReindex() throws Exception {
}

public void testReindexFromRemote() throws Exception {
indexFromRemoteInternal();
}

public void testReindexFromRemoteWithPit() throws Exception {
indexFromRemoteInternal();
}

private void indexFromRemoteInternal() throws Exception {
Function<Client, AbstractBulkByScrollRequestBuilder<?, ?>> function = client -> {
/*
* Use the master node for the reindex from remote because that node
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.elasticsearch.reindex.ReindexWithPointInTimeSearchTestFeatureSpecification
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.reindex;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.junit.ClassRule;

public class ReindexClientYamlTestSuiteWithPitIT extends ESClientYamlSuiteTestCase {

public ReindexClientYamlTestSuiteWithPitIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}

@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return createParameters("reindex/90_remote", "reindex/95_parent_join");
}

@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.module("reindex")
.module("reindex-management")
.module("lang-painless")
.module("parent-join")
.module("rest-root")
.setting("reindex.remote.whitelist", "127.0.0.1:*")
.systemProperty("es.reindex_pit_search_feature_flag_enabled", "true")
.build();

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
}