diff --git a/docs/changelog/134128.yaml b/docs/changelog/134128.yaml new file mode 100644 index 0000000000000..15d93618135b2 --- /dev/null +++ b/docs/changelog/134128.yaml @@ -0,0 +1,6 @@ +pr: 134128 +summary: Fix deadlock in `ThreadPoolMergeScheduler` when a failing merge closes the + `IndexWriter` +area: Engine +type: bug +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java new file mode 100644 index 0000000000000..d348a1b364e07 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithFailureIT.java @@ -0,0 +1,390 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.CodecReader; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FilterCodecReader; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.OneMergeWrappingMergePolicy; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.RefCountingListener; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MergeSchedulerConfig; +import org.elasticsearch.index.codec.FilterDocValuesProducer; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.IndexEventListener; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; 
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+
+/**
+ * Tests a specific deadlock situation encountered when a merge throws an exception that closes the
+ * {@link org.apache.lucene.index.IndexWriter}, which in turn closes the {@link ThreadPoolMergeScheduler}. The scheduler would then wait
+ * for all merges to be aborted/completed, which cannot happen if the current thread is itself one of the merges to be aborted. As a
+ * consequence, the {@link ThreadPoolMergeScheduler} is blocked indefinitely and the shard's store remains open.
+ *
+ * This test creates one or more indices with one or more shards, then selects some of those shards and ensures each selected shard throws
+ * an exception during a merge. It finally checks that those shards indeed failed and that their stores were closed correctly.
+ */
+public class MergeWithFailureIT extends ESIntegTestCase {
+
+    private static final String FAILING_MERGE_ON_PURPOSE = "Failing merge on purpose";
+
+    /**
+     * Test plugin that allows failing the merges of specific shard instances
+     */
+    public static class TestMergeFailurePlugin extends Plugin implements EnginePlugin {
+
+        record TestMergeInstance(
+            ActionListener<Void> readyToFailMergeListener,
+            AtomicLong minMergesBeforeFailure,
+            ActionListener<Void> onStoreClosedListener
+        ) {
+            private boolean shouldFailMerge() {
+                if (minMergesBeforeFailure.getAndDecrement() == 0) {
+                    readyToFailMergeListener.onResponse(null);
+                    return true;
+                }
+                return false;
+            }
+        }
+
+        // This future is completed once all the shards that are expected to fail are ready to execute the failing merge
+        private final PlainActionFuture<Void> allMergesReadyToFailListener = new PlainActionFuture<>();
+
+        // This future is completed once all the shards that are expected to fail have their store closed
+        private final PlainActionFuture<Void> allShardsStoresClosedListener = new PlainActionFuture<>();
+
+        // Map of shards that are expected to fail in the test
+        private final Map<ShardId, TestMergeInstance> shardsToFail = new HashMap<>();
+
+        // Latch to fail the merges
+        private final CountDownLatch failMerges = new CountDownLatch(1);
+
+        private final boolean isDataNode;
+
+        public TestMergeFailurePlugin(Settings settings) {
+            this.isDataNode = DiscoveryNode.hasDataRole(settings);
+        }
+
+        private synchronized TestMergeInstance getTestMergeInstance(ShardId shardId) {
+            return shardsToFail.get(shardId);
+        }
+
+        private synchronized void failMergesForShards(Set<ShardId> shards, long minMergesBeforeFailure) {
+            try (
+                var ready = new RefCountingListener(allMergesReadyToFailListener);
+                var close = new RefCountingListener(allShardsStoresClosedListener)
+            ) {
+                shards.forEach(shardId -> {
+                    var instance = new TestMergeInstance(ready.acquire(), new AtomicLong(minMergesBeforeFailure), close.acquire());
+                    if (shardsToFail.put(shardId, instance) != null) {
+                        throw new AssertionError("Shard already registered for merge failure: " + shardId);
+                    }
+                });
+            }
+        }
+
+        private void waitForMergesReadyToFail() {
+            safeGet(this.allMergesReadyToFailListener);
+        }
+
+        private void failMerges() {
+            assert this.allMergesReadyToFailListener.isDone();
+            failMerges.countDown();
+        }
+
+        private void waitForClose() {
+            safeGet(this.allShardsStoresClosedListener);
+        }
+
+        private void onShardStoreClosed(ShardId shardId) {
+            var candidate = getTestMergeInstance(shardId);
+            if (candidate != null) {
+                candidate.onStoreClosedListener().onResponse(null);
+            }
+        }
+
+        @Override
+        public void onIndexModule(IndexModule indexModule) {
+            if (isDataNode) {
+                indexModule.addIndexEventListener(new IndexEventListener() {
+                    @Override
+                    public void onStoreClosed(ShardId shardId) {
+                        onShardStoreClosed(shardId);
+                    }
+                });
+            }
+        }
+
+        @Override
+        public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
+            if (isDataNode == false) {
+                return Optional.of(InternalEngine::new);
+            }
+            return Optional.of(
+                config -> new InternalEngine(
+                    EngineTestCase.copy(
+                        config,
+                        new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap -> new MergePolicy.OneMerge(toWrap) {
+                            @Override
+                            public CodecReader wrapForMerge(CodecReader reader) {
+                                var candidate = getTestMergeInstance(config.getShardId());
+                                if (candidate == null || candidate.shouldFailMerge() == false) {
+                                    return reader;
+                                }
+                                return new FilterCodecReader(reader) {
+                                    @Override
+                                    public CacheHelper getCoreCacheHelper() {
+                                        return in.getCoreCacheHelper();
+                                    }
+
+                                    @Override
+                                    public CacheHelper getReaderCacheHelper() {
+                                        return in.getReaderCacheHelper();
+                                    }
+
+                                    @Override
+                                    public DocValuesProducer getDocValuesReader() {
+                                        return new FilterDocValuesProducer(super.getDocValuesReader()) {
+
+                                            final AtomicBoolean failOnce = new AtomicBoolean(false);
+
+                                            @Override
+                                            public NumericDocValues getNumeric(FieldInfo field) throws IOException {
+                                                if (failOnce.compareAndSet(false, true)) {
+                                                    safeAwait(failMerges);
+                                                    throw new IOException(FAILING_MERGE_ON_PURPOSE);
+                                                }
+                                                return super.getNumeric(field);
+                                            }
+                                        };
+                                    }
+                                };
+                            }
+                        })
+                    )
+                )
+            );
+        }
+    }
+
+    @Override
+    protected boolean addMockInternalEngine() {
+        return false;
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return CollectionUtils.appendToCopy(super.nodePlugins(), TestMergeFailurePlugin.class);
+    }
+
+    public void testFailedMergeDoesNotPreventShardFromClosing() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        final var dataNode = internalCluster().startDataOnlyNode(
+            // the test works with both the concurrent and the thread pool merge scheduler
+            Settings.builder().put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), randomBoolean()).build()
+        );
+
+        final int nbPrimariesPerIndex = randomIntBetween(1, 3);
+
+        final var indices = new String[randomIntBetween(1, 2)];
+        for (int i = 0; i < indices.length; i++) {
+            var indexName = randomIdentifier();
+            createIndex(
+                indexName,
+                indexSettings(nbPrimariesPerIndex, 0).put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".name", dataNode)
+                    .put(MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.getKey(), 1)
+                    .build()
+            );
+            indices[i] = indexName;
+        }
+
+        final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
+        ensureGreen(indices);
+
+        // randomly choose an index
+        final var indexToFail = resolveIndex(randomFrom(indices));
+
+        // randomly choose one or more of its shards where the plugin will fail merges
+        final var randomShardsIdsToFail = randomSubsetOf(
+            // expect failing merges to be executed concurrently as the test waits for them to be "ready to fail"
+            randomIntBetween(1, Math.min(nbPrimariesPerIndex, maxConcurrentMerges(dataNode, indexToFail))),
+            IntStream.range(0, nbPrimariesPerIndex).boxed().toList()
+        );
+
+        // capture the IndexShard instances, this will be useful to ensure the
shards are closed + var indexService = internalCluster().getInstance(IndicesService.class, dataNode).indexService(indexToFail); + final var shardsToFail = randomShardsIdsToFail.stream().map(shardId -> { + var indexShard = indexService.getShardOrNull(shardId); + assertThat(indexShard, notNullValue()); + return indexShard; + }).filter(Objects::nonNull).collect(Collectors.toUnmodifiableMap(AbstractIndexShardComponent::shardId, Function.identity())); + + logger.debug("--> merges of the following shards will fail: {}", shardsToFail.keySet()); + + // sometimes allow some merges to run before triggering the merge failure + final long minMergesPerShardBeforeFailure = randomBoolean() ? randomLongBetween(1, 3) : 0L; + + // a future completed when all expected shards are unassigned due to merge failures + final var waitForShardsFailuresListener = new PlainActionFuture(); + + // the cluster state listener that completes the previous future + var allExpectedShardsAreUnassignedListener = new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.routingTableChanged()) { + var allExpectedShardsAreUnassigned = shardsToFail.keySet().stream().allMatch(shardId -> { + var routingTable = event.state().routingTable(ProjectId.DEFAULT).index(shardId.getIndex()); + return routingTable != null + && routingTable.shard(shardId.id()).primaryShard().unassigned() + && routingTable.shard(shardId.id()).primaryShard().unassignedInfo().failure() != null; + }); + if (allExpectedShardsAreUnassigned) { + clusterService.removeListener(this); + waitForShardsFailuresListener.onResponse(null); + } + } + } + }; + + // now instruct on which shards the merges must fail + var plugin = getTestMergeFailurePlugin(dataNode); + plugin.failMergesForShards(shardsToFail.keySet(), minMergesPerShardBeforeFailure); + + // add the cluster state listener + clusterService.addListener(allExpectedShardsAreUnassignedListener); + + // create many new segments on every index, enough to trigger merges on every shard + final var createSegments = new AtomicBoolean(true); + final var createSegmentsThread = new Thread(() -> { + while (createSegments.get()) { + var client = client(); + for (int request = 0; request < 10; request++) { + var bulkRequest = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (var index : indices) { + for (int doc = 0; doc < 10; doc++) { + bulkRequest.add(client.prepareIndex(index).setCreate(true).setSource("value", randomIntBetween(0, 1024))); + } + } + bulkRequest.get(); + } + safeSleep(randomLongBetween(50, 200L)); + } + }); + createSegmentsThread.start(); + + // wait for the merges to start on the shards that are expected to fail + plugin.waitForMergesReadyToFail(); + + // no need to create more segments + createSegments.set(false); + createSegmentsThread.join(); + + // allow merges to fail + plugin.failMerges(); + + // wait for the expected shards to be failed and unassigned + safeGet(waitForShardsFailuresListener); + ensureRed(indexToFail.getName()); + + // waits for the expected shards stores to be closed + plugin.waitForClose(); + + // check the state of every shard + var routingTable = clusterService.state().routingTable(ProjectId.DEFAULT); + for (var index : indices) { + var indexRoutingTable = routingTable.index(index); + for (int shard = 0; shard < nbPrimariesPerIndex; shard++) { + var primary = asInstanceOf(IndexShardRoutingTable.class, indexRoutingTable.shard(shard)).primaryShard(); + + if (shardsToFail.containsKey(primary.shardId())) 
{ + assertThat(primary.state(), equalTo(ShardRoutingState.UNASSIGNED)); + assertThat(primary.unassignedInfo(), notNullValue()); + assertThat(primary.unassignedInfo().reason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED)); + var failure = ExceptionsHelper.unwrap(primary.unassignedInfo().failure(), IOException.class); + assertThat(failure, notNullValue()); + assertThat(failure.getMessage(), containsString(FAILING_MERGE_ON_PURPOSE)); + continue; + } + + assertThat(primary.state(), equalTo(ShardRoutingState.STARTED)); + } + } + } + + private static TestMergeFailurePlugin getTestMergeFailurePlugin(String dataNode) { + return internalCluster().getInstance(PluginsService.class, dataNode).filterPlugins(TestMergeFailurePlugin.class).findFirst().get(); + } + + private void ensureRed(String indexName) throws Exception { + assertBusy(() -> { + var healthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT, indexName) + .setWaitForStatus(ClusterHealthStatus.RED) + .setWaitForEvents(Priority.LANGUID) + .get(); + assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.RED)); + }); + } + + private static int maxConcurrentMerges(String dataNode, Index index) { + var clusterService = internalCluster().clusterService(dataNode); + var indexMetadata = clusterService.state().metadata().indexMetadata(index); + int maxMerges = MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.get(indexMetadata.getSettings()); + if (ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.get(clusterService.getSettings())) { + return Math.min(maxMerges, clusterService.threadPool().info(ThreadPool.Names.MERGE).getMax()); + } + return maxMerges; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 90c10fdf09975..197000f4d0113 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2921,7 +2921,9 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() { } private void maybeFlushAfterMerge(OnGoingMerge merge) { - if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) { + if (merge.getMerge().isAborted() == false + && indexWriter.hasPendingMerges() == false + && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) { // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the // writer // we deadlock on engine#close for instance. 
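Before the scheduler changes below, the deadlock described in the test javadoc can be distilled into a few lines. The sketch is illustrative only — the class and its members are invented and are not Elasticsearch code: a worker thread triggers close(), and close() waits for all workers, including the calling thread itself, to finish, so it can never return.

    import java.util.concurrent.CountDownLatch;

    // Illustrative only: the "wait for all workers" close() pattern that deadlocks when a worker calls it.
    final class SelfWaitDeadlockDemo {

        private final CountDownLatch noRunningWorkers = new CountDownLatch(1);
        private int runningWorkers = 0;

        synchronized void workerStarted() {
            runningWorkers++;
        }

        synchronized void workerFinished() {
            if (--runningWorkers == 0) {
                noRunningWorkers.countDown();
            }
        }

        void close() throws InterruptedException {
            // Waits until every worker has called workerFinished(). If the caller is itself a worker,
            // its own workerFinished() can only run after close() returns: close() blocks forever.
            noRunningWorkers.await();
        }

        public static void main(String[] args) {
            var demo = new SelfWaitDeadlockDemo();
            var worker = new Thread(() -> {
                demo.workerStarted();
                try {
                    demo.close(); // e.g. a failing merge rolling back the IndexWriter, which closes the scheduler
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    demo.workerFinished(); // never reached, close() above never returns
                }
            });
            worker.start(); // the worker thread hangs in close(); this is the situation the scheduler change below avoids
        }
    }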
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index 7f1ad5adf3181..398124b93c225 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -20,6 +20,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimitedIndexOutput; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeValue; @@ -35,9 +36,10 @@ import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.PriorityQueue; import java.util.Set; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -76,7 +78,8 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics // how many {@link MergeTask}s have kicked off (this is used to name them). private final AtomicLong submittedMergeTaskCount = new AtomicLong(); private final AtomicLong doneMergeTaskCount = new AtomicLong(); - private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1); + // used to know if merges are still running while we're closing the scheduler + private final Semaphore executingMergesPermits = new Semaphore(Integer.MAX_VALUE); private volatile boolean closed = false; private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider; @@ -284,6 +287,7 @@ private void checkMergeTaskThrottling() { // synchronized so that {@code #closed}, {@code #runningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically synchronized Schedule schedule(MergeTask mergeTask) { assert mergeTask.hasStartedRunning() == false; + assert mergeTask.hasRunningThread() == false; if (closed) { // do not run or backlog tasks when closing the merge scheduler, instead abort them return Schedule.ABORT; @@ -293,11 +297,11 @@ synchronized Schedule schedule(MergeTask mergeTask) { } return Schedule.ABORT; } else if (runningMergeTasks.size() < getMaxThreadCount()) { + mergeTask.setRunningThread(Thread.currentThread()); boolean added = runningMergeTasks.put(mergeTask.onGoingMerge.getMerge(), mergeTask) == null; assert added : "starting merge task [" + mergeTask + "] registered as already running"; return Schedule.RUN; } else { - assert mergeTask.hasStartedRunning() == false; backloggedMergeTasks.add(mergeTask); return Schedule.BACKLOG; } @@ -309,8 +313,6 @@ synchronized void mergeTaskFinishedRunning(MergeTask mergeTask) { assert removed : "completed merge task [" + mergeTask + "] not registered as running"; // when one merge is done, maybe a backlogged one can now execute enqueueBackloggedTasks(); - // signal here, because, when closing, we wait for all currently running merges to finish - maybeSignalAllMergesDoneAfterClose(); } private void mergeTaskDone(OnGoingMerge merge) { @@ -320,12 +322,6 @@ private void mergeTaskDone(OnGoingMerge merge) { checkMergeTaskThrottling(); } - private synchronized void maybeSignalAllMergesDoneAfterClose() { - if (closed && runningMergeTasks.isEmpty()) { - closedWithNoRunningMerges.countDown(); - } - } - private synchronized void 
enqueueBackloggedTasks() {
         int maxBackloggedTasksToEnqueue = getMaxThreadCount() - runningMergeTasks.size();
         // enqueue all backlogged tasks when closing, as the queue expects all backlogged tasks to always be enqueued back
@@ -343,13 +339,27 @@ private synchronized void enqueueBackloggedTasks() {
      * Does the actual merge, by calling {@link org.apache.lucene.index.MergeScheduler.MergeSource#merge}
      */
     void doMerge(MergeSource mergeSource, MergePolicy.OneMerge oneMerge) {
+        boolean hasPermit = oneMerge.isAborted() == false;
         try {
+            if (hasPermit) {
+                executingMergesPermits.acquire();
+            }
             mergeSource.merge(oneMerge);
         } catch (Throwable t) {
             // OK to ignore MergeAbortedException. This is what Lucene's ConcurrentMergeScheduler does.
             if (t instanceof MergePolicy.MergeAbortedException == false) {
+                assert executingMergesPermits.availablePermits() == 0;
+                hasPermit = false;
+                // A merge thread that throws an exception that closes the IndexWriter causes other merge threads to be aborted, but it is
+                // not itself aborted: instead the current merge is just completed and the thrown exception is set in the package-private
+                // OneMerge.error field. Here we mark such a merge as aborted too so that it is not considered successful later.
+                oneMerge.setAborted();
                 handleMergeException(t);
             }
+        } finally {
+            if (hasPermit) {
+                executingMergesPermits.release();
+            }
         }
     }
@@ -391,6 +401,11 @@ class MergeTask implements Runnable {
         private final boolean supportsIOThrottling;
         private final long mergeMemoryEstimateBytes;

+        /**
+         * Thread running the merge task
+         */
+        private final SetOnce<Thread> runningThread;
+
         MergeTask(
             MergeSource mergeSource,
             MergePolicy.OneMerge merge,
@@ -405,6 +420,7 @@ class MergeTask implements Runnable {
             this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress());
             this.supportsIOThrottling = supportsIOThrottling;
             this.mergeMemoryEstimateBytes = mergeMemoryEstimateBytes;
+            this.runningThread = new SetOnce<>();
         }

         Schedule schedule() {
@@ -432,6 +448,18 @@ public boolean hasStartedRunning() {
             return isRunning;
         }

+        /**
+         * Sets the thread running the merge task.
+         * @param thread the current thread running the merge task
+         */
+        private void setRunningThread(Thread thread) {
+            runningThread.set(Objects.requireNonNull(thread));
+        }
+
+        private boolean hasRunningThread() {
+            return runningThread.get() != null;
+        }
+
         /**
          * Runs the merge associated to this task. MUST be invoked after {@link #schedule()} returned {@link Schedule#RUN},
          * to confirm that the associated {@link MergeScheduler} assents to run the merge.
@@ -440,6 +468,7 @@ public boolean hasStartedRunning() { */ @Override public void run() { + assert hasRunningThread(); assert hasStartedRunning() == false; assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge()) : "runNowOrBacklog must be invoked before actually running the merge task"; @@ -611,14 +640,27 @@ protected void message(String message) { @Override public void close() throws IOException { synchronized (this) { + if (closed) { + return; + } closed = true; // enqueue any backlogged merge tasks, because the merge queue assumes that the backlogged tasks are always re-enqueued enqueueBackloggedTasks(); - // signal if there aren't any currently running merges - maybeSignalAllMergesDoneAfterClose(); } try { - closedWithNoRunningMerges.await(); + // in case a merge thread is calling us, release its permit + synchronized (this) { + for (var runningMergeTask : runningMergeTasks.values()) { + var mergeThread = runningMergeTask.runningThread.get(); + assert mergeThread != null : " running merge task has no owner thread"; + if (mergeThread.isAlive() && mergeThread == Thread.currentThread()) { + executingMergesPermits.release(); + break; + } + } + } + // waits for any running merge threads to finish + executingMergesPermits.acquire(Integer.MAX_VALUE); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cd3365beeb96b..53f559ffc2cca 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1356,7 +1356,11 @@ public long getWritingBytes() { if (engine == null) { return 0L; } - return engine.getWritingBytes(); + try { + return engine.getIndexBufferRAMBytesUsed(); + } catch (AlreadyClosedException ex) { + return 0L; + } }); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 36f50f3ce958e..e552bb133add4 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -357,7 +357,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { ); } - public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { + public static EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { return new EngineConfig( config.getShardId(), config.getThreadPool(),
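For readers following the new close() logic in ThreadPoolMergeScheduler, here is the same idea as a standalone sketch. Names are invented and this is not the scheduler code itself — the real change tracks running MergeTasks and clears a hasPermit flag in doMerge rather than keeping a thread set — but the protocol is the same: every task holds one permit while it executes, and close() drains all permits, first handing back the permit of the calling thread when the caller is itself one of the running tasks.

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Semaphore;

    // Standalone sketch of the "close without waiting for yourself" protocol (illustrative names).
    final class CloseWithoutSelfWait {

        private static final int MAX_PERMITS = Integer.MAX_VALUE;

        private final Semaphore executingPermits = new Semaphore(MAX_PERMITS);
        private final Set<Thread> runningTaskThreads = ConcurrentHashMap.newKeySet();

        void runTask(Runnable task) throws InterruptedException {
            executingPermits.acquire(); // one permit held for the whole duration of the task
            runningTaskThreads.add(Thread.currentThread());
            try {
                task.run();
            } finally {
                // the permit may already have been handed back by close() if this thread closed the component
                if (runningTaskThreads.remove(Thread.currentThread())) {
                    executingPermits.release();
                }
            }
        }

        void close() throws InterruptedException {
            // If the closing thread is itself running a task, give its permit back first; otherwise
            // acquiring all permits below would wait for the caller, which is the deadlock being avoided.
            if (runningTaskThreads.remove(Thread.currentThread())) {
                executingPermits.release();
            }
            executingPermits.acquire(MAX_PERMITS); // blocks until every other task has released its permit
        }
    }

Acquiring Integer.MAX_VALUE permits only succeeds once every other task has released its permit, which is what lets close() return even when it is invoked from a merge thread.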