Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting,
null, defaultPipeline, null, true, request.getXContentType());
null, defaultPipeline, true, request.getXContentType());

// short circuit the call to the transport layer
return channel -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String l
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
} else {
BytesArray data = bytesBulkRequest(localIndex, localType, i);
processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON);
processor.add(data, globalIndex, globalType, globalPipeline, XContentType.JSON);

if (localType != null) {
// If the payload contains types, parsing it into a bulk request results in a warning.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,7 @@ public BulkProcessor add(DeleteRequest request) {
* Adds either a delete or an index request.
*/
public BulkProcessor add(DocWriteRequest<?> request) {
return add(request, null);
}

public BulkProcessor add(DocWriteRequest<?> request, @Nullable Object payload) {
internalAdd(request, payload);
internalAdd(request);
return this;
}

Expand All @@ -319,9 +315,9 @@ protected void ensureOpen() {
}
}

private synchronized void internalAdd(DocWriteRequest<?> request, @Nullable Object payload) {
private synchronized void internalAdd(DocWriteRequest<?> request) {
ensureOpen();
bulkRequest.add(request, payload);
bulkRequest.add(request);
executeIfNeeded();
}

Expand All @@ -330,16 +326,16 @@ private synchronized void internalAdd(DocWriteRequest<?> request, @Nullable Obje
*/
public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
XContentType xContentType) throws Exception {
return add(data, defaultIndex, defaultType, null, null, xContentType);
return add(data, defaultIndex, defaultType, null, xContentType);
}

/**
* Adds the data from the bytes to be processed by the bulk processor
*/
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultPipeline, @Nullable Object payload,
@Nullable String defaultPipeline,
XContentType xContentType) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, payload, true, xContentType);
bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, true, xContentType);
executeIfNeeded();
return this;
}
Expand Down
301 changes: 20 additions & 281 deletions server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting,
defaultFetchSourceContext, defaultPipeline, null, allowExplicitIndex, request.getXContentType());
defaultFetchSourceContext, defaultPipeline, allowExplicitIndex, request.getXContentType());

return channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class BulkRequestParserTests extends ESTestCase {

public void testIndexRequest() throws IOException {
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON,
indexRequest -> {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
parsed.set(true);
},
req -> fail(), req -> fail());
assertTrue(parsed.get());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this (and tests below) need to conditionally assert assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); based on the randomBoolean() (line 34) ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They don't since the document doesn't include a _type.

}

public void testDeleteRequest() throws IOException {
BytesArray request = new BytesArray("{ \"delete\":{ \"_id\": \"bar\" } }\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON,
req -> fail(), req -> fail(),
deleteRequest -> {
assertFalse(parsed.get());
assertEquals("foo", deleteRequest.index());
assertEquals("bar", deleteRequest.id());
parsed.set(true);
});
assertTrue(parsed.get());
}

public void testUpdateRequest() throws IOException {
BytesArray request = new BytesArray("{ \"update\":{ \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON,
req -> fail(),
updateRequest -> {
assertFalse(parsed.get());
assertEquals("foo", updateRequest.index());
assertEquals("bar", updateRequest.id());
parsed.set(true);
},
req -> fail());
assertTrue(parsed.get());
}

public void testBarfOnLackOfTrailingNewline() throws IOException {
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON,
indexRequest -> fail(), req -> fail(), req -> fail()));
assertEquals("The bulk request must be terminated by a newline [\\n]", e.getMessage());
}

public void testFailOnExplicitIndex() throws IOException {
BytesArray request = new BytesArray("{ \"index\":{ \"_index\": \"foo\", \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(randomBoolean());

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> parser.parse(request, null, null, null, null, null, false, XContentType.JSON,
req -> fail(), req -> fail(), req -> fail()));
assertEquals("explicit index in bulk is not allowed", ex.getMessage());
}

public void testTypeWarning() throws IOException {
BytesArray request = new BytesArray("{ \"index\":{ \"_type\": \"quux\", \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser(true);
final AtomicBoolean parsed = new AtomicBoolean();
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON,
indexRequest -> {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
parsed.set(true);
},
req -> fail(), req -> fail());
assertTrue(parsed.get());

assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void testBulkTerminatedByNewline() throws Exception {
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk11.json");
IllegalArgumentException expectThrows = expectThrows(IllegalArgumentException.class, () -> new BulkRequest()
.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, XContentType.JSON));
assertEquals("The bulk request must be terminated by a newline [\n]", expectThrows.getMessage());
assertEquals("The bulk request must be terminated by a newline [\\n]", expectThrows.getMessage());

String bulkActionWithNewLine = bulkAction + "\n";
BulkRequest bulkRequestWithNewLine = new BulkRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.action.bulk.BulkRequestParser;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -75,32 +72,27 @@ public MonitoringBulkRequest add(final MonitoredSystem system,
final long timestamp,
final long intervalMillis) throws IOException {

// MonitoringBulkRequest accepts a body request that has the same format as the BulkRequest:
// instead of duplicating the parsing logic here we use a new BulkRequest instance to parse the content.
final BulkRequest bulkRequest = Requests.bulkRequest().add(content, null, defaultType, xContentType);
// MonitoringBulkRequest accepts a body request that has the same format as the BulkRequest
new BulkRequestParser(false).parse(content, null, defaultType, null, null, null, true, xContentType,
indexRequest -> {
// we no longer accept non-timestamped indexes from Kibana, LS, or Beats because we do not use the data
// and it was duplicated anyway; by simply dropping it, we allow BWC for older clients that still send it
if (MonitoringIndex.from(indexRequest.index()) != MonitoringIndex.TIMESTAMPED) {
return;
}
final BytesReference source = indexRequest.source();
if (source.length() == 0) {
throw new IllegalArgumentException("source is missing for monitoring document ["
+ indexRequest.index() + "][" + indexRequest.type() + "][" + indexRequest.id() + "]");
}

// builds a new monitoring document based on the index request
add(new MonitoringBulkDoc(system, indexRequest.type(), indexRequest.id(), timestamp, intervalMillis, source,
xContentType));
},
updateRequest -> { throw new IllegalArgumentException("monitoring bulk requests should only contain index requests"); },
deleteRequest -> { throw new IllegalArgumentException("monitoring bulk requests should only contain index requests"); });

for (DocWriteRequest request : bulkRequest.requests()) {
if (request instanceof IndexRequest) {
final IndexRequest indexRequest = (IndexRequest) request;

// we no longer accept non-timestamped indexes from Kibana, LS, or Beats because we do not use the data
// and it was duplicated anyway; by simply dropping it, we allow BWC for older clients that still send it
if (MonitoringIndex.from(indexRequest.index()) != MonitoringIndex.TIMESTAMPED) {
continue;
}

final BytesReference source = indexRequest.source();
if (source.length() == 0) {
throw new IllegalArgumentException("source is missing for monitoring document ["
+ indexRequest.index() + "][" + indexRequest.type() + "][" + indexRequest.id() + "]");
}

// builds a new monitoring document based on the index request
add(new MonitoringBulkDoc(system, indexRequest.type(), indexRequest.id(), timestamp, intervalMillis, source, xContentType));
} else {
throw new IllegalArgumentException("monitoring bulk requests should only contain index requests");
}
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.RandomObjects;
import org.elasticsearch.xpack.core.monitoring.MonitoredSystem;
Expand Down Expand Up @@ -142,8 +141,6 @@ public void testAddRequestContent() throws IOException {
assertThat(bulkDoc.getXContentType(), equalTo(xContentType));
++count;
}
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}

public void testAddRequestContentWithEmptySource() throws IOException {
Expand Down Expand Up @@ -191,8 +188,6 @@ public void testAddRequestContentWithEmptySource() throws IOException {
);

assertThat(e.getMessage(), containsString("source is missing for monitoring document [][doc][" + nbDocs + "]"));
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}

public void testAddRequestContentWithUnrecognizedIndexName() throws IOException {
Expand Down Expand Up @@ -230,8 +225,6 @@ public void testAddRequestContentWithUnrecognizedIndexName() throws IOException
);

assertThat(e.getMessage(), containsString("unrecognized index name [" + indexName + "]"));
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}

public void testSerialization() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.xpack.core.XPackClient;
Expand Down Expand Up @@ -122,8 +121,6 @@ public void testNoErrors() throws Exception {
assertThat(restResponse.status(), is(RestStatus.OK));
assertThat(restResponse.content().utf8ToString(),
is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":false,\"errors\":false}"));
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}

public void testNoErrorsButIgnored() throws Exception {
Expand All @@ -134,8 +131,6 @@ public void testNoErrorsButIgnored() throws Exception {
assertThat(restResponse.status(), is(RestStatus.OK));
assertThat(restResponse.content().utf8ToString(),
is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":true,\"errors\":false}"));
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}

public void testWithErrors() throws Exception {
Expand All @@ -155,8 +150,6 @@ public void testWithErrors() throws Exception {
assertThat(restResponse.status(), is(RestStatus.INTERNAL_SERVER_ERROR));
assertThat(restResponse.content().utf8ToString(),
is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":false,\"errors\":true,\"error\":" + errorJson + "}"));
//This test's JSON contains outdated references to types
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}

/**
Expand Down