diff --git a/docs/changelog/104927.yaml b/docs/changelog/104927.yaml new file mode 100644 index 0000000000000..e0e098ba10b7b --- /dev/null +++ b/docs/changelog/104927.yaml @@ -0,0 +1,5 @@ +pr: 104927 +summary: Adding `ActionRequestLazyBuilder` implementation of `RequestBuilder` +area: Ingest Node +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/ActionRequestLazyBuilder.java b/server/src/main/java/org/elasticsearch/action/ActionRequestLazyBuilder.java new file mode 100644 index 0000000000000..7779b71c46717 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/ActionRequestLazyBuilder.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action; + +import org.elasticsearch.client.internal.ElasticsearchClient; +import org.elasticsearch.core.TimeValue; + +import java.util.Objects; + +/** + * This class is similar to ActionRequestBuilder, except that it does not build the request until the request() method is called. + * @param + * @param + */ +public abstract class ActionRequestLazyBuilder + implements + RequestBuilder { + + protected final ActionType action; + protected final ElasticsearchClient client; + + protected ActionRequestLazyBuilder(ElasticsearchClient client, ActionType action) { + Objects.requireNonNull(action, "action must not be null"); + this.action = action; + this.client = client; + } + + /** + * This method creates the request. The caller of this method is responsible for calling Request#decRef. + * @return A newly-built Request, fully initialized by this builder. + */ + public abstract Request request(); + + public ActionFuture execute() { + return client.execute(action, request()); + } + + /** + * Short version of execute().actionGet(). + */ + public Response get() { + return execute().actionGet(); + } + + /** + * Short version of execute().actionGet(). + */ + public Response get(TimeValue timeout) { + return execute().actionGet(timeout); + } + + public void execute(ActionListener listener) { + client.execute(action, request(), listener); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java index 2b961b6bc7351..16e5430063650 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java @@ -8,12 +8,17 @@ package org.elasticsearch.action.bulk; -import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestLazyBuilder; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.RequestBuilder; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequestBuilder; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -23,26 +28,50 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.xcontent.XContentType; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + /** * A bulk request holds an ordered {@link IndexRequest}s and {@link DeleteRequest}s and allows to executes * it in a single batch. */ -public class BulkRequestBuilder extends ActionRequestBuilder implements WriteRequestBuilder { +public class BulkRequestBuilder extends ActionRequestLazyBuilder + implements + WriteRequestBuilder { + private final String globalIndex; + /* + * The following 3 variables hold the list of requests that make up this bulk. Only one can be non-empty. That is, users can't add + * some IndexRequests and some IndexRequestBuilders. They need to pick one (preferably builders) and stick with it. + */ + private final List> requests = new ArrayList<>(); + private final List framedData = new ArrayList<>(); + private final List> requestBuilders = new ArrayList<>(); + private ActiveShardCount waitForActiveShards; + private TimeValue timeout; + private String timeoutString; + private String globalPipeline; + private String globalRouting; + private WriteRequest.RefreshPolicy refreshPolicy; + private String refreshPolicyString; public BulkRequestBuilder(ElasticsearchClient client, @Nullable String globalIndex) { - super(client, BulkAction.INSTANCE, new BulkRequest(globalIndex)); + super(client, BulkAction.INSTANCE); + this.globalIndex = globalIndex; } public BulkRequestBuilder(ElasticsearchClient client) { - super(client, BulkAction.INSTANCE, new BulkRequest()); + this(client, null); } /** * Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest} * (for example, if no id is provided, one will be generated, or usage of the create flag). + * @deprecated use {@link #add(IndexRequestBuilder)} instead */ + @Deprecated public BulkRequestBuilder add(IndexRequest request) { - super.request.add(request); + requests.add(request); return this; } @@ -51,15 +80,17 @@ public BulkRequestBuilder add(IndexRequest request) { * (for example, if no id is provided, one will be generated, or usage of the create flag). */ public BulkRequestBuilder add(IndexRequestBuilder request) { - super.request.add(request.request()); + requestBuilders.add(request); return this; } /** * Adds an {@link DeleteRequest} to the list of actions to execute. + * @deprecated use {@link #add(DeleteRequestBuilder)} instead */ + @Deprecated public BulkRequestBuilder add(DeleteRequest request) { - super.request.add(request); + requests.add(request); return this; } @@ -67,15 +98,17 @@ public BulkRequestBuilder add(DeleteRequest request) { * Adds an {@link DeleteRequest} to the list of actions to execute. */ public BulkRequestBuilder add(DeleteRequestBuilder request) { - super.request.add(request.request()); + requestBuilders.add(request); return this; } /** * Adds an {@link UpdateRequest} to the list of actions to execute. + * @deprecated use {@link #add(UpdateRequestBuilder)} instead */ + @Deprecated public BulkRequestBuilder add(UpdateRequest request) { - super.request.add(request); + requests.add(request); return this; } @@ -83,7 +116,7 @@ public BulkRequestBuilder add(UpdateRequest request) { * Adds an {@link UpdateRequest} to the list of actions to execute. */ public BulkRequestBuilder add(UpdateRequestBuilder request) { - super.request.add(request.request()); + requestBuilders.add(request); return this; } @@ -91,7 +124,7 @@ public BulkRequestBuilder add(UpdateRequestBuilder request) { * Adds a framed data in binary format */ public BulkRequestBuilder add(byte[] data, int from, int length, XContentType xContentType) throws Exception { - request.add(data, from, length, null, xContentType); + framedData.add(new FramedData(data, from, length, null, xContentType)); return this; } @@ -100,7 +133,7 @@ public BulkRequestBuilder add(byte[] data, int from, int length, XContentType xC */ public BulkRequestBuilder add(byte[] data, int from, int length, @Nullable String defaultIndex, XContentType xContentType) throws Exception { - request.add(data, from, length, defaultIndex, xContentType); + framedData.add(new FramedData(data, from, length, defaultIndex, xContentType)); return this; } @@ -109,7 +142,7 @@ public BulkRequestBuilder add(byte[] data, int from, int length, @Nullable Strin * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. */ public BulkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) { - request.waitForActiveShards(waitForActiveShards); + this.waitForActiveShards = waitForActiveShards; return this; } @@ -126,7 +159,7 @@ public BulkRequestBuilder setWaitForActiveShards(final int waitForActiveShards) * A timeout to wait if the index operation can't be performed immediately. Defaults to {@code 1m}. */ public final BulkRequestBuilder setTimeout(TimeValue timeout) { - request.timeout(timeout); + this.timeout = timeout; return this; } @@ -134,7 +167,7 @@ public final BulkRequestBuilder setTimeout(TimeValue timeout) { * A timeout to wait if the index operation can't be performed immediately. Defaults to {@code 1m}. */ public final BulkRequestBuilder setTimeout(String timeout) { - request.timeout(timeout); + this.timeoutString = timeout; return this; } @@ -142,16 +175,96 @@ public final BulkRequestBuilder setTimeout(String timeout) { * The number of actions currently in the bulk. */ public int numberOfActions() { - return request.numberOfActions(); + return requests.size() + requestBuilders.size() + framedData.size(); } public BulkRequestBuilder pipeline(String globalPipeline) { - request.pipeline(globalPipeline); + this.globalPipeline = globalPipeline; return this; } public BulkRequestBuilder routing(String globalRouting) { - request.routing(globalRouting); + this.globalRouting = globalRouting; + return this; + } + + @Override + public BulkRequestBuilder setRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) { + this.refreshPolicy = refreshPolicy; + return this; + } + + @Override + public BulkRequestBuilder setRefreshPolicy(String refreshPolicy) { + this.refreshPolicyString = refreshPolicy; return this; } + + @Override + public BulkRequest request() { + validate(); + BulkRequest request = new BulkRequest(globalIndex); + for (RequestBuilder requestBuilder : requestBuilders) { + ActionRequest childRequest = requestBuilder.request(); + request.add((DocWriteRequest) childRequest); + } + for (DocWriteRequest childRequest : requests) { + request.add(childRequest); + } + for (FramedData framedData : framedData) { + try { + request.add(framedData.data, framedData.from, framedData.length, framedData.defaultIndex, framedData.xContentType); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (waitForActiveShards != null) { + request.waitForActiveShards(waitForActiveShards); + } + if (timeout != null) { + request.timeout(timeout); + } + if (timeoutString != null) { + request.timeout(timeoutString); + } + if (globalPipeline != null) { + request.pipeline(globalPipeline); + } + if (globalRouting != null) { + request.routing(globalRouting); + } + if (refreshPolicy != null) { + request.setRefreshPolicy(refreshPolicy); + } + if (refreshPolicyString != null) { + request.setRefreshPolicy(refreshPolicyString); + } + return request; + } + + private void validate() { + if (countNonEmptyLists(requestBuilders, requests, framedData) > 1) { + throw new IllegalStateException( + "Must use only request builders, requests, or byte arrays within a single bulk request. Cannot mix and match" + ); + } + if (timeout != null && timeoutString != null) { + throw new IllegalStateException("Must use only one setTimeout method"); + } + if (refreshPolicy != null && refreshPolicyString != null) { + throw new IllegalStateException("Must use only one setRefreshPolicy method"); + } + } + + private int countNonEmptyLists(List... lists) { + int sum = 0; + for (List list : lists) { + if (list.isEmpty() == false) { + sum++; + } + } + return sum; + } + + private record FramedData(byte[] data, int from, int length, @Nullable String defaultIndex, XContentType xContentType) {} } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestBuilderTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestBuilderTests.java new file mode 100644 index 0000000000000..8843801e528a3 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestBuilderTests.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.ESTestCase; + +public class BulkRequestBuilderTests extends ESTestCase { + + public void testValidation() { + BulkRequestBuilder bulkRequestBuilder = new BulkRequestBuilder(null, null); + bulkRequestBuilder.add(new IndexRequestBuilder(null, randomAlphaOfLength(10))); + bulkRequestBuilder.add(new IndexRequest()); + expectThrows(IllegalStateException.class, bulkRequestBuilder::request); + + bulkRequestBuilder = new BulkRequestBuilder(null, null); + bulkRequestBuilder.add(new IndexRequestBuilder(null, randomAlphaOfLength(10))); + bulkRequestBuilder.setTimeout(randomTimeValue()); + bulkRequestBuilder.setTimeout(TimeValue.timeValueSeconds(randomIntBetween(1, 30))); + expectThrows(IllegalStateException.class, bulkRequestBuilder::request); + + bulkRequestBuilder = new BulkRequestBuilder(null, null); + bulkRequestBuilder.add(new IndexRequestBuilder(null, randomAlphaOfLength(10))); + bulkRequestBuilder.setRefreshPolicy(randomFrom(WriteRequest.RefreshPolicy.values()).getValue()); + bulkRequestBuilder.setRefreshPolicy(randomFrom(WriteRequest.RefreshPolicy.values())); + expectThrows(IllegalStateException.class, bulkRequestBuilder::request); + } +}