Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
---
"Array of objects":
- do:
bulk:
refresh: true
body:
- index:
_index: test_index
_type: test_type
_id: test_id
- f1: v1
f2: 42
- index:
_index: test_index
_type: test_type
_id: test_id2
- f1: v2
f2: 47

- do:
count:
index: test_index

- match: {count: 2}

---
"Empty _id":
- do:
bulk:
refresh: true
body:
- index:
_index: test
_type: type
_id: ''
- f: 1
- index:
_index: test
_type: type
_id: id
- f: 2
- index:
_index: test
_type: type
- f: 3
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: illegal_argument_exception }
- match: { items.0.index.error.reason: if _id is specified it must not be empty }
- match: { items.1.index.result: created }
- match: { items.2.index.result: created }

- do:
count:
index: test

- match: { count: 2 }

---
"Empty _id with op_type create":
- skip:
version: " - 7.4.99"
reason: "auto id + op type create only supported since 7.5"

- do:
bulk:
refresh: true
body:
- index:
_index: test
_type: type
_id: ''
- f: 1
- index:
_index: test
_type: type
_id: id
- f: 2
- index:
_index: test
_type: type
- f: 3
- create:
_index: test
_type: type
- f: 4
- index:
_index: test
_type: type
op_type: create
- f: 5
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: illegal_argument_exception }
- match: { items.0.index.error.reason: if _id is specified it must not be empty }
- match: { items.1.index.result: created }
- match: { items.2.index.result: created }
- match: { items.3.create.result: created }
- match: { items.4.create.result: created }

- do:
count:
index: test

- match: { count: 4 }

---
"empty action":
- skip:
features: headers

- do:
catch: /Malformed action\/metadata line \[3\], expected FIELD_NAME but found \[END_OBJECT\]/
headers:
Content-Type: application/json
bulk:
body: |
{"index": {"_index": "test_index", "_type": "test_type", "_id": "test_id"}}
{"f1": "v1", "f2": 42}
{}

---
"List of strings":
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_index", "_type": "test_type", "_id": "test_id"}}'
- '{"f1": "v1", "f2": 42}'
- '{"index": {"_index": "test_index", "_type": "test_type", "_id": "test_id2"}}'
- '{"f1": "v2", "f2": 47}'

- do:
count:
index: test_index

- match: {count: 2}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,18 @@ public void testBulkWithGlobalDefaults() throws Exception {
}
}

// Todo: This test is added to verify type support in bulk action. This should be removed once all clients
// avoid sending this param.
// https://github.com/opensearch-project/OpenSearch/issues/3484
public void testBulkWithTypes() throws Exception {
String bulkAction = copyToStringFromClasspath("/org/opensearch/action/bulk/bulk-with-deprecated-types.json");
{
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, XContentType.JSON);
assertThat(bulkRequest.numberOfActions(), equalTo(5));
}
}

private void createSamplePipeline(String pipelineId) throws IOException, ExecutionException, InterruptedException {
XContentBuilder pipeline = jsonBuilder().startObject()
.startArray("processors")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ public BulkRequest add(
String routing = valueOrDefault(defaultRouting, globalRouting);
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
new BulkRequestParser().parse(
// https://github.com/opensearch-project/OpenSearch/issues/3484
// Undo error on types which breaks compatibility with some external clients
new BulkRequestParser(false).parse(
data,
defaultIndex,
routing,
Expand All @@ -296,7 +298,7 @@ public BulkRequest add(
requireAlias,
allowExplicitIndex,
xContentType,
this::internalAdd,
(indexRequest, type) -> internalAdd(indexRequest),
this::internalAdd,
this::add
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -66,6 +67,7 @@
public final class BulkRequestParser {

private static final ParseField INDEX = new ParseField("_index");
private static final ParseField TYPE = new ParseField("_type");
private static final ParseField ID = new ParseField("_id");
private static final ParseField ROUTING = new ParseField("routing");
private static final ParseField OP_TYPE = new ParseField("op_type");
Expand All @@ -78,6 +80,17 @@ public final class BulkRequestParser {
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);

// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
private final boolean errorOnType;

/**
* Create a new parser.
* @param errorOnType whether to allow _type information in the index line; used by BulkMonitoring
*/
public BulkRequestParser(boolean errorOnType) {
this.errorOnType = errorOnType;
}

private static int findNextMarker(byte marker, int from, BytesReference data) {
final int res = data.indexOf(marker, from);
if (res != -1) {
Expand Down Expand Up @@ -123,7 +136,7 @@ public void parse(
@Nullable Boolean defaultRequireAlias,
boolean allowExplicitIndex,
XContentType xContentType,
Consumer<IndexRequest> indexRequestConsumer,
BiConsumer<IndexRequest, String> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer
) throws IOException {
Expand Down Expand Up @@ -179,6 +192,7 @@ public void parse(
String action = parser.currentName();

String index = defaultIndex;
String type = null;
String id = null;
String routing = defaultRouting;
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
Expand All @@ -191,7 +205,7 @@ public void parse(
String pipeline = defaultPipeline;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;

// at this stage, next token can either be END_OBJECT (and use default index with auto generated id)
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
token = parser.nextToken();

Expand All @@ -206,6 +220,13 @@ public void parse(
throw new IllegalArgumentException("explicit index in bulk is not allowed");
}
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
if (errorOnType) {
throw new IllegalArgumentException(
"Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
);
}
type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
id = parser.text();
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -301,7 +322,8 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias)
.setRequireAlias(requireAlias),
type
);
} else {
indexRequestConsumer.accept(
Expand All @@ -314,7 +336,8 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias)
.setRequireAlias(requireAlias),
type
);
}
} else if ("create".equals(action)) {
Expand All @@ -328,7 +351,8 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias)
.setRequireAlias(requireAlias),
type
);
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
Expand Down
Loading