Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
import io.netty.util.ReferenceCounted;

import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;

Expand Down Expand Up @@ -53,7 +53,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
// TODO: We do not currently support in flight circuit breaker limits for bulk. However, IndexingPressure applies
.put(RestBulkAction.INCREMENTAL_BULK.getKey(), false)
.put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false)
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@

package org.elasticsearch.http;

import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.json.JsonXContent;
Expand All @@ -25,7 +27,6 @@
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
public class IncrementalBulkRestIT extends HttpSmokeTestCase {

@SuppressWarnings("unchecked")
public void testIncrementalBulk() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""
Expand Down Expand Up @@ -55,35 +56,52 @@ public void testIncrementalBulk() throws IOException {
final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request bulkRequest = new Request("POST", "/index_name/_bulk");
sendLargeBulk();
}

public void testBulkWithIncrementalDisabled() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""
{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 1,
"write.wait_for_active_shards": 2
}
}
}""");
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request firstBulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
int updates = 0;
for (int i = 0; i < 1000; i++) {
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
bulk.append("{\"field\":").append(i).append("}\n");
if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) {
++updates;
bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n");
bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n");
}
}
bulk.append("\r\n");
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
+ "{\"field\":1}\n"
+ "\r\n";

bulkRequest.setJsonEntity(bulk.toString());
firstBulkRequest.setJsonEntity(bulkBody);

final Response bulkResponse = getRestClient().performRequest(bulkRequest);
assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
Map<String, Object> responseMap = XContentHelper.convertToMap(
JsonXContent.jsonXContent,
bulkResponse.getEntity().getContent(),
true
);
final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

assertFalse((Boolean) responseMap.get("errors"));
assertThat(((List<Object>) responseMap.get("items")).size(), equalTo(1001 + updates));
clusterAdmin().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false).build())
.get();

internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(false));

try {
sendLargeBulk();
} finally {
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(true));
clusterAdmin().prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null).build())
.get();
}
}

public void testIncrementalMalformed() throws IOException {
Expand Down Expand Up @@ -114,4 +132,37 @@ public void testIncrementalMalformed() throws IOException {

expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest));
}

/**
 * Sends one large {@code _bulk} request to {@code /index_name/_bulk} and verifies it succeeds.
 * The body contains one leading delete, 1000 index ops, and a random number of update ops
 * (each appended with probability 1/16 per iteration). Asserts an HTTP 200 response, that the
 * response reports {@code "errors": false}, and that the item count is exactly
 * {@code 1001 + updates} (1 delete + 1000 indexes + the updates actually appended).
 *
 * @throws IOException if the REST client fails to execute the request
 */
@SuppressWarnings("unchecked")
private static void sendLargeBulk() throws IOException {
Request bulkRequest = new Request("POST", "/index_name/_bulk");

// Build the NDJSON bulk body: a delete, then 1000 index ops, interleaving occasional updates.
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
int updates = 0;
for (int i = 0; i < 1000; i++) {
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
bulk.append("{\"field\":").append(i).append("}\n");
// randomBoolean() x4 == roughly a 1-in-16 chance of adding an update op this iteration
if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) {
++updates;
bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n");
bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n");
}
}
bulk.append("\r\n");

bulkRequest.setJsonEntity(bulk.toString());

final Response bulkResponse = getRestClient().performRequest(bulkRequest);
assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
// Parse the whole response into a map so we can inspect "errors" and "items".
Map<String, Object> responseMap = XContentHelper.convertToMap(
JsonXContent.jsonXContent,
bulkResponse.getEntity().getContent(),
true
);

assertFalse((Boolean) responseMap.get("errors"));
assertThat(((List<Object>) responseMap.get("items")).size(), equalTo(1001 + updates));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,74 +14,72 @@
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.rest.action.document.RestBulkAction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static org.elasticsearch.common.settings.Setting.boolSetting;

public class IncrementalBulkService {

public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting(
"rest.incremental_bulk",
true,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably want to flip this before merge to main? Unless we are confident in having the feature enabled without a rollout plan. Can be a follow-up.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add a ticket and make it a blocker for merge into main.

Setting.Property.NodeScope,
Setting.Property.Dynamic
);
private final Client client;
private final AtomicBoolean enabledForTests = new AtomicBoolean(true);
private final IndexingPressure indexingPressure;
private final ThreadContext threadContext;
private final Supplier<Boolean> enabled;

/**
 * Creates a service with incremental bulk unconditionally enabled
 * (uses a default {@link Enabled}, whose flag starts {@code true} and has no
 * cluster-settings wiring).
 */
public IncrementalBulkService(Client client, IndexingPressure indexingPressure, ThreadContext threadContext) {
this(client, indexingPressure, threadContext, new Enabled());
}

/**
 * Creates a service whose enabled state tracks the {@code INCREMENTAL_BULK} cluster
 * setting dynamically (see {@link Enabled#Enabled(ClusterSettings)}, which registers a
 * settings-update consumer).
 */
public IncrementalBulkService(
Client client,
IndexingPressure indexingPressure,
ThreadContext threadContext,
ClusterSettings clusterSettings
) {
this(client, indexingPressure, threadContext, new Enabled(clusterSettings));
}

/**
 * Primary constructor: stores the collaborators and the supplier consulted by
 * {@link #incrementalBulkEnabled()}.
 *
 * @param enabled supplier of the current enabled state; queried on each call rather
 *                than cached, so dynamic setting changes take effect
 */
public IncrementalBulkService(
Client client,
IndexingPressure indexingPressure,
ThreadContext threadContext,
Supplier<Boolean> enabled
) {
this.client = client;
this.indexingPressure = indexingPressure;
this.threadContext = threadContext;
this.enabled = enabled;
}

/** Returns the current value of the enabled supplier (dynamic; re-evaluated per call). */
public boolean incrementalBulkEnabled() {
return enabled.get();
}

/**
 * Creates a bulk {@link Handler} with no wait-for-active-shards, timeout, or refresh
 * overrides. Fails with an AssertionError if disabled via {@link #setForTests(boolean)}.
 */
public Handler newBulkRequest() {
ensureEnabled();
return newBulkRequest(null, null, null);
}

/**
 * Creates a bulk {@link Handler} carrying the given optional per-request parameters.
 * Fails with an AssertionError if disabled via {@link #setForTests(boolean)}.
 *
 * @param waitForActiveShards optional active-shard-count requirement, or {@code null}
 * @param timeout             optional request timeout, or {@code null}
 * @param refresh             optional refresh policy, or {@code null}
 */
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
ensureEnabled();
return new Handler(client, threadContext, indexingPressure, waitForActiveShards, timeout, refresh);
}

/**
 * Test-only guard: fails with an {@link AssertionError} when incremental bulk requests
 * have been switched off via {@link #setForTests(boolean)}.
 */
private void ensureEnabled() {
    if (enabledForTests.get()) {
        return;
    }
    throw new AssertionError("Unexpected incremental bulk request");
}

// This method only exists to test that the feature flag works. Remove once we no longer need the flag.
Comment thread
Tim-Brooks marked this conversation as resolved.
/**
 * Test hook toggling whether incremental bulk requests are permitted; see
 * {@code ensureEnabled()}. Exists only while the feature flag does.
 */
public void setForTests(boolean value) {
    this.enabledForTests.set(value);
}

public static class Enabled implements Supplier<Boolean> {

private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true);

public Enabled() {}

public Enabled(ClusterSettings clusterSettings) {
incrementalBulksEnabled.set(clusterSettings.get(RestBulkAction.INCREMENTAL_BULK));
clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set);
incrementalBulksEnabled.set(clusterSettings.get(INCREMENTAL_BULK));
clusterSettings.addSettingsUpdateConsumer(INCREMENTAL_BULK, incrementalBulksEnabled::set);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.bulk.WriteAckDelay;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
import org.elasticsearch.action.ingest.SimulatePipelineTransportAction;
Expand Down Expand Up @@ -112,7 +113,6 @@
import org.elasticsearch.readiness.ReadinessService;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService;
Expand Down Expand Up @@ -242,7 +242,7 @@ public void apply(Settings value, Settings current, Settings previous) {
Metadata.SETTING_READ_ONLY_SETTING,
Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
RestBulkAction.INCREMENTAL_BULK,
IncrementalBulkService.INCREMENTAL_BULK,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,8 +893,7 @@ private void construct(
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
client,
indexingLimits,
threadPool.getThreadContext(),
clusterService.getClusterSettings()
threadPool.getThreadContext()
);

ActionModule actionModule = new ActionModule(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
Expand All @@ -41,7 +40,6 @@
import java.util.Map;
import java.util.function.Supplier;

import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;

Expand All @@ -58,12 +56,6 @@
public class RestBulkAction extends BaseRestHandler {

public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated.";
public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting(
"rest.incremental_bulk",
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final boolean allowExplicitIndex;
private final IncrementalBulkService bulkHandler;
Expand Down Expand Up @@ -92,7 +84,7 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
if (bulkHandler.incrementalBulkEnabled() == false) {
if (request.isStreamedContent() == false) {
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
request.param("type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
params.put("pipeline", "timestamps");
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY), () -> false)
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY))
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
{"index":{"_id":"1"}}
Expand Down Expand Up @@ -101,12 +101,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
{
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(
mock(Client.class),
mock(IndexingPressure.class),
new ThreadContext(Settings.EMPTY),
() -> false
)
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY))
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand All @@ -130,12 +125,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
bulkCalled.set(false);
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(
mock(Client.class),
mock(IndexingPressure.class),
new ThreadContext(Settings.EMPTY),
() -> false
)
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY))
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand All @@ -158,12 +148,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
bulkCalled.set(false);
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(
mock(Client.class),
mock(IndexingPressure.class),
new ThreadContext(Settings.EMPTY),
() -> false
)
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY))
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand All @@ -187,12 +172,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
bulkCalled.set(false);
new RestBulkAction(
settings(IndexVersion.current()).build(),
new IncrementalBulkService(
mock(Client.class),
mock(IndexingPressure.class),
new ThreadContext(Settings.EMPTY),
() -> false
)
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY))
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withParams(params)
Expand Down