@@ -26,12 +26,10 @@ public void testIndexMissingBody() throws IOException {
         assertResponseException(responseException, "request body is required");
     }
 
-    @AwaitsFix(bugUrl = "need to decide how to handle this scenario")
     public void testBulkMissingBody() throws IOException {
-        ResponseException responseException = expectThrows(
-            ResponseException.class,
-            () -> client().performRequest(new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"))
-        );
+        Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
+        request.setJsonEntity("");
+        ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
         assertResponseException(responseException, "request body is required");
     }
 
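The rewritten test is subtly different from the old one: a Request with no entity sends no HTTP body at all, while setJsonEntity("") attaches a zero-length body, and the two reach the server as different wire-level cases under chunked handling. A minimal sketch of the same check against a live cluster, assuming a local node on localhost:9200 (the host and the standalone client setup are illustrative, not part of this change):

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;

public class EmptyBulkBodyCheck {
    public static void main(String[] args) throws Exception {
        // Assumed local cluster address; adjust for your environment.
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request request = new Request("POST", "/_bulk");
            request.setJsonEntity(""); // zero-length body, as opposed to omitting the entity entirely
            try {
                client.performRequest(request);
            } catch (ResponseException e) {
                // Expected after this change: HTTP 400 with "request body is required"
                System.out.println(e.getResponse().getStatusLine());
            }
        }
    }
}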
@@ -23,11 +23,33 @@
 import java.util.Map;
 
 import static org.elasticsearch.rest.RestStatus.OK;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
 public class IncrementalBulkRestIT extends HttpSmokeTestCase {
 
+    public void testBulkMissingBody() throws IOException {
+        Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
+        request.setJsonEntity("");
+        ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
+        assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
+        assertThat(responseException.getMessage(), containsString("request body is required"));
+    }
+
+    public void testBulkRequestBodyImproperlyTerminated() throws IOException {
+        Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
+        // missing final line of the bulk body. cannot process
+        request.setJsonEntity(
+            "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+                + "{\"field\":1}\n"
+                + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}"
+        );
+        ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
+        assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
+        assertThat(responseException.getMessage(), containsString("could not parse bulk request body"));
+    }
+
     public void testIncrementalBulk() throws IOException {
         Request createRequest = new Request("PUT", "/index_name");
         createRequest.setJsonEntity("""
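For contrast with testBulkRequestBodyImproperlyTerminated, here is a sketch of a well-formed body for the same documents: in the bulk NDJSON format every action and source line must be complete JSON terminated by a newline, including the final line. The helper below is illustrative, not part of the change:

import org.elasticsearch.client.Request;

class WellFormedBulkBody {
    // Illustrative helper: the corrected version of the test's payload.
    static Request wellFormedBulk() {
        Request request = new Request("POST", "/_bulk");
        // Each action line is closed JSON and every line, including the last,
        // ends with '\n'; the test omits both for its final action line.
        request.setJsonEntity(
            "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
                + "{\"field\":1}\n"
                + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
                + "{\"field\":2}\n"
        );
        return request;
    }
}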
@@ -9,6 +9,7 @@
 
 package org.elasticsearch.rest.action.document;
 
+import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkRequestParser;
@@ -150,6 +151,7 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
 
         private volatile RestChannel restChannel;
         private boolean shortCircuited;
+        private int bytesParsed = 0;
         private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
         private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);
 
@@ -186,48 +188,61 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
 
             final BytesReference data;
             int bytesConsumed;
-            try {
-                unParsedChunks.add(chunk);
+            if (chunk.length() == 0) {
+                chunk.close();
+                bytesConsumed = 0;
+            } else {
+                try {
+                    unParsedChunks.add(chunk);
 
-                if (unParsedChunks.size() > 1) {
-                    data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
-                } else {
-                    data = chunk;
-                }
+                    if (unParsedChunks.size() > 1) {
+                        data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
+                    } else {
+                        data = chunk;
+                    }
 
-                // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in
-                // BulkRequest#add is fine
-                bytesConsumed = parser.incrementalParse(
-                    data,
-                    defaultIndex,
-                    defaultRouting,
-                    defaultFetchSourceContext,
-                    defaultPipeline,
-                    defaultRequireAlias,
-                    defaultRequireDataStream,
-                    defaultListExecutedPipelines,
-                    allowExplicitIndex,
-                    request.getXContentType(),
-                    (request, type) -> items.add(request),
-                    items::add,
-                    items::add,
-                    isLast == false,
-                    stringDeduplicator
-                );
+                    // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in
+                    // BulkRequest#add is fine
+                    bytesConsumed = parser.incrementalParse(
+                        data,
+                        defaultIndex,
+                        defaultRouting,
+                        defaultFetchSourceContext,
+                        defaultPipeline,
+                        defaultRequireAlias,
+                        defaultRequireDataStream,
+                        defaultListExecutedPipelines,
+                        allowExplicitIndex,
+                        request.getXContentType(),
+                        (request, type) -> items.add(request),
+                        items::add,
+                        items::add,
+                        isLast == false,
+                        stringDeduplicator
+                    );
+                    bytesParsed += bytesConsumed;
 
-            } catch (Exception e) {
-                shortCircuit();
-                new RestToXContentListener<>(channel).onFailure(e);
-                return;
+                } catch (Exception e) {
+                    shortCircuit();
+                    new RestToXContentListener<>(channel).onFailure(
+                        new ElasticsearchParseException("could not parse bulk request body", e)
+                    );
+                    return;
+                }
             }
 
             final ArrayList<Releasable> releasables = accountParsing(bytesConsumed);
             if (isLast) {
                 assert unParsedChunks.isEmpty();
-                assert channel != null;
-                ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
-                items.clear();
-                handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
+                if (bytesParsed == 0) {
+                    shortCircuit();
+                    new RestToXContentListener<>(channel).onFailure(new ElasticsearchParseException("request body is required"));
+                } else {
+                    assert channel != null;
+                    ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
+                    items.clear();
+                    handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
+                }
             } else if (items.isEmpty() == false) {
                 ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
                 items.clear();
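Taken together, the handler changes enforce a simple invariant: zero-length chunks are released without parsing, bytesParsed accumulates across handleChunk calls, and the request is rejected on the last chunk if nothing was ever parsed. A stripped-down sketch of that control flow, with the Elasticsearch-specific types replaced by plain stand-ins (all names here are illustrative):

// Illustrative stand-in for the ChunkHandler accounting, not the real class.
class ChunkAccountingSketch {
    private int bytesParsed = 0;

    // Returns an error message, or null if the chunk was accepted.
    String handleChunk(byte[] chunk, boolean isLast) {
        if (chunk.length > 0) {                // zero-length chunks are skipped entirely
            try {
                bytesParsed += parse(chunk);   // stand-in for parser.incrementalParse(...)
            } catch (RuntimeException e) {
                return "could not parse bulk request body";
            }
        }
        if (isLast && bytesParsed == 0) {
            return "request body is required"; // an empty body is only detectable on the last chunk
        }
        return null;
    }

    private int parse(byte[] chunk) {
        return chunk.length;                   // placeholder parser: every byte "parses"
    }
}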