From 77ed93a1227a2860808bd3074793c9febed03bee Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 16 Sep 2024 15:46:12 -0600 Subject: [PATCH 1/2] Properly handle empty incremental bulk requests This commit ensures we properly throw exceptions when an empty bulk request is received with the incremental handling enabled. --- .../test/rest/RequestsWithoutContentIT.java | 5 +- .../http/IncrementalBulkRestIT.java | 27 ++++++ .../rest/action/document/RestBulkAction.java | 83 +++++++++++-------- 3 files changed, 79 insertions(+), 36 deletions(-) diff --git a/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java b/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java index 83cb4fb7180ef..3cd9ea393fada 100644 --- a/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java +++ b/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java @@ -26,11 +26,12 @@ public void testIndexMissingBody() throws IOException { assertResponseException(responseException, "request body is required"); } - @AwaitsFix(bugUrl = "need to decide how to handle this scenario") public void testBulkMissingBody() throws IOException { + Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"); + request.setJsonEntity(""); ResponseException responseException = expectThrows( ResponseException.class, - () -> client().performRequest(new Request(randomBoolean() ? "POST" : "PUT", "/_bulk")) + () -> client().performRequest(request) ); assertResponseException(responseException, "request body is required"); } diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index a05c3e510c0f8..13cbc60fd3d6d 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -23,11 +23,38 @@ import java.util.Map; import static org.elasticsearch.rest.RestStatus.OK; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.equalTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) public class IncrementalBulkRestIT extends HttpSmokeTestCase { + public void testBulkMissingBody() throws IOException { + Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"); + request.setJsonEntity(""); + ResponseException responseException = expectThrows( + ResponseException.class, + () -> getRestClient().performRequest(request) + ); + assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode()); + assertThat(responseException.getMessage(), containsString("request body is required")); + } + + public void testBulkRequestBodyImproperlyTerminated() throws IOException { + Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"); + // missing final line of the bulk body. cannot process + request.setJsonEntity("{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n" + + "{\"field\":1}\n" + + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}"); + ResponseException responseException = expectThrows( + ResponseException.class, + () -> getRestClient().performRequest(request) + ); + responseException.printStackTrace(); + assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode()); + assertThat(responseException.getMessage(), containsString("could not parse bulk request body")); + } + public void testIncrementalBulk() throws IOException { Request createRequest = new Request("PUT", "/index_name"); createRequest.setJsonEntity(""" diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 8766f275a5621..ff87bb834f3e1 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.rest.action.document; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestParser; @@ -150,6 +151,7 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { private volatile RestChannel restChannel; private boolean shortCircuited; + private int bytesParsed = 0; private final ArrayDeque unParsedChunks = new ArrayDeque<>(4); private final ArrayList> items = new ArrayList<>(4); @@ -186,48 +188,61 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo final BytesReference data; int bytesConsumed; - try { - unParsedChunks.add(chunk); + if (chunk.length() == 0) { + chunk.close(); + bytesConsumed = 0; + } else { + try { + unParsedChunks.add(chunk); - if (unParsedChunks.size() > 1) { - data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0])); - } else { - data = chunk; - } + if (unParsedChunks.size() > 1) { + data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0])); + } else { + data = chunk; + } - // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in - // BulkRequest#add is fine - bytesConsumed = parser.incrementalParse( - data, - defaultIndex, - defaultRouting, - defaultFetchSourceContext, - defaultPipeline, - defaultRequireAlias, - defaultRequireDataStream, - defaultListExecutedPipelines, - allowExplicitIndex, - request.getXContentType(), - (request, type) -> items.add(request), - items::add, - items::add, - isLast == false, - stringDeduplicator - ); + // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in + // BulkRequest#add is fine + bytesConsumed = parser.incrementalParse( + data, + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + defaultRequireDataStream, + defaultListExecutedPipelines, + allowExplicitIndex, + request.getXContentType(), + (request, type) -> items.add(request), + items::add, + items::add, + isLast == false, + stringDeduplicator + ); + bytesParsed += bytesConsumed; - } catch (Exception e) { - shortCircuit(); - new RestToXContentListener<>(channel).onFailure(e); - return; + } catch (Exception e) { + shortCircuit(); + new RestToXContentListener<>(channel).onFailure( + new ElasticsearchParseException("could not parse bulk request body", e) + ); + return; + } } final ArrayList releasables = accountParsing(bytesConsumed); if (isLast) { assert unParsedChunks.isEmpty(); - assert channel != null; - ArrayList> toPass = new ArrayList<>(items); - items.clear(); - handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel)); + if (bytesParsed == 0) { + shortCircuit(); + new RestToXContentListener<>(channel).onFailure(new ElasticsearchParseException("request body is required")); + } else { + assert channel != null; + ArrayList> toPass = new ArrayList<>(items); + items.clear(); + handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel)); + } } else if (items.isEmpty() == false) { ArrayList> toPass = new ArrayList<>(items); items.clear(); From 0e7ffa42786624ce233b3aaaff27596aa0d298f9 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 16 Sep 2024 15:55:29 -0600 Subject: [PATCH 2/2] Fix --- .../test/rest/RequestsWithoutContentIT.java | 5 +---- .../http/IncrementalBulkRestIT.java | 17 ++++++----------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java b/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java index 3cd9ea393fada..8732110bb1937 100644 --- a/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java +++ b/distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java @@ -29,10 +29,7 @@ public void testIndexMissingBody() throws IOException { public void testBulkMissingBody() throws IOException { Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"); request.setJsonEntity(""); - ResponseException responseException = expectThrows( - ResponseException.class, - () -> client().performRequest(request) - ); + ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request)); assertResponseException(responseException, "request body is required"); } diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java index 13cbc60fd3d6d..08026e0435f33 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java @@ -32,10 +32,7 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase { public void testBulkMissingBody() throws IOException { Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"); request.setJsonEntity(""); - ResponseException responseException = expectThrows( - ResponseException.class, - () -> getRestClient().performRequest(request) - ); + ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request)); assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode()); assertThat(responseException.getMessage(), containsString("request body is required")); } @@ -43,14 +40,12 @@ public void testBulkMissingBody() throws IOException { public void testBulkRequestBodyImproperlyTerminated() throws IOException { Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"); // missing final line of the bulk body. cannot process - request.setJsonEntity("{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n" - + "{\"field\":1}\n" - + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}"); - ResponseException responseException = expectThrows( - ResponseException.class, - () -> getRestClient().performRequest(request) + request.setJsonEntity( + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n" + + "{\"field\":1}\n" + + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}" ); - responseException.printStackTrace(); + ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request)); assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode()); assertThat(responseException.getMessage(), containsString("could not parse bulk request body")); }