Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;

public class IncrementalBulkIT extends ESIntegTestCase {
Expand All @@ -55,6 +56,14 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(IngestClientIT.ExtendedIngestTestPlugin.class);
}

// Lower the split-bulk threshold to 512 bytes so that the small index requests
// used in these tests are enough to trip the indexing-pressure split/back-off
// path (default is a percentage of heap — see IndexingPressure.SPLIT_BULK_THRESHOLD).
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(IndexingPressure.SPLIT_BULK_THRESHOLD.getKey(), "512B")
.build();
}

public void testSingleBulkRequest() {
String index = "test";
createIndex(index);
Expand All @@ -81,6 +90,71 @@ public void testSingleBulkRequest() {
assertFalse(refCounted.hasReferences());
}

// Verifies that an incremental bulk is rejected when coordinating indexing-pressure
// memory is already exhausted: the final lastItems() call must fail with
// EsRejectedExecutionException and all item references must be released.
public void testIndexingPressureRejection() {
String index = "test";
createIndex(index);

String nodeName = internalCluster().getRandomNodeName();
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);

// Consume the node's entire coordinating memory budget up front so any
// subsequent accounting attempt by the bulk handler is rejected.
try (Releasable r = indexingPressure.markCoordinatingOperationStarted(1, indexingPressure.stats().getMemoryLimit(), true)) {
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});

// Randomly exercise the mid-stream addItems() path as well; the rejection
// must still surface on the final lastItems() future either way.
if (randomBoolean()) {
AtomicBoolean nextPage = new AtomicBoolean(false);
refCounted.incRef();
handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));
// nextPage must have been invoked synchronously (no back-off/flush happened).
assertTrue(nextPage.get());
}

PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);

// The handler's indexing-pressure accounting fails, so the whole bulk is rejected.
expectThrows(EsRejectedExecutionException.class, future::actionGet);
// All releasables passed to the handler must have been released on failure.
assertFalse(refCounted.hasReferences());
}
}

// Verifies that once accumulated request bytes cross the (test-lowered, 512B)
// split-bulk threshold, the handler flushes the in-progress bulk early and the
// coordinating indexing-pressure bytes drain back to zero before the request ends.
public void testIncrementalBulkRequestMemoryBackOff() throws Exception {
String index = "test";
createIndex(index);

String nodeName = internalCluster().getRandomNodeName();
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, nodeName);
IndexingPressure indexingPressure = internalCluster().getInstance(IndexingPressure.class, nodeName);

IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();

AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
AtomicBoolean nextPage = new AtomicBoolean(false);

// Feed requests one page at a time until their cumulative ramBytesUsed()
// reaches the 512B threshold configured in nodeSettings(). Each add below the
// threshold must invoke the continuation synchronously.
// NOTE(review): the loop creates one extra request after the final add; that
// trailing `indexRequest` is intentionally reused for lastItems() below.
IndexRequest indexRequest = indexRequest(index);
long total = indexRequest.ramBytesUsed();
while (total < 512) {
refCounted.incRef();
handler.addItems(List.of(indexRequest), refCounted::decRef, () -> nextPage.set(true));
assertTrue(nextPage.get());
nextPage.set(false);
indexRequest = indexRequest(index);
total += indexRequest.ramBytesUsed();
}

// Bytes added so far must be accounted against coordinating indexing pressure.
assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), greaterThan(0L));
refCounted.incRef();
// This add pushes the handler over the threshold, triggering an early flush.
handler.addItems(List.of(indexRequest(index)), refCounted::decRef, () -> nextPage.set(true));

// The flushed sub-bulk completes asynchronously; wait for its accounted bytes to be released.
assertBusy(() -> assertThat(indexingPressure.stats().getCurrentCombinedCoordinatingAndPrimaryBytes(), equalTo(0L)));

PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
handler.lastItems(List.of(indexRequest), refCounted::decRef, future);

BulkResponse bulkResponse = future.actionGet();
assertNoFailures(bulkResponse);
// All per-page releasables must have been released once the bulk completed.
assertFalse(refCounted.hasReferences());
}

public void testMultipleBulkPartsWithBackoff() {
ExecutorService executorService = Executors.newFixedThreadPool(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ private void completeBulkOperation() {
responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos),
BulkResponse.NO_INGEST_TOOK,
new BulkRequest.IncrementalState(shortCircuitShardFailures)
new BulkRequest.IncrementalState(shortCircuitShardFailures, bulkRequest.incrementalState().indexingPressureAccounted())
)
);
// Allow memory for bulk shard request items to be reclaimed before all items have been completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,12 +496,12 @@ public boolean isSimulated() {
return false; // Always false, but may be overridden by a subclass
}

record IncrementalState(Map<ShardId, Exception> shardLevelFailures) implements Writeable {
record IncrementalState(Map<ShardId, Exception> shardLevelFailures, boolean indexingPressureAccounted) implements Writeable {

static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap());
static final IncrementalState EMPTY = new IncrementalState(Collections.emptyMap(), false);

IncrementalState(StreamInput in) throws IOException {
this(in.readMap(ShardId::new, input -> input.readException()));
this(in.readMap(ShardId::new, input -> input.readException()), false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,46 @@

package org.elasticsearch.action.bulk;

import org.apache.lucene.util.Accountable;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexingPressure;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class IncrementalBulkService {

private final Client client;
private final IndexingPressure indexingPressure;

public IncrementalBulkService(Client client) {
public IncrementalBulkService(Client client, IndexingPressure indexingPressure) {
this.client = client;
this.indexingPressure = indexingPressure;
}

public Handler newBulkRequest() {
return newBulkRequest(null, null, null);
}

public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
return new Handler(client, waitForActiveShards, timeout, refresh);
return new Handler(client, indexingPressure, waitForActiveShards, timeout, refresh);
}

public static class Handler {

public static final BulkRequest.IncrementalState EMPTY_STATE = new BulkRequest.IncrementalState(Collections.emptyMap(), true);

private final Client client;
private final IndexingPressure indexingPressure;
private final ActiveShardCount waitForActiveShards;
private final TimeValue timeout;
private final String refresh;
Expand All @@ -50,12 +59,19 @@ public static class Handler {
private Exception bulkActionLevelFailure = null;
private BulkRequest bulkRequest = null;

private Handler(Client client, @Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
private Handler(
Client client,
IndexingPressure indexingPressure,
@Nullable String waitForActiveShards,
@Nullable TimeValue timeout,
@Nullable String refresh
) {
this.client = client;
this.indexingPressure = indexingPressure;
this.waitForActiveShards = waitForActiveShards != null ? ActiveShardCount.parseString(waitForActiveShards) : null;
this.timeout = timeout;
this.refresh = refresh;
createNewBulkRequest(BulkRequest.IncrementalState.EMPTY);
createNewBulkRequest(EMPTY_STATE);
}

public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runnable nextItems) {
Expand All @@ -64,35 +80,39 @@ public void addItems(List<DocWriteRequest<?>> items, Releasable releasable, Runn
nextItems.run();
} else {
assert bulkRequest != null;
internalAddItems(items, releasable);

if (shouldBackOff()) {
final boolean isFirstRequest = incrementalRequestSubmitted == false;
incrementalRequestSubmitted = true;

client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {

@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
createNewBulkRequest(bulkResponse.getIncrementalState());
}

@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
}
}, nextItems::run));
if (internalAddItems(items, releasable)) {
if (shouldBackOff()) {
final boolean isFirstRequest = incrementalRequestSubmitted == false;
incrementalRequestSubmitted = true;

client.bulk(bulkRequest, ActionListener.runAfter(new ActionListener<>() {

@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
createNewBulkRequest(
new BulkRequest.IncrementalState(bulkResponse.getIncrementalState().shardLevelFailures(), true)
);
}

@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
}
}, nextItems));
} else {
nextItems.run();
}
} else {
nextItems.run();
}

}
}

private boolean shouldBackOff() {
// TODO: Implement Real Memory Logic
return bulkRequest.requests().size() >= 16;
return indexingPressure.shouldSplitBulks();
}

public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, ActionListener<BulkResponse> listener) {
Expand All @@ -101,25 +121,27 @@ public void lastItems(List<DocWriteRequest<?>> items, Releasable releasable, Act
errorResponse(listener);
} else {
assert bulkRequest != null;
internalAddItems(items, releasable);
if (internalAddItems(items, releasable)) {
client.bulk(bulkRequest, new ActionListener<>() {

client.bulk(bulkRequest, new ActionListener<>() {
private final boolean isFirstRequest = incrementalRequestSubmitted == false;

private final boolean isFirstRequest = incrementalRequestSubmitted == false;

@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
listener.onResponse(combineResponses());
}
@Override
public void onResponse(BulkResponse bulkResponse) {
responses.add(bulkResponse);
releaseCurrentReferences();
listener.onResponse(combineResponses());
}

@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
errorResponse(listener);
}
});
@Override
public void onFailure(Exception e) {
handleBulkFailure(isFirstRequest, e);
errorResponse(listener);
}
});
} else {
errorResponse(listener);
}
}
}

Expand Down Expand Up @@ -159,9 +181,22 @@ private void addItemLevelFailures(List<DocWriteRequest<?>> items) {
responses.add(new BulkResponse(bulkItemResponses, 0, 0));
}

private void internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
bulkRequest.add(items);
releasables.add(releasable);
private boolean internalAddItems(List<DocWriteRequest<?>> items, Releasable releasable) {
try {
bulkRequest.add(items);
releasables.add(releasable);
releasables.add(
indexingPressure.markCoordinatingOperationStarted(
items.size(),
items.stream().mapToLong(Accountable::ramBytesUsed).sum(),
false
)
);
return true;
} catch (EsRejectedExecutionException e) {
handleBulkFailure(incrementalRequestSubmitted == false, e);
return false;
}
}

private void createNewBulkRequest(BulkRequest.IncrementalState incrementalState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
clusterService.state().metadata().getIndicesLookup(),
systemIndices
);
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
final Releasable releasable;
if (bulkRequest.incrementalState().indexingPressureAccounted()) {
releasable = () -> {};
} else {
releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
}
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor;
ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ public void apply(Settings value, Settings current, Settings previous) {
FsHealthService.REFRESH_INTERVAL_SETTING,
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
IndexingPressure.MAX_INDEXING_BYTES,
IndexingPressure.SPLIT_BULK_THRESHOLD,
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN,
DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING,
CoordinationDiagnosticsService.IDENTITY_CHANGES_THRESHOLD_SETTING,
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexingPressure.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ public class IndexingPressure {
Setting.Property.NodeScope
);

// Memory threshold at which incremental bulk requests are split and flushed early
// (see shouldSplitBulks()). Defaults to 8.5% of heap; node-scoped, so it must be
// set at startup (tests override it to a small absolute value such as "512B").
public static final Setting<ByteSizeValue> SPLIT_BULK_THRESHOLD = Setting.memorySizeSetting(
"indexing_pressure.memory.split_bulk_threshold",
"8.5%",
Setting.Property.NodeScope
);

private static final Logger logger = LogManager.getLogger(IndexingPressure.class);

private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
Expand Down Expand Up @@ -56,10 +62,12 @@ public class IndexingPressure {
private final AtomicLong primaryDocumentRejections = new AtomicLong(0);

private final long primaryAndCoordinatingLimits;
private final long splitBulkThreshold;
private final long replicaLimits;

// Resolves the node-scoped memory limits once at construction; these are fixed
// for the node's lifetime (all three settings are NodeScope).
public IndexingPressure(Settings settings) {
this.primaryAndCoordinatingLimits = MAX_INDEXING_BYTES.get(settings).getBytes();
// Threshold used by shouldSplitBulks() to decide when an incremental bulk
// should be flushed early to relieve memory pressure.
this.splitBulkThreshold = SPLIT_BULK_THRESHOLD.get(settings).getBytes();
// Replica operations are given 50% more headroom than coordinating/primary.
this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
}

Expand Down Expand Up @@ -203,6 +211,10 @@ public Releasable markReplicaOperationStarted(int operations, long bytes, boolea
});
}

// Returns true when the combined coordinating+primary bytes currently in flight
// have reached the split-bulk threshold, signalling IncrementalBulkService.Handler
// to flush its accumulated bulk request early instead of buffering further items.
public boolean shouldSplitBulks() {
return currentCombinedCoordinatingAndPrimaryBytes.get() >= splitBulkThreshold;
}

public IndexingPressureStats stats() {
return new IndexingPressureStats(
totalCombinedCoordinatingAndPrimaryBytes.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ record PluginServiceInstances(
);
final HttpServerTransport httpServerTransport = serviceProvider.newHttpTransport(pluginsService, networkModule);
final IndexingPressure indexingLimits = new IndexingPressure(settings);
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client);
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits);

SnapshotsService snapshotsService = new SnapshotsService(
settings,
Expand Down
Loading