Skip to content

Commit 2fa6bc5

Browse files
authored
Properly serialize remote query in ReindexRequest (#43596)
This commit modifies the RemoteInfo to clarify that a search query must always be serialized as JSON. Additionally, it adds an assertion to ensure that this is the case. This fixes #43406. Additionally, this PR implements AbstractXContentTestCase for the reindex request. This is related to #43456.
1 parent 51161a4 commit 2fa6bc5

File tree

12 files changed

+251
-46
lines changed

12 files changed

+251
-46
lines changed

modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteBuildRestClientTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import org.elasticsearch.client.RestClient;
2323
import org.elasticsearch.client.RestClientBuilderTestCase;
2424
import org.elasticsearch.common.bytes.BytesArray;
25+
import org.elasticsearch.common.bytes.BytesReference;
2526
import org.elasticsearch.common.settings.Settings;
2627
import org.elasticsearch.env.Environment;
2728
import org.elasticsearch.env.TestEnvironment;
29+
import org.elasticsearch.index.query.MatchAllQueryBuilder;
2830
import org.elasticsearch.watcher.ResourceWatcherService;
2931

3032
import java.util.ArrayList;
@@ -38,9 +40,12 @@
3840
import static org.mockito.Mockito.mock;
3941

4042
public class ReindexFromRemoteBuildRestClientTests extends RestClientBuilderTestCase {
43+
44+
private final BytesReference matchAll = new BytesArray(new MatchAllQueryBuilder().toString());
45+
4146
public void testBuildRestClient() throws Exception {
4247
for(final String path: new String[]{"", null, "/", "path"}) {
43-
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, path, new BytesArray("ignored"), null, null, emptyMap(),
48+
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, path, matchAll, null, null, emptyMap(),
4449
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
4550
long taskId = randomLong();
4651
List<Thread> threads = synchronizedList(new ArrayList<>());
@@ -64,7 +69,7 @@ public void testHeaders() throws Exception {
6469
for (int i = 0; i < numHeaders; ++i) {
6570
headers.put("header" + i, Integer.toString(i));
6671
}
67-
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, null, new BytesArray("ignored"), null, null,
72+
RemoteInfo remoteInfo = new RemoteInfo("https", "localhost", 9200, null, matchAll, null, null,
6873
headers, RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
6974
long taskId = randomLong();
7075
List<Thread> threads = synchronizedList(new ArrayList<>());

modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFromRemoteWhitelistTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.index.reindex;
2121

2222
import org.elasticsearch.common.bytes.BytesArray;
23+
import org.elasticsearch.common.bytes.BytesReference;
2324
import org.elasticsearch.test.ESTestCase;
2425

2526
import java.net.UnknownHostException;
@@ -37,6 +38,9 @@
3738
* Tests the reindex-from-remote whitelist of remotes.
3839
*/
3940
public class ReindexFromRemoteWhitelistTests extends ESTestCase {
41+
42+
private final BytesReference query = new BytesArray("{ \"foo\" : \"bar\" }");
43+
4044
public void testLocalRequestWithoutWhitelist() {
4145
checkRemoteWhitelist(buildRemoteWhitelist(emptyList()), null);
4246
}
@@ -49,7 +53,7 @@ public void testLocalRequestWithWhitelist() {
4953
* Build a {@link RemoteInfo}, defaulting values that we don't care about in this test to values that don't hurt anything.
5054
*/
5155
private RemoteInfo newRemoteInfo(String host, int port) {
52-
return new RemoteInfo(randomAlphaOfLength(5), host, port, null, new BytesArray("test"), null, null, emptyMap(),
56+
return new RemoteInfo(randomAlphaOfLength(5), host, port, null, query, null, null, emptyMap(),
5357
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
5458
}
5559

@@ -63,7 +67,7 @@ public void testWhitelistedRemote() {
6367

6468
public void testWhitelistedByPrefix() {
6569
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
66-
new RemoteInfo(randomAlphaOfLength(5), "es.example.com", 9200, null, new BytesArray("test"), null, null, emptyMap(),
70+
new RemoteInfo(randomAlphaOfLength(5), "es.example.com", 9200, null, query, null, null, emptyMap(),
6771
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT));
6872
checkRemoteWhitelist(buildRemoteWhitelist(singletonList("*.example.com:9200")),
6973
newRemoteInfo("6e134134a1.us-east-1.aws.example.com", 9200));

modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexRestClientSslTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,9 @@ public void testClientPassesClientCertificate() throws IOException {
199199
}
200200

201201
private RemoteInfo getRemoteInfo() {
202-
return new RemoteInfo("https", server.getAddress().getHostName(), server.getAddress().getPort(), "/", new BytesArray("test"),
203-
"user", "password", Collections.emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
202+
return new RemoteInfo("https", server.getAddress().getHostName(), server.getAddress().getPort(), "/",
203+
new BytesArray("{\"match_all\":{}}"), "user", "password", Collections.emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT,
204+
RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
204205
}
205206

206207
@SuppressForbidden(reason = "use http server")

modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexSourceTargetValidationTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.cluster.metadata.MetaData;
3333
import org.elasticsearch.common.Nullable;
3434
import org.elasticsearch.common.bytes.BytesArray;
35+
import org.elasticsearch.common.bytes.BytesReference;
3536
import org.elasticsearch.common.settings.ClusterSettings;
3637
import org.elasticsearch.common.settings.Settings;
3738
import org.elasticsearch.test.ESTestCase;
@@ -61,6 +62,8 @@ public class ReindexSourceTargetValidationTests extends ESTestCase {
6162
private static final AutoCreateIndex AUTO_CREATE_INDEX = new AutoCreateIndex(Settings.EMPTY,
6263
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), INDEX_NAME_EXPRESSION_RESOLVER);
6364

65+
private final BytesReference query = new BytesArray("{ \"foo\" : \"bar\" }");
66+
6467
public void testObviousCases() {
6568
fails("target", "target");
6669
fails("target", "foo", "bar", "target", "baz");
@@ -106,10 +109,10 @@ public void testTargetIsWriteAlias() {
106109

107110
public void testRemoteInfoSkipsValidation() {
108111
// The index doesn't have to exist
109-
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, new BytesArray("test"), null, null, emptyMap(),
112+
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, query, null, null, emptyMap(),
110113
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "does_not_exist", "target");
111114
// And it doesn't matter if they are the same index. They are considered to be different because the remote one is, well, remote.
112-
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, new BytesArray("test"), null, null, emptyMap(),
115+
succeeds(new RemoteInfo(randomAlphaOfLength(5), "test", 9200, null, query, null, null, emptyMap(),
113116
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT), "target", "target");
114117
}
115118

modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void testReindexRequest() throws IOException {
5151
reindex.getDestination().index("test");
5252
if (randomBoolean()) {
5353
int port = between(1, Integer.MAX_VALUE);
54-
BytesReference query = new BytesArray(randomAlphaOfLength(5));
54+
BytesReference query = new BytesArray("{\"match_all\":{}}");
5555
String username = randomBoolean() ? randomAlphaOfLength(5) : null;
5656
String password = username != null && randomBoolean() ? randomAlphaOfLength(5) : null;
5757
int headersCount = randomBoolean() ? 0 : between(1, 10);

modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteInfoTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,28 @@
1919

2020
package org.elasticsearch.index.reindex.remote;
2121

22-
import org.elasticsearch.index.reindex.RemoteInfo;
2322
import org.elasticsearch.common.bytes.BytesArray;
23+
import org.elasticsearch.index.reindex.RemoteInfo;
2424
import org.elasticsearch.test.ESTestCase;
2525

2626
import static java.util.Collections.emptyMap;
2727

2828
public class RemoteInfoTests extends ESTestCase {
2929
private RemoteInfo newRemoteInfo(String scheme, String prefixPath, String username, String password) {
30-
return new RemoteInfo(scheme, "testhost", 12344, prefixPath, new BytesArray("testquery"), username, password, emptyMap(),
31-
RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
30+
return new RemoteInfo(scheme, "testhost", 12344, prefixPath,new BytesArray("{ \"foo\" : \"bar\" }"), username, password,
31+
emptyMap(), RemoteInfo.DEFAULT_SOCKET_TIMEOUT, RemoteInfo.DEFAULT_CONNECT_TIMEOUT);
3232
}
3333

3434
public void testToString() {
35-
assertEquals("host=testhost port=12344 query=testquery",
35+
assertEquals("host=testhost port=12344 query={ \"foo\" : \"bar\" }",
3636
newRemoteInfo("http", null, null, null).toString());
37-
assertEquals("host=testhost port=12344 query=testquery username=testuser",
37+
assertEquals("host=testhost port=12344 query={ \"foo\" : \"bar\" } username=testuser",
3838
newRemoteInfo("http", null, "testuser", null).toString());
39-
assertEquals("host=testhost port=12344 query=testquery username=testuser password=<<>>",
39+
assertEquals("host=testhost port=12344 query={ \"foo\" : \"bar\" } username=testuser password=<<>>",
4040
newRemoteInfo("http", null, "testuser", "testpass").toString());
41-
assertEquals("scheme=https host=testhost port=12344 query=testquery username=testuser password=<<>>",
41+
assertEquals("scheme=https host=testhost port=12344 query={ \"foo\" : \"bar\" } username=testuser password=<<>>",
4242
newRemoteInfo("https", null, "testuser", "testpass").toString());
43-
assertEquals("scheme=https host=testhost port=12344 pathPrefix=prxy query=testquery username=testuser password=<<>>",
43+
assertEquals("scheme=https host=testhost port=12344 pathPrefix=prxy query={ \"foo\" : \"bar\" } username=testuser password=<<>>",
4444
newRemoteInfo("https", "prxy", "testuser", "testpass").toString());
4545
}
4646
}

server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,10 @@
3333
import org.elasticsearch.common.lucene.uid.Versions;
3434
import org.elasticsearch.common.unit.TimeValue;
3535
import org.elasticsearch.common.xcontent.ObjectParser;
36-
import org.elasticsearch.common.xcontent.ToXContent;
3736
import org.elasticsearch.common.xcontent.ToXContentObject;
3837
import org.elasticsearch.common.xcontent.XContentBuilder;
3938
import org.elasticsearch.common.xcontent.XContentFactory;
4039
import org.elasticsearch.common.xcontent.XContentParser;
41-
import org.elasticsearch.common.xcontent.json.JsonXContent;
4240
import org.elasticsearch.index.VersionType;
4341
import org.elasticsearch.index.mapper.MapperService;
4442
import org.elasticsearch.index.query.QueryBuilder;
@@ -58,7 +56,6 @@
5856
import static org.elasticsearch.action.ValidateActions.addValidationError;
5957
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
6058
import static org.elasticsearch.index.VersionType.INTERNAL;
61-
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
6259

6360
/**
6461
* Request to reindex some documents from one index to another. This implements CompositeIndicesRequest but in a misleading way. Rather than
@@ -313,7 +310,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
313310
builder.startObject("source");
314311
if (remoteInfo != null) {
315312
builder.field("remote", remoteInfo);
316-
builder.rawField("query", remoteInfo.getQuery().streamInput(), builder.contentType());
313+
builder.rawField("query", remoteInfo.getQuery().streamInput(), RemoteInfo.QUERY_CONTENT_TYPE.type());
317314
}
318315
builder.array("index", getSearchRequest().indices());
319316
String[] types = getSearchRequest().types();
@@ -466,7 +463,7 @@ static RemoteInfo buildRemoteInfo(Map<String, Object> source) throws IOException
466463
throw new IllegalArgumentException(
467464
"Unsupported fields in [remote]: [" + Strings.collectionToCommaDelimitedString(remote.keySet()) + "]");
468465
}
469-
return new RemoteInfo(scheme, host, port, pathPrefix, queryForRemote(source),
466+
return new RemoteInfo(scheme, host, port, pathPrefix, RemoteInfo.queryForRemote(source),
470467
username, password, headers, socketTimeout, connectTimeout);
471468
}
472469

@@ -505,20 +502,6 @@ private static TimeValue extractTimeValue(Map<String, Object> source, String nam
505502
return string == null ? defaultValue : parseTimeValue(string, name);
506503
}
507504

508-
private static BytesReference queryForRemote(Map<String, Object> source) throws IOException {
509-
XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint();
510-
Object query = source.remove("query");
511-
if (query == null) {
512-
return BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS));
513-
}
514-
if (!(query instanceof Map)) {
515-
throw new IllegalArgumentException("Expected [query] to be an object but was [" + query + "]");
516-
}
517-
@SuppressWarnings("unchecked")
518-
Map<String, Object> map = (Map<String, Object>) query;
519-
return BytesReference.bytes(builder.map(map));
520-
}
521-
522505
static void setMaxDocsValidateIdentical(AbstractBulkByScrollRequest<?> request, int maxDocs) {
523506
if (request.getMaxDocs() != AbstractBulkByScrollRequest.MAX_DOCS_ALL_MATCHES && request.getMaxDocs() != maxDocs) {
524507
throw new IllegalArgumentException("[max_docs] set to two different values [" + request.getMaxDocs() + "]" +

server/src/main/java/org/elasticsearch/index/reindex/RemoteInfo.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,24 @@
2626
import org.elasticsearch.common.io.stream.StreamOutput;
2727
import org.elasticsearch.common.io.stream.Writeable;
2828
import org.elasticsearch.common.unit.TimeValue;
29+
import org.elasticsearch.common.xcontent.DeprecationHandler;
30+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
31+
import org.elasticsearch.common.xcontent.ToXContent;
2932
import org.elasticsearch.common.xcontent.ToXContentObject;
33+
import org.elasticsearch.common.xcontent.XContent;
3034
import org.elasticsearch.common.xcontent.XContentBuilder;
35+
import org.elasticsearch.common.xcontent.XContentParser;
36+
import org.elasticsearch.common.xcontent.json.JsonXContent;
3137

3238
import java.io.IOException;
3339
import java.util.HashMap;
3440
import java.util.Map;
41+
import java.util.Objects;
3542

3643
import static java.util.Collections.unmodifiableMap;
3744
import static java.util.Objects.requireNonNull;
3845
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
46+
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
3947

4048
public class RemoteInfo implements Writeable, ToXContentObject {
4149
/**
@@ -47,6 +55,8 @@ public class RemoteInfo implements Writeable, ToXContentObject {
4755
*/
4856
public static final TimeValue DEFAULT_CONNECT_TIMEOUT = timeValueSeconds(30);
4957

58+
public static final XContent QUERY_CONTENT_TYPE = JsonXContent.jsonXContent;
59+
5060
private final String scheme;
5161
private final String host;
5262
private final int port;
@@ -66,6 +76,7 @@ public class RemoteInfo implements Writeable, ToXContentObject {
6676

6777
public RemoteInfo(String scheme, String host, int port, String pathPrefix, BytesReference query, String username, String password,
6878
Map<String, String> headers, TimeValue socketTimeout, TimeValue connectTimeout) {
79+
assert isQueryJson(query) : "Query does not appear to be JSON";
6980
this.scheme = requireNonNull(scheme, "[scheme] must be specified to reindex from a remote cluster");
7081
this.host = requireNonNull(host, "[host] must be specified to reindex from a remote cluster");
7182
this.port = port;
@@ -212,4 +223,50 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
212223
builder.endObject();
213224
return builder;
214225
}
226+
227+
@Override
228+
public boolean equals(Object o) {
229+
if (this == o) return true;
230+
if (o == null || getClass() != o.getClass()) return false;
231+
RemoteInfo that = (RemoteInfo) o;
232+
return port == that.port &&
233+
Objects.equals(scheme, that.scheme) &&
234+
Objects.equals(host, that.host) &&
235+
Objects.equals(pathPrefix, that.pathPrefix) &&
236+
Objects.equals(query, that.query) &&
237+
Objects.equals(username, that.username) &&
238+
Objects.equals(password, that.password) &&
239+
Objects.equals(headers, that.headers) &&
240+
Objects.equals(socketTimeout, that.socketTimeout) &&
241+
Objects.equals(connectTimeout, that.connectTimeout);
242+
}
243+
244+
@Override
245+
public int hashCode() {
246+
return Objects.hash(scheme, host, port, pathPrefix, query, username, password, headers, socketTimeout, connectTimeout);
247+
}
248+
249+
static BytesReference queryForRemote(Map<String, Object> source) throws IOException {
250+
XContentBuilder builder = XContentBuilder.builder(QUERY_CONTENT_TYPE).prettyPrint();
251+
Object query = source.remove("query");
252+
if (query == null) {
253+
return BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS));
254+
}
255+
if (!(query instanceof Map)) {
256+
throw new IllegalArgumentException("Expected [query] to be an object but was [" + query + "]");
257+
}
258+
@SuppressWarnings("unchecked")
259+
Map<String, Object> map = (Map<String, Object>) query;
260+
return BytesReference.bytes(builder.map(map));
261+
}
262+
263+
private static boolean isQueryJson(BytesReference bytesReference) {
264+
try (XContentParser parser = QUERY_CONTENT_TYPE.createParser(NamedXContentRegistry.EMPTY,
265+
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytesReference.streamInput())) {
266+
Map<String, Object> query = parser.map();
267+
return true;
268+
} catch (IOException e) {
269+
throw new AssertionError("Could not parse JSON", e);
270+
}
271+
}
215272
}

server/src/test/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequestTestCase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,22 @@
2121

2222
import org.elasticsearch.action.search.SearchRequest;
2323
import org.elasticsearch.action.support.ActiveShardCount;
24+
import org.elasticsearch.common.xcontent.ToXContent;
2425
import org.elasticsearch.tasks.TaskId;
26+
import org.elasticsearch.test.AbstractXContentTestCase;
2527
import org.elasticsearch.test.ESTestCase;
2628

2729
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
2830

2931
/**
3032
* Shared superclass for testing reindex and friends. In particular it makes sure to test the slice features.
3133
*/
32-
public abstract class AbstractBulkByScrollRequestTestCase<R extends AbstractBulkByScrollRequest<R>> extends ESTestCase {
34+
public abstract class AbstractBulkByScrollRequestTestCase<R extends AbstractBulkByScrollRequest<R> & ToXContent>
35+
extends AbstractXContentTestCase<R> {
36+
3337
public void testForSlice() {
3438
R original = newRequest();
39+
extraRandomizationForSlice(original);
3540
original.setAbortOnVersionConflict(randomBoolean());
3641
original.setRefresh(randomBoolean());
3742
original.setTimeout(parseTimeValue(randomPositiveTimeValue(), "timeout"));

server/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryRequestTests.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
import org.elasticsearch.action.ActionRequestValidationException;
2323
import org.elasticsearch.action.search.SearchRequest;
2424
import org.elasticsearch.action.support.IndicesOptions;
25+
import org.elasticsearch.common.xcontent.XContentParser;
2526
import org.elasticsearch.index.query.QueryBuilders;
2627

28+
import java.io.IOException;
29+
2730
import static org.apache.lucene.util.TestUtil.randomSimpleString;
2831
import static org.hamcrest.Matchers.containsString;
2932
import static org.hamcrest.Matchers.is;
@@ -124,4 +127,28 @@ public void testValidateGivenValid() {
124127

125128
assertThat(e, is(nullValue()));
126129
}
130+
131+
// TODO: Implement standard to/from x-content parsing tests
132+
133+
@Override
134+
protected DeleteByQueryRequest createTestInstance() {
135+
return newRequest();
136+
}
137+
138+
@Override
139+
protected DeleteByQueryRequest doParseInstance(XContentParser parser) throws IOException {
140+
XContentParser.Token token;
141+
while ((token = parser.nextToken()) != null) {
142+
}
143+
return newRequest();
144+
}
145+
146+
@Override
147+
protected boolean supportsUnknownFields() {
148+
return false;
149+
}
150+
151+
@Override
152+
protected void assertEqualInstances(DeleteByQueryRequest expectedInstance, DeleteByQueryRequest newInstance) {
153+
}
127154
}

0 commit comments

Comments
 (0)