diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java index 9c632afe19192..8805af367a80e 100644 --- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java +++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java @@ -76,7 +76,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.setRefreshPolicy(request.param("refresh")); bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting, - null, defaultPipeline, null, true, request.getXContentType()); + null, defaultPipeline, true, request.getXContentType()); // short circuit the call to the transport layer return channel -> { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java index 52110989e1715..762e927551b8b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java @@ -447,7 +447,7 @@ private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String l .source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30))); } else { BytesArray data = bytesBulkRequest(localIndex, localType, i); - processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON); + processor.add(data, globalIndex, globalType, globalPipeline, XContentType.JSON); if (localType != null) { // If the payload contains types, parsing it into a bulk request results in a warning. diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 02bebb5b38e42..b0ad87a8b744a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -301,11 +301,7 @@ public BulkProcessor add(DeleteRequest request) { * Adds either a delete or an index request. */ public BulkProcessor add(DocWriteRequest request) { - return add(request, null); - } - - public BulkProcessor add(DocWriteRequest request, @Nullable Object payload) { - internalAdd(request, payload); + internalAdd(request); return this; } @@ -319,9 +315,9 @@ protected void ensureOpen() { } } - private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) { + private synchronized void internalAdd(DocWriteRequest request) { ensureOpen(); - bulkRequest.add(request, payload); + bulkRequest.add(request); executeIfNeeded(); } @@ -330,16 +326,16 @@ private synchronized void internalAdd(DocWriteRequest request, @Nullable Obje */ public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, XContentType xContentType) throws Exception { - return add(data, defaultIndex, defaultType, null, null, xContentType); + return add(data, defaultIndex, defaultType, null, xContentType); } /** * Adds the data from the bytes to be processed by the bulk processor */ public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, - @Nullable String defaultPipeline, @Nullable Object payload, + @Nullable String defaultPipeline, XContentType xContentType) throws Exception { - bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, payload, true, xContentType); + bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, true, xContentType); executeIfNeeded(); return this; } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index c17507658bde2..b55425fc1b007 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.bulk; -import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; @@ -31,28 +30,17 @@ import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.logging.DeprecationLogger; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContent; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.MapperService; -import org.elasticsearch.index.seqno.SequenceNumbers; -import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -60,7 +48,6 @@ import java.util.Set; import static org.elasticsearch.action.ValidateActions.addValidationError; -import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; /** * A bulk request holds an ordered {@link IndexRequest}s, {@link DeleteRequest}s and {@link UpdateRequest}s @@ -72,19 +59,6 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest { private static final int REQUEST_OVERHEAD = 50; - private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(BulkRequest.class)); - private static final ParseField INDEX = new ParseField("_index"); - private static final ParseField TYPE = new ParseField("_type"); - private static final ParseField ID = new ParseField("_id"); - private static final ParseField ROUTING = new ParseField("routing"); - private static final ParseField OP_TYPE = new ParseField("op_type"); - private static final ParseField VERSION = new ParseField("version"); - private static final ParseField VERSION_TYPE = new ParseField("version_type"); - private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict"); - private static final ParseField PIPELINE = new ParseField("pipeline"); - private static final ParseField SOURCE = new ParseField("_source"); - private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no"); - private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term"); /** * Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and @@ -93,7 +67,6 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques */ final List> requests = new ArrayList<>(); private final Set indices = new HashSet<>(); - List payloads = null; protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; @@ -131,23 +104,18 @@ public BulkRequest add(DocWriteRequest... requests) { return this; } - public BulkRequest add(DocWriteRequest request) { - return add(request, null); - } - /** * Add a request to the current BulkRequest. * @param request Request to add - * @param payload Optional payload * @return the current bulk request */ - public BulkRequest add(DocWriteRequest request, @Nullable Object payload) { + public BulkRequest add(DocWriteRequest request) { if (request instanceof IndexRequest) { - add((IndexRequest) request, payload); + add((IndexRequest) request); } else if (request instanceof DeleteRequest) { - add((DeleteRequest) request, payload); + add((DeleteRequest) request); } else if (request instanceof UpdateRequest) { - add((UpdateRequest) request, payload); + add((UpdateRequest) request); } else { throw new IllegalArgumentException("No support for request [" + request + "]"); } @@ -170,19 +138,14 @@ public BulkRequest add(Iterable> requests) { * (for example, if no id is provided, one will be generated, or usage of the create flag). */ public BulkRequest add(IndexRequest request) { - return internalAdd(request, null); - } - - public BulkRequest add(IndexRequest request, @Nullable Object payload) { - return internalAdd(request, payload); + return internalAdd(request); } - BulkRequest internalAdd(IndexRequest request, @Nullable Object payload) { + BulkRequest internalAdd(IndexRequest request) { Objects.requireNonNull(request, "'request' must not be null"); applyGlobalMandatoryParameters(request); requests.add(request); - addPayload(payload); // lack of source is validated in validate() method sizeInBytes += (request.source() != null ? request.source().length() : 0) + REQUEST_OVERHEAD; indices.add(request.index()); @@ -193,19 +156,14 @@ BulkRequest internalAdd(IndexRequest request, @Nullable Object payload) { * Adds an {@link UpdateRequest} to the list of actions to execute. */ public BulkRequest add(UpdateRequest request) { - return internalAdd(request, null); - } - - public BulkRequest add(UpdateRequest request, @Nullable Object payload) { - return internalAdd(request, payload); + return internalAdd(request); } - BulkRequest internalAdd(UpdateRequest request, @Nullable Object payload) { + BulkRequest internalAdd(UpdateRequest request) { Objects.requireNonNull(request, "'request' must not be null"); applyGlobalMandatoryParameters(request); requests.add(request); - addPayload(payload); if (request.doc() != null) { sizeInBytes += request.doc().source().length(); } @@ -223,34 +181,15 @@ BulkRequest internalAdd(UpdateRequest request, @Nullable Object payload) { * Adds an {@link DeleteRequest} to the list of actions to execute. */ public BulkRequest add(DeleteRequest request) { - return add(request, null); - } - - public BulkRequest add(DeleteRequest request, @Nullable Object payload) { Objects.requireNonNull(request, "'request' must not be null"); applyGlobalMandatoryParameters(request); requests.add(request); - addPayload(payload); sizeInBytes += REQUEST_OVERHEAD; indices.add(request.index()); return this; } - private void addPayload(Object payload) { - if (payloads == null) { - if (payload == null) { - return; - } - payloads = new ArrayList<>(requests.size() + 10); - // add requests#size-1 elements to the payloads if it null (we add for an *existing* request) - for (int i = 1; i < requests.size(); i++) { - payloads.add(null); - } - } - payloads.add(payload); - } - /** * The list of requests in this bulk request. */ @@ -258,17 +197,6 @@ public List> requests() { return this.requests; } - /** - * The list of optional payloads associated with requests in the same order as the requests. Note, elements within - * it might be null if no payload has been provided. - *

- * Note, if no payloads have been provided, this method will return null (as to conserve memory overhead). - */ - @Nullable - public List payloads() { - return this.payloads; - } - /** * The number of actions in the bulk request. */ @@ -316,7 +244,7 @@ public BulkRequest add(byte[] data, int from, int length, @Nullable String defau @Deprecated public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, XContentType xContentType) throws IOException { - return add(data, defaultIndex, defaultType, null, null, null, null, true, xContentType); + return add(data, defaultIndex, defaultType, null, null, null, true, xContentType); } /** @@ -324,7 +252,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null */ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, XContentType xContentType) throws IOException { - return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, null, true, xContentType); + return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, true, xContentType); } /** @@ -334,7 +262,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Deprecated public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, boolean allowExplicitIndex, XContentType xContentType) throws IOException { - return add(data, defaultIndex, defaultType, null, null, null, null, allowExplicitIndex, xContentType); + return add(data, defaultIndex, defaultType, null, null, null, allowExplicitIndex, xContentType); } /** @@ -342,209 +270,32 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null */ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, boolean allowExplicitIndex, XContentType xContentType) throws IOException { - return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, null, allowExplicitIndex, xContentType); + return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, null, null, null, allowExplicitIndex, xContentType); } public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext, - @Nullable String defaultPipeline, @Nullable Object payload, boolean allowExplicitIndex, + @Nullable String defaultPipeline, boolean allowExplicitIndex, XContentType xContentType) throws IOException { return add(data, defaultIndex, MapperService.SINGLE_MAPPING_NAME, defaultRouting, defaultFetchSourceContext, - defaultPipeline, payload, allowExplicitIndex, xContentType); + defaultPipeline, allowExplicitIndex, xContentType); } /** - * @deprecated use {@link #add(BytesReference, String, String, FetchSourceContext, String, Object, boolean, XContentType)} instead + * @deprecated use {@link #add(BytesReference, String, String, FetchSourceContext, String, boolean, XContentType)} instead */ @Deprecated public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext, - @Nullable String defaultPipeline, @Nullable Object payload, boolean allowExplicitIndex, + @Nullable String defaultPipeline, boolean allowExplicitIndex, XContentType xContentType) throws IOException { - XContent xContent = xContentType.xContent(); - int line = 0; - int from = 0; - byte marker = xContent.streamSeparator(); - boolean typesDeprecationLogged = false; - while (true) { - int nextMarker = findNextMarker(marker, from, data); - if (nextMarker == -1) { - break; - } - line++; - - // now parse the action - // EMPTY is safe here because we never call namedObject - try (InputStream stream = data.slice(from, nextMarker - from).streamInput(); - XContentParser parser = xContent - .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { - // move pointers - from = nextMarker + 1; - - // Move to START_OBJECT - XContentParser.Token token = parser.nextToken(); - if (token == null) { - continue; - } - if (token != XContentParser.Token.START_OBJECT) { - throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected " - + XContentParser.Token.START_OBJECT + " but found [" + token + "]"); - } - // Move to FIELD_NAME, that's the action - token = parser.nextToken(); - if (token != XContentParser.Token.FIELD_NAME) { - throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected " - + XContentParser.Token.FIELD_NAME + " but found [" + token + "]"); - } - String action = parser.currentName(); - - String index = defaultIndex; - String type = defaultType; - String id = null; - String routing = valueOrDefault(defaultRouting, globalRouting); - FetchSourceContext fetchSourceContext = defaultFetchSourceContext; - String opType = null; - long version = Versions.MATCH_ANY; - VersionType versionType = VersionType.INTERNAL; - long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; - long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; - int retryOnConflict = 0; - String pipeline = valueOrDefault(defaultPipeline, globalPipeline); - - // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) - // or START_OBJECT which will have another set of parameters - token = parser.nextToken(); - - if (token == XContentParser.Token.START_OBJECT) { - String currentFieldName = null; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token.isValue()) { - if (INDEX.match(currentFieldName, parser.getDeprecationHandler())){ - if (!allowExplicitIndex) { - throw new IllegalArgumentException("explicit index in bulk is not allowed"); - } - index = parser.text(); - } else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) { - if (typesDeprecationLogged == false) { - deprecationLogger.deprecatedAndMaybeLog("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE); - typesDeprecationLogged = true; - } - type = parser.text(); - } else if (ID.match(currentFieldName, parser.getDeprecationHandler())) { - id = parser.text(); - } else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) { - routing = parser.text(); - } else if (OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) { - opType = parser.text(); - } else if (VERSION.match(currentFieldName, parser.getDeprecationHandler())) { - version = parser.longValue(); - } else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) { - versionType = VersionType.fromString(parser.text()); - } else if (IF_SEQ_NO.match(currentFieldName, parser.getDeprecationHandler())) { - ifSeqNo = parser.longValue(); - } else if (IF_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) { - ifPrimaryTerm = parser.longValue(); - } else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) { - retryOnConflict = parser.intValue(); - } else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { - pipeline = parser.text(); - } else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) { - fetchSourceContext = FetchSourceContext.fromXContent(parser); - } else { - throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter [" - + currentFieldName + "]"); - } - } else if (token == XContentParser.Token.START_ARRAY) { - throw new IllegalArgumentException("Malformed action/metadata line [" + line + - "], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]"); - } else if (token == XContentParser.Token.START_OBJECT && SOURCE.match(currentFieldName, - parser.getDeprecationHandler())) { - fetchSourceContext = FetchSourceContext.fromXContent(parser); - } else if (token != XContentParser.Token.VALUE_NULL) { - throw new IllegalArgumentException("Malformed action/metadata line [" + line - + "], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]"); - } - } - } else if (token != XContentParser.Token.END_OBJECT) { - throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected " - + XContentParser.Token.START_OBJECT + " or " + XContentParser.Token.END_OBJECT + " but found [" + token + "]"); - } - - if ("delete".equals(action)) { - add(new DeleteRequest(index, type, id).routing(routing) - .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm), payload); - } else { - nextMarker = findNextMarker(marker, from, data); - if (nextMarker == -1) { - break; - } - line++; - - // we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks - // of index request. - if ("index".equals(action)) { - if (opType == null) { - internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType) - .setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) - .source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType), payload); - } else { - internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType) - .create("create".equals(opType)).setPipeline(pipeline) - .setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) - .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload); - } - } else if ("create".equals(action)) { - internalAdd(new IndexRequest(index, type, id).routing(routing).version(version).versionType(versionType) - .create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) - .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType), payload); - } else if ("update".equals(action)) { - if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { - throw new IllegalArgumentException("Update requests do not support versioning. " + - "Please use `if_seq_no` and `if_primary_term` instead"); - } - UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict) - .setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) - .routing(routing); - // EMPTY is safe here because we never call namedObject - try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput(); - XContentParser sliceParser = xContent.createParser(NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, dataStream)) { - updateRequest.fromXContent(sliceParser); - } - if (fetchSourceContext != null) { - updateRequest.fetchSource(fetchSourceContext); - } - IndexRequest upsertRequest = updateRequest.upsertRequest(); - if (upsertRequest != null) { - upsertRequest.setPipeline(defaultPipeline); - } - - internalAdd(updateRequest, payload); - } - // move pointers - from = nextMarker + 1; - } - } - } + String routing = valueOrDefault(defaultRouting, globalRouting); + String pipeline = valueOrDefault(defaultPipeline, globalPipeline); + new BulkRequestParser(true).parse(data, defaultIndex, defaultType, routing, defaultFetchSourceContext, pipeline, + allowExplicitIndex, xContentType, this::internalAdd, this::internalAdd, this::add); return this; } - /** - * Returns the sliced {@link BytesReference}. If the {@link XContentType} is JSON, the byte preceding the marker is checked to see - * if it is a carriage return and if so, the BytesReference is sliced so that the carriage return is ignored - */ - private BytesReference sliceTrimmingCarriageReturn(BytesReference bytesReference, int from, int nextMarker, XContentType xContentType) { - final int length; - if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') { - length = nextMarker - from - 1; - } else { - length = nextMarker - from; - } - return bytesReference.slice(from, length); - } - /** * Sets the number of shard copies that must be active before proceeding with the write. * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. @@ -614,18 +365,6 @@ public String routing() { return globalRouting; } - private static int findNextMarker(byte marker, int from, BytesReference data) { - final int res = data.indexOf(marker, from); - if (res != -1) { - assert res >= 0; - return res; - } - if (from != data.length()) { - throw new IllegalArgumentException("The bulk request must be terminated by a newline [\n]"); - } - return res; - } - @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java new file mode 100644 index 0000000000000..d1116b67b9a53 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -0,0 +1,309 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.apache.logging.log4j.LogManager; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.logging.DeprecationLogger; +import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContent; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.rest.action.document.RestBulkAction; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; + +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Consumer; + +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; + +/** + * Helper to parse bulk requests. This should be considered an internal class. + */ +public final class BulkRequestParser { + + private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(BulkRequestParser.class)); + + private static final ParseField INDEX = new ParseField("_index"); + private static final ParseField TYPE = new ParseField("_type"); + private static final ParseField ID = new ParseField("_id"); + private static final ParseField ROUTING = new ParseField("routing"); + private static final ParseField OP_TYPE = new ParseField("op_type"); + private static final ParseField VERSION = new ParseField("version"); + private static final ParseField VERSION_TYPE = new ParseField("version_type"); + private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict"); + private static final ParseField PIPELINE = new ParseField("pipeline"); + private static final ParseField SOURCE = new ParseField("_source"); + private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no"); + private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term"); + + private final boolean warnOnTypeUsage; + + /** + * Create a new parser. + * @param warnOnTypeUsage whether it warns upon types being explicitly specified + */ + public BulkRequestParser(boolean warnOnTypeUsage) { + this.warnOnTypeUsage = warnOnTypeUsage; + } + + private static int findNextMarker(byte marker, int from, BytesReference data) { + final int res = data.indexOf(marker, from); + if (res != -1) { + assert res >= 0; + return res; + } + if (from != data.length()) { + throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]"); + } + return res; + } + + /** + * Returns the sliced {@link BytesReference}. If the {@link XContentType} is JSON, the byte preceding the marker is checked to see + * if it is a carriage return and if so, the BytesReference is sliced so that the carriage return is ignored + */ + private static BytesReference sliceTrimmingCarriageReturn(BytesReference bytesReference, int from, int nextMarker, + XContentType xContentType) { + final int length; + if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') { + length = nextMarker - from - 1; + } else { + length = nextMarker - from; + } + return bytesReference.slice(from, length); + } + + /** + * Parse the provided {@code data} assuming the provided default values. Index requests + * will be passed to the {@code indexRequestConsumer}, update requests to the + * {@code updateRequestConsumer} and delete requests to the {@code deleteRequestConsumer}. + */ + public void parse( + BytesReference data, @Nullable String defaultIndex, + @Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext, + @Nullable String defaultPipeline, boolean allowExplicitIndex, + XContentType xContentType, + Consumer indexRequestConsumer, + Consumer updateRequestConsumer, + Consumer deleteRequestConsumer) throws IOException { + parse(data, defaultIndex, null, defaultRouting, defaultFetchSourceContext, defaultPipeline, allowExplicitIndex, xContentType, + indexRequestConsumer, updateRequestConsumer, deleteRequestConsumer); + } + + /** + * Parse the provided {@code data} assuming the provided default values. Index requests + * will be passed to the {@code indexRequestConsumer}, update requests to the + * {@code updateRequestConsumer} and delete requests to the {@code deleteRequestConsumer}. + * @deprecated Use {@link #parse(BytesReference, String, String, FetchSourceContext, String, boolean, XContentType, + * Consumer, Consumer, Consumer)} instead. + */ + @Deprecated + public void parse( + BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, + @Nullable String defaultRouting, @Nullable FetchSourceContext defaultFetchSourceContext, + @Nullable String defaultPipeline, boolean allowExplicitIndex, + XContentType xContentType, + Consumer indexRequestConsumer, + Consumer updateRequestConsumer, + Consumer deleteRequestConsumer) throws IOException { + XContent xContent = xContentType.xContent(); + int line = 0; + int from = 0; + byte marker = xContent.streamSeparator(); + boolean typesDeprecationLogged = false; + while (true) { + int nextMarker = findNextMarker(marker, from, data); + if (nextMarker == -1) { + break; + } + line++; + + // now parse the action + // EMPTY is safe here because we never call namedObject + try (InputStream stream = data.slice(from, nextMarker - from).streamInput(); + XContentParser parser = xContent + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + // move pointers + from = nextMarker + 1; + + // Move to START_OBJECT + XContentParser.Token token = parser.nextToken(); + if (token == null) { + continue; + } + if (token != XContentParser.Token.START_OBJECT) { + throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected " + + XContentParser.Token.START_OBJECT + " but found [" + token + "]"); + } + // Move to FIELD_NAME, that's the action + token = parser.nextToken(); + if (token != XContentParser.Token.FIELD_NAME) { + throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected " + + XContentParser.Token.FIELD_NAME + " but found [" + token + "]"); + } + String action = parser.currentName(); + + String index = defaultIndex; + String type = defaultType; + String id = null; + String routing = defaultRouting; + FetchSourceContext fetchSourceContext = defaultFetchSourceContext; + String opType = null; + long version = Versions.MATCH_ANY; + VersionType versionType = VersionType.INTERNAL; + long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + int retryOnConflict = 0; + String pipeline = defaultPipeline; + + // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) + // or START_OBJECT which will have another set of parameters + token = parser.nextToken(); + + if (token == XContentParser.Token.START_OBJECT) { + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if (INDEX.match(currentFieldName, parser.getDeprecationHandler())){ + if (!allowExplicitIndex) { + throw new IllegalArgumentException("explicit index in bulk is not allowed"); + } + index = parser.text(); + } else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) { + if (warnOnTypeUsage && typesDeprecationLogged == false) { + deprecationLogger.deprecatedAndMaybeLog("bulk_with_types", RestBulkAction.TYPES_DEPRECATION_MESSAGE); + typesDeprecationLogged = true; + } + type = parser.text(); + } else if (ID.match(currentFieldName, parser.getDeprecationHandler())) { + id = parser.text(); + } else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) { + routing = parser.text(); + } else if (OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) { + opType = parser.text(); + } else if (VERSION.match(currentFieldName, parser.getDeprecationHandler())) { + version = parser.longValue(); + } else if (VERSION_TYPE.match(currentFieldName, parser.getDeprecationHandler())) { + versionType = VersionType.fromString(parser.text()); + } else if (IF_SEQ_NO.match(currentFieldName, parser.getDeprecationHandler())) { + ifSeqNo = parser.longValue(); + } else if (IF_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) { + ifPrimaryTerm = parser.longValue(); + } else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) { + retryOnConflict = parser.intValue(); + } else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { + pipeline = parser.text(); + } else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) { + fetchSourceContext = FetchSourceContext.fromXContent(parser); + } else { + throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter [" + + currentFieldName + "]"); + } + } else if (token == XContentParser.Token.START_ARRAY) { + throw new IllegalArgumentException("Malformed action/metadata line [" + line + + "], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]"); + } else if (token == XContentParser.Token.START_OBJECT && SOURCE.match(currentFieldName, + parser.getDeprecationHandler())) { + fetchSourceContext = FetchSourceContext.fromXContent(parser); + } else if (token != XContentParser.Token.VALUE_NULL) { + throw new IllegalArgumentException("Malformed action/metadata line [" + line + + "], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]"); + } + } + } else if (token != XContentParser.Token.END_OBJECT) { + throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected " + + XContentParser.Token.START_OBJECT + " or " + XContentParser.Token.END_OBJECT + " but found [" + token + "]"); + } + + if ("delete".equals(action)) { + deleteRequestConsumer.accept(new DeleteRequest(index, type, id).routing(routing) + .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)); + } else { + nextMarker = findNextMarker(marker, from, data); + if (nextMarker == -1) { + break; + } + line++; + + // we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks + // of index request. + if ("index".equals(action)) { + if (opType == null) { + indexRequestConsumer.accept(new IndexRequest(index, type, id).routing(routing) + .version(version).versionType(versionType) + .setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) + .source(sliceTrimmingCarriageReturn(data, from, nextMarker,xContentType), xContentType)); + } else { + indexRequestConsumer.accept(new IndexRequest(index, type, id).routing(routing) + .version(version).versionType(versionType) + .create("create".equals(opType)).setPipeline(pipeline) + .setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) + .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)); + } + } else if ("create".equals(action)) { + indexRequestConsumer.accept(new IndexRequest(index, type, id).routing(routing) + .version(version).versionType(versionType) + .create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) + .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)); + } else if ("update".equals(action)) { + if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { + throw new IllegalArgumentException("Update requests do not support versioning. " + + "Please use `if_seq_no` and `if_primary_term` instead"); + } + UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict) + .setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm) + .routing(routing); + // EMPTY is safe here because we never call namedObject + try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput(); + XContentParser sliceParser = xContent.createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, dataStream)) { + updateRequest.fromXContent(sliceParser); + } + if (fetchSourceContext != null) { + updateRequest.fetchSource(fetchSourceContext); + } + IndexRequest upsertRequest = updateRequest.upsertRequest(); + if (upsertRequest != null) { + upsertRequest.setPipeline(defaultPipeline); + } + + updateRequestConsumer.accept(updateRequest); + } + // move pointers + from = nextMarker + 1; + } + } + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 7861a4fe9d1bd..08ddbb728c1ab 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -95,7 +95,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.setRefreshPolicy(request.param("refresh")); bulkRequest.add(request.requiredContent(), defaultIndex, defaultType, defaultRouting, - defaultFetchSourceContext, defaultPipeline, null, allowExplicitIndex, request.getXContentType()); + defaultFetchSourceContext, defaultPipeline, allowExplicitIndex, request.getXContentType()); return channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel)); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java new file mode 100644 index 0000000000000..fbcd5c46e2f90 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java @@ -0,0 +1,114 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.action.document.RestBulkAction; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +public class BulkRequestParserTests extends ESTestCase { + + public void testIndexRequest() throws IOException { + BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}\n"); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); + final AtomicBoolean parsed = new AtomicBoolean(); + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, + indexRequest -> { + assertFalse(parsed.get()); + assertEquals("foo", indexRequest.index()); + assertEquals("bar", indexRequest.id()); + parsed.set(true); + }, + req -> fail(), req -> fail()); + assertTrue(parsed.get()); + } + + public void testDeleteRequest() throws IOException { + BytesArray request = new BytesArray("{ \"delete\":{ \"_id\": \"bar\" } }\n"); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); + final AtomicBoolean parsed = new AtomicBoolean(); + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, + req -> fail(), req -> fail(), + deleteRequest -> { + assertFalse(parsed.get()); + assertEquals("foo", deleteRequest.index()); + assertEquals("bar", deleteRequest.id()); + parsed.set(true); + }); + assertTrue(parsed.get()); + } + + public void testUpdateRequest() throws IOException { + BytesArray request = new BytesArray("{ \"update\":{ \"_id\": \"bar\" } }\n{}\n"); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); + final AtomicBoolean parsed = new AtomicBoolean(); + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, + req -> fail(), + updateRequest -> { + assertFalse(parsed.get()); + assertEquals("foo", updateRequest.index()); + assertEquals("bar", updateRequest.id()); + parsed.set(true); + }, + req -> fail()); + assertTrue(parsed.get()); + } + + public void testBarfOnLackOfTrailingNewline() throws IOException { + BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}"); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, + indexRequest -> fail(), req -> fail(), req -> fail())); + assertEquals("The bulk request must be terminated by a newline [\\n]", e.getMessage()); + } + + public void testFailOnExplicitIndex() throws IOException { + BytesArray request = new BytesArray("{ \"index\":{ \"_index\": \"foo\", \"_id\": \"bar\" } }\n{}\n"); + BulkRequestParser parser = new BulkRequestParser(randomBoolean()); + + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, + () -> parser.parse(request, null, null, null, null, null, false, XContentType.JSON, + req -> fail(), req -> fail(), req -> fail())); + assertEquals("explicit index in bulk is not allowed", ex.getMessage()); + } + + public void testTypeWarning() throws IOException { + BytesArray request = new BytesArray("{ \"index\":{ \"_type\": \"quux\", \"_id\": \"bar\" } }\n{}\n"); + BulkRequestParser parser = new BulkRequestParser(true); + final AtomicBoolean parsed = new AtomicBoolean(); + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, + indexRequest -> { + assertFalse(parsed.get()); + assertEquals("foo", indexRequest.index()); + assertEquals("bar", indexRequest.id()); + parsed.set(true); + }, + req -> fail(), req -> fail()); + assertTrue(parsed.get()); + + assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index 6d3e4c04c13d7..ebd6590a80cca 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -352,7 +352,7 @@ public void testBulkTerminatedByNewline() throws Exception { String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk11.json"); IllegalArgumentException expectThrows = expectThrows(IllegalArgumentException.class, () -> new BulkRequest() .add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, XContentType.JSON)); - assertEquals("The bulk request must be terminated by a newline [\n]", expectThrows.getMessage()); + assertEquals("The bulk request must be terminated by a newline [\\n]", expectThrows.getMessage()); String bulkActionWithNewLine = bulkAction + "\n"; BulkRequest bulkRequestWithNewLine = new BulkRequest(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java index adb6b04db35b2..1ba0f605cba3c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java @@ -7,10 +7,7 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; +import org.elasticsearch.action.bulk.BulkRequestParser; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -75,32 +72,27 @@ public MonitoringBulkRequest add(final MonitoredSystem system, final long timestamp, final long intervalMillis) throws IOException { - // MonitoringBulkRequest accepts a body request that has the same format as the BulkRequest: - // instead of duplicating the parsing logic here we use a new BulkRequest instance to parse the content. - final BulkRequest bulkRequest = Requests.bulkRequest().add(content, null, defaultType, xContentType); + // MonitoringBulkRequest accepts a body request that has the same format as the BulkRequest + new BulkRequestParser(false).parse(content, null, defaultType, null, null, null, true, xContentType, + indexRequest -> { + // we no longer accept non-timestamped indexes from Kibana, LS, or Beats because we do not use the data + // and it was duplicated anyway; by simply dropping it, we allow BWC for older clients that still send it + if (MonitoringIndex.from(indexRequest.index()) != MonitoringIndex.TIMESTAMPED) { + return; + } + final BytesReference source = indexRequest.source(); + if (source.length() == 0) { + throw new IllegalArgumentException("source is missing for monitoring document [" + + indexRequest.index() + "][" + indexRequest.type() + "][" + indexRequest.id() + "]"); + } + + // builds a new monitoring document based on the index request + add(new MonitoringBulkDoc(system, indexRequest.type(), indexRequest.id(), timestamp, intervalMillis, source, + xContentType)); + }, + updateRequest -> { throw new IllegalArgumentException("monitoring bulk requests should only contain index requests"); }, + deleteRequest -> { throw new IllegalArgumentException("monitoring bulk requests should only contain index requests"); }); - for (DocWriteRequest request : bulkRequest.requests()) { - if (request instanceof IndexRequest) { - final IndexRequest indexRequest = (IndexRequest) request; - - // we no longer accept non-timestamped indexes from Kibana, LS, or Beats because we do not use the data - // and it was duplicated anyway; by simply dropping it, we allow BWC for older clients that still send it - if (MonitoringIndex.from(indexRequest.index()) != MonitoringIndex.TIMESTAMPED) { - continue; - } - - final BytesReference source = indexRequest.source(); - if (source.length() == 0) { - throw new IllegalArgumentException("source is missing for monitoring document [" - + indexRequest.index() + "][" + indexRequest.type() + "][" + indexRequest.id() + "]"); - } - - // builds a new monitoring document based on the index request - add(new MonitoringBulkDoc(system, indexRequest.type(), indexRequest.id(), timestamp, intervalMillis, source, xContentType)); - } else { - throw new IllegalArgumentException("monitoring bulk requests should only contain index requests"); - } - } return this; } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/MonitoringBulkRequestTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/MonitoringBulkRequestTests.java index fc3bf633a3964..dc5cad7c94fd4 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/MonitoringBulkRequestTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/action/MonitoringBulkRequestTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.RandomObjects; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; @@ -142,8 +141,6 @@ public void testAddRequestContent() throws IOException { assertThat(bulkDoc.getXContentType(), equalTo(xContentType)); ++count; } - //This test's JSON contains outdated references to types - assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); } public void testAddRequestContentWithEmptySource() throws IOException { @@ -191,8 +188,6 @@ public void testAddRequestContentWithEmptySource() throws IOException { ); assertThat(e.getMessage(), containsString("source is missing for monitoring document [][doc][" + nbDocs + "]")); - //This test's JSON contains outdated references to types - assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); } public void testAddRequestContentWithUnrecognizedIndexName() throws IOException { @@ -230,8 +225,6 @@ public void testAddRequestContentWithUnrecognizedIndexName() throws IOException ); assertThat(e.getMessage(), containsString("unrecognized index name [" + indexName + "]")); - //This test's JSON contains outdated references to types - assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); } public void testSerialization() throws IOException { diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringBulkActionTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringBulkActionTests.java index 10fc10e3f973d..7a4427c9f0fdc 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringBulkActionTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringBulkActionTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestBuilderListener; -import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.xpack.core.XPackClient; @@ -122,8 +121,6 @@ public void testNoErrors() throws Exception { assertThat(restResponse.status(), is(RestStatus.OK)); assertThat(restResponse.content().utf8ToString(), is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":false,\"errors\":false}")); - //This test's JSON contains outdated references to types - assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); } public void testNoErrorsButIgnored() throws Exception { @@ -134,8 +131,6 @@ public void testNoErrorsButIgnored() throws Exception { assertThat(restResponse.status(), is(RestStatus.OK)); assertThat(restResponse.content().utf8ToString(), is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":true,\"errors\":false}")); - //This test's JSON contains outdated references to types - assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); } public void testWithErrors() throws Exception { @@ -155,8 +150,6 @@ public void testWithErrors() throws Exception { assertThat(restResponse.status(), is(RestStatus.INTERNAL_SERVER_ERROR)); assertThat(restResponse.content().utf8ToString(), is("{\"took\":" + response.getTookInMillis() + ",\"ignored\":false,\"errors\":true,\"error\":" + errorJson + "}")); - //This test's JSON contains outdated references to types - assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); } /**