Skip to content

Commit d875dc2

Browse files
authored
Avoid stack of StepListeners in SharedBlobCacheService (#94393)
Today `CacheService.CacheFile#populateAndRead` constructs a stack of `StepListener` objects, one per region, which chain their results together via `Math::addExact`. There's no need to do this recursively, we can just accumulate the results in an `AtomicInteger` and return its value when all the reads are complete.
1 parent 5010402 commit d875dc2

File tree

4 files changed

+67
-49
lines changed

4 files changed

+67
-49
lines changed

server/src/main/java/org/elasticsearch/action/support/RefCountingListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ public String toString() {
167167

168168
/**
169169
* Acquire a reference to this object and return a listener which consumes a response and releases the reference. The delegate {@link
170-
* ActionListener} is called when all its references have been released. The consumer must not throw any exception.
170+
* ActionListener} is called when all its references have been released. If the consumer throws an exception, the exception is passed
171+
* to the final listener as if the returned listener was completed exceptionally.
171172
*
172173
* It is invalid to call this method once all references are released. Doing so will trip an assertion if assertions are enabled, and
173174
* will throw an {@link IllegalStateException} otherwise.
@@ -189,8 +190,7 @@ public void onResponse(Response response) {
189190
acquiredConsumer.accept(response);
190191
}
191192
} catch (Exception e) {
192-
assert false : e;
193-
throw e;
193+
addException(e);
194194
}
195195
}
196196

server/src/test/java/org/elasticsearch/action/support/RefCountingListenerTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,25 @@ public void testValidation() {
182182
}
183183
}
184184

185+
public void testConsumerFailure() {
186+
final var executed = new AtomicBoolean();
187+
try (var refs = new RefCountingListener(new ActionListener<Void>() {
188+
@Override
189+
public void onResponse(Void unused) {
190+
fail("unexpected success");
191+
}
192+
193+
@Override
194+
public void onFailure(Exception e) {
195+
assertThat(e.getMessage(), equalTo("simulated"));
196+
executed.set(true);
197+
}
198+
})) {
199+
refs.acquire(ignored -> { throw new ElasticsearchException("simulated"); }).onResponse(null);
200+
}
201+
assertTrue(executed.get());
202+
}
203+
185204
public void testJavaDocExample() {
186205
final var flag = new AtomicBoolean();
187206
runExample(ActionListener.running(() -> assertTrue(flag.compareAndSet(false, true))));

x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java

Lines changed: 38 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
import org.apache.lucene.store.AlreadyClosedException;
1313
import org.elasticsearch.Assertions;
1414
import org.elasticsearch.action.ActionListener;
15-
import org.elasticsearch.action.StepListener;
15+
import org.elasticsearch.action.support.PlainActionFuture;
16+
import org.elasticsearch.action.support.RefCountingListener;
1617
import org.elasticsearch.blobcache.BlobCacheUtils;
1718
import org.elasticsearch.blobcache.common.ByteRange;
1819
import org.elasticsearch.blobcache.common.SparseFileTracker;
@@ -50,6 +51,7 @@
5051
import java.util.concurrent.ConcurrentLinkedQueue;
5152
import java.util.concurrent.Executor;
5253
import java.util.concurrent.atomic.AtomicBoolean;
54+
import java.util.concurrent.atomic.AtomicInteger;
5355
import java.util.concurrent.atomic.AtomicReference;
5456
import java.util.concurrent.atomic.LongAdder;
5557
import java.util.function.LongConsumer;
@@ -703,26 +705,31 @@ private static void throwAlreadyEvicted() {
703705
throw new AlreadyClosedException("File chunk is evicted");
704706
}
705707

706-
StepListener<Integer> populateAndRead(
708+
void populateAndRead(
707709
final ByteRange rangeToWrite,
708710
final ByteRange rangeToRead,
709711
final RangeAvailableHandler reader,
710712
final RangeMissingHandler writer,
711-
final Executor executor
713+
final Executor executor,
714+
final ActionListener<Integer> listener
712715
) {
713716
assert rangeToRead.length() > 0;
714-
final StepListener<Integer> listener = new StepListener<>();
715-
Releasable decrementRef = null;
717+
final Releasable[] resources = new Releasable[2];
716718
try {
717719
ensureOpen();
718720
incRef();
719-
decrementRef = Releasables.releaseOnce(this::decRef);
721+
resources[1] = Releasables.releaseOnce(this::decRef);
722+
720723
ensureOpen();
721-
Releasable finalDecrementRef = decrementRef;
722-
listener.whenComplete(integer -> finalDecrementRef.close(), throwable -> finalDecrementRef.close());
723724
final SharedBytes.IO fileChannel = sharedBytes.getFileChannel(sharedBytesPos);
724-
listener.whenComplete(integer -> fileChannel.decRef(), e -> fileChannel.decRef());
725-
final ActionListener<Void> rangeListener = rangeListener(rangeToRead, reader, listener, fileChannel);
725+
resources[0] = Releasables.releaseOnce(fileChannel::decRef);
726+
727+
final ActionListener<Void> rangeListener = rangeListener(
728+
rangeToRead,
729+
reader,
730+
ActionListener.runBefore(listener, () -> Releasables.close(resources)),
731+
fileChannel
732+
);
726733
final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(rangeToWrite, rangeToRead, rangeListener);
727734

728735
for (SparseFileTracker.Gap gap : gaps) {
@@ -758,9 +765,8 @@ public void onFailure(Exception e) {
758765
});
759766
}
760767
} catch (Exception e) {
761-
releaseAndFail(listener, decrementRef, e);
768+
releaseAndFail(listener, Releasables.wrap(resources), e);
762769
}
763-
return listener;
764770
}
765771

766772
private ActionListener<Void> rangeListener(
@@ -831,48 +837,35 @@ public int populateAndRead(
831837
final RangeMissingHandler writer,
832838
final String executor
833839
) throws Exception {
834-
StepListener<Integer> stepListener = null;
835-
final long writeStart = rangeToWrite.start();
836-
final long readStart = rangeToRead.start();
837-
for (int region = getRegion(rangeToWrite.start()); region <= getEndingRegion(rangeToWrite.end()); region++) {
838-
final ByteRange subRangeToWrite = mapSubRangeToRegion(rangeToWrite, region);
839-
final ByteRange subRangeToRead = mapSubRangeToRegion(rangeToRead, region);
840-
if (subRangeToRead.length() == 0L) {
841-
// nothing to read, skip
842-
if (stepListener == null) {
843-
stepListener = new StepListener<>();
844-
stepListener.onResponse(0);
840+
final PlainActionFuture<Void> readsComplete = new PlainActionFuture<>();
841+
final AtomicInteger bytesRead = new AtomicInteger();
842+
try (var listeners = new RefCountingListener(1, readsComplete)) {
843+
final long writeStart = rangeToWrite.start();
844+
final long readStart = rangeToRead.start();
845+
for (int region = getRegion(rangeToWrite.start()); region <= getEndingRegion(rangeToWrite.end()); region++) {
846+
final ByteRange subRangeToWrite = mapSubRangeToRegion(rangeToWrite, region);
847+
final ByteRange subRangeToRead = mapSubRangeToRegion(rangeToRead, region);
848+
if (subRangeToRead.length() == 0L) {
849+
// nothing to read, skip
850+
continue;
845851
}
846-
continue;
847-
}
848-
final CacheFileRegion fileRegion = get(cacheKey, length, region);
849-
final long regionStart = getRegionStart(region);
850-
final long writeOffset = writeStart - regionStart;
851-
final long readOffset = readStart - regionStart;
852-
final StepListener<Integer> lis = fileRegion.populateAndRead(
853-
subRangeToWrite,
854-
subRangeToRead,
855-
(channel, channelPos, relativePos, len) -> {
852+
final CacheFileRegion fileRegion = get(cacheKey, length, region);
853+
final long regionStart = getRegionStart(region);
854+
final long writeOffset = writeStart - regionStart;
855+
final long readOffset = readStart - regionStart;
856+
fileRegion.populateAndRead(subRangeToWrite, subRangeToRead, (channel, channelPos, relativePos, len) -> {
856857
assert regionOwners[fileRegion.sharedBytesPos].get() == fileRegion;
857858
assert channelPos >= fileRegion.physicalStartOffset() && channelPos + len <= fileRegion.physicalEndOffset();
858859
return reader.onRangeAvailable(channel, channelPos, relativePos - readOffset, len);
859-
},
860-
(channel, channelPos, relativePos, len, progressUpdater) -> {
860+
}, (channel, channelPos, relativePos, len, progressUpdater) -> {
861861
assert regionOwners[fileRegion.sharedBytesPos].get() == fileRegion;
862862
assert channelPos >= fileRegion.physicalStartOffset() && channelPos + len <= fileRegion.physicalEndOffset();
863863
writer.fillCacheRange(channel, channelPos, relativePos - writeOffset, len, progressUpdater);
864-
},
865-
threadPool.executor(executor)
866-
);
867-
assert lis != null;
868-
if (stepListener == null) {
869-
stepListener = lis;
870-
} else {
871-
stepListener = stepListener.thenCombine(lis, Math::addExact);
864+
}, threadPool.executor(executor), listeners.acquire(i -> bytesRead.updateAndGet(j -> Math.addExact(i, j))));
872865
}
873-
874866
}
875-
return stepListener.asFuture().get();
867+
readsComplete.get();
868+
return bytesRead.get();
876869
}
877870

878871
@Override

x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.blobcache.shared;
99

10+
import org.elasticsearch.action.support.PlainActionFuture;
1011
import org.elasticsearch.blobcache.common.ByteRange;
1112
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
1213
import org.elasticsearch.common.settings.Setting;
@@ -71,20 +72,25 @@ public void testBasicEviction() throws IOException {
7172
assertEquals(3, cacheService.freeRegionCount());
7273
assertFalse(region1.tryEvict());
7374
assertEquals(3, cacheService.freeRegionCount());
75+
final var bytesReadFuture = new PlainActionFuture<Integer>();
7476
region0.populateAndRead(
7577
ByteRange.of(0L, 1L),
7678
ByteRange.of(0L, 1L),
7779
(channel, channelPos, relativePos, length) -> 1,
7880
(channel, channelPos, relativePos, length, progressUpdater) -> progressUpdater.accept(length),
79-
taskQueue.getThreadPool().executor(ThreadPool.Names.GENERIC)
81+
taskQueue.getThreadPool().executor(ThreadPool.Names.GENERIC),
82+
bytesReadFuture
8083
);
8184
assertFalse(region0.tryEvict());
8285
assertEquals(3, cacheService.freeRegionCount());
86+
assertFalse(bytesReadFuture.isDone());
8387
taskQueue.runAllRunnableTasks();
8488
assertTrue(region0.tryEvict());
8589
assertEquals(4, cacheService.freeRegionCount());
8690
assertTrue(region2.tryEvict());
8791
assertEquals(5, cacheService.freeRegionCount());
92+
assertTrue(bytesReadFuture.isDone());
93+
assertEquals(Integer.valueOf(1), bytesReadFuture.actionGet());
8894
}
8995
}
9096

0 commit comments

Comments
 (0)