Commit 9969cb6
Improve limit handling in StringDecoder
The case of one data buffer containing multiple lines could cause
a buffer leak due to a suspected issue in concatMapIterable. This
commit adds workarounds for that until the underlying issue is
addressed.

Closes gh-24346
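For context: Reactor releases dropped elements through its doOnDiscard hook, which StringDecoder registers to release pooled buffers; concatMapIterable, however, holds expanded elements in an internal queue that does not route them through that hook on cancellation. Below is a minimal, self-contained sketch of the hook itself (illustrative only, not part of the commit; plain strings stand in for pooled buffers):

    import reactor.core.publisher.Flux;

    public class DiscardHookSketch {

        public static void main(String[] args) {
            // filter has discard support: elements it rejects are handed to the
            // doOnDiscard hook registered downstream, where a pooled resource
            // (e.g. a PooledDataBuffer) could be released.
            Flux.just("keep", "drop")
                    .filter(s -> s.equals("keep"))
                    .doOnDiscard(String.class, s -> System.out.println("discarded: " + s))
                    .subscribe(System.out::println);
            // prints "keep", then "discarded: drop"
        }
    }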
rstoyanchev committed Jan 13, 2020
1 parent 04b3f5a commit 9969cb6
Showing 2 changed files with 92 additions and 39 deletions.
StringDecoder.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -95,17 +95,44 @@ public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementType,

 		List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
 
-		// TODO: Drop Consumer and use bufferUntil with Supplier<LimitedDataBufferList> (reactor-core#1925)
-		// TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)
-		LimitedDataBufferConsumer limiter = new LimitedDataBufferConsumer(getMaxInMemorySize());
-
-		Flux<DataBuffer> inputFlux = Flux.from(input)
-				.flatMapIterable(buffer -> splitOnDelimiter(buffer, delimiterBytes))
-				.doOnNext(limiter)
-				.bufferUntil(buffer -> buffer == END_FRAME)
-				.map(StringDecoder::joinUntilEndFrame)
-				.doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear)
-				.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
+		Flux<DataBuffer> inputFlux = Flux.defer(() -> {
+			if (getMaxInMemorySize() != -1) {
+
+				// Passing limiter into splitOnDelimiter helps to ensure that in case of one DataBuffer
+				// containing multiple lines, the limit is checked and raised immediately without accumulating
+				// subsequent lines. This is necessary because concatMapIterable doesn't respect doOnDiscard.
+				// When reactor-core#1925 is resolved, we could replace bufferUntil with:
+
+				//	.windowUntil(buffer -> buffer instanceof EndFrameBuffer)
+				//	.concatMap(fluxes -> fluxes.collect(() -> new LimitedDataBufferList(getMaxInMemorySize()), LimitedDataBufferList::add))
+
+				LimitedDataBufferList limiter = new LimitedDataBufferList(getMaxInMemorySize());
+
+				return Flux.from(input)
+						.concatMapIterable(buffer -> splitOnDelimiter(buffer, delimiterBytes, limiter))
+						.bufferUntil(buffer -> buffer == END_FRAME)
+						.map(StringDecoder::joinUntilEndFrame)
+						.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
+			}
+			else {
+
+				// When the decoder is unlimited (-1), concatMapIterable will cache buffers that may not
+				// be released if cancel is signalled before they are turned into String lines
+				// (see test maxInMemoryLimitReleaseUnprocessedLinesWhenUnlimited).
+				// When reactor-core#1925 is resolved, the workaround can be removed and the entire
+				// else clause possibly dropped.
+
+				ConcatMapIterableDiscardWorkaroundCache cache = new ConcatMapIterableDiscardWorkaroundCache();
+
+				return Flux.from(input)
+						.concatMapIterable(buffer -> cache.addAll(splitOnDelimiter(buffer, delimiterBytes, null)))
+						.doOnNext(cache)
+						.doOnCancel(cache)
+						.bufferUntil(buffer -> buffer == END_FRAME)
+						.map(StringDecoder::joinUntilEndFrame)
+						.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
+			}
+		});
 
 		return super.decode(inputFlux, elementType, mimeType, hints);
 	}
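The commented-out windowUntil / concatMap lines above describe the shape the pipeline could take once reactor-core#1925 is resolved. A rough, self-contained sketch of that shape (all names illustrative, not part of the commit): plain Strings stand in for DataBuffers, a reference-compared END sentinel stands in for EndFrameBuffer, and StringBuilder stands in for LimitedDataBufferList:

    import reactor.core.publisher.Flux;

    public class WindowUntilSketch {

        private static final String END = "END"; // sentinel, compared by reference like END_FRAME

        public static void main(String[] args) {
            Flux<String> frames = Flux.just("abc", END, "de", "fg", END);

            frames.windowUntil(s -> s == END)  // close a window after each END marker
                    .concatMap(window -> window
                            .filter(s -> s != END)  // drop the marker itself
                            .collect(StringBuilder::new, StringBuilder::append))
                    .map(StringBuilder::toString)
                    .filter(line -> !line.isEmpty())  // guard against a trailing empty window
                    .subscribe(System.out::println);  // prints "abc", then "defg"
        }
    }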
@@ -125,7 +152,9 @@ private List<byte[]> getDelimiterBytes(@Nullable MimeType mimeType) {
 	 * Split the given data buffer on delimiter boundaries.
 	 * The returned Flux contains an {@link #END_FRAME} buffer after each delimiter.
 	 */
-	private List<DataBuffer> splitOnDelimiter(DataBuffer buffer, List<byte[]> delimiterBytes) {
+	private List<DataBuffer> splitOnDelimiter(
+			DataBuffer buffer, List<byte[]> delimiterBytes, @Nullable LimitedDataBufferList limiter) {
+
 		List<DataBuffer> frames = new ArrayList<>();
 		try {
 			do {
Expand All @@ -147,15 +176,28 @@ private List<DataBuffer> splitOnDelimiter(DataBuffer buffer, List<byte[]> delimi
 					buffer.readPosition(readPosition + length + matchingDelimiter.length);
 					frames.add(DataBufferUtils.retain(frame));
 					frames.add(END_FRAME);
+					if (limiter != null) {
+						limiter.add(frame); // enforce the limit
+						limiter.clear();
+					}
 				}
 				else {
 					frame = buffer.slice(readPosition, buffer.readableByteCount());
 					buffer.readPosition(readPosition + buffer.readableByteCount());
 					frames.add(DataBufferUtils.retain(frame));
+					if (limiter != null) {
+						limiter.add(frame);
+					}
 				}
 			}
 			while (buffer.readableByteCount() > 0);
 		}
+		catch (DataBufferLimitException ex) {
+			if (limiter != null) {
+				limiter.releaseAndClear();
+			}
+			throw ex;
+		}
 		catch (Throwable ex) {
 			for (DataBuffer frame : frames) {
 				DataBufferUtils.release(frame);
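The limiter threaded through splitOnDelimiter is Spring's LimitedDataBufferList, which counts bytes as buffers are added and raises DataBufferLimitException as soon as the configured maximum is exceeded, while clear() resets the count at each completed line. A rough standalone sketch of that behavior (hypothetical class name, assuming spring-core on the classpath):

    import org.springframework.core.io.buffer.DataBufferLimitException;
    import org.springframework.core.io.buffer.DefaultDataBufferFactory;
    import org.springframework.core.io.buffer.LimitedDataBufferList;

    public class LimiterSketch {

        public static void main(String[] args) {
            LimitedDataBufferList limiter = new LimitedDataBufferList(5); // max 5 bytes
            DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
            try {
                limiter.add(factory.wrap("abc".getBytes())); // 3 bytes, within the limit
                limiter.clear(); // line complete: reset the byte count
                limiter.add(factory.wrap("too long".getBytes())); // 8 bytes, throws
            }
            catch (DataBufferLimitException ex) {
                limiter.releaseAndClear(); // release anything still held
                System.out.println(ex.getMessage());
            }
        }
    }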
@@ -293,34 +335,32 @@ public static StringDecoder allMimeTypes(List<String> delimiters, boolean stripDelimiter) {
 	}
 
 
-	/**
-	 * Temporary measure for reactor-core#1925.
-	 * Consumer that adds to a {@link LimitedDataBufferList} to enforce limits.
-	 */
-	private static class LimitedDataBufferConsumer implements Consumer<DataBuffer> {
-
-		private final LimitedDataBufferList bufferList;
-
-
-		public LimitedDataBufferConsumer(int maxInMemorySize) {
-			this.bufferList = new LimitedDataBufferList(maxInMemorySize);
-		}
-
-		@Override
-		public void accept(DataBuffer buffer) {
-			if (buffer == END_FRAME) {
-				this.bufferList.clear();
-			}
-			else {
-				try {
-					this.bufferList.add(buffer);
-				}
-				catch (DataBufferLimitException ex) {
-					DataBufferUtils.release(buffer);
-					throw ex;
-				}
-			}
-		}
-	}
+	private class ConcatMapIterableDiscardWorkaroundCache implements Consumer<DataBuffer>, Runnable {
+
+		private final List<DataBuffer> buffers = new ArrayList<>();
+
+
+		public List<DataBuffer> addAll(List<DataBuffer> buffersToAdd) {
+			this.buffers.addAll(buffersToAdd);
+			return buffersToAdd;
+		}
+
+		@Override
+		public void accept(DataBuffer dataBuffer) {
+			this.buffers.remove(dataBuffer);
+		}
+
+		@Override
+		public void run() {
+			this.buffers.forEach(buffer -> {
+				try {
+					DataBufferUtils.release(buffer);
+				}
+				catch (Throwable ex) {
+					// Keep going..
+				}
+			});
+		}
+	}
 
 }
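To see why the ConcatMapIterableDiscardWorkaroundCache above is wired with addAll, doOnNext and doOnCancel, here is a toy analogue (hypothetical names, plain strings instead of pooled buffers): everything handed to concatMapIterable is registered, delivery downstream unregisters it, and a cancel releases whatever never made it downstream:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    import reactor.core.publisher.Flux;

    public class DiscardWorkaroundSketch {

        static class Cache implements Consumer<String>, Runnable {

            private final List<String> items = new ArrayList<>();

            List<String> addAll(List<String> toAdd) { // register expanded items
                this.items.addAll(toAdd);
                return toAdd;
            }

            @Override
            public void accept(String item) { // delivered downstream: stop tracking
                this.items.remove(item);
            }

            @Override
            public void run() { // cancelled: "release" the leftovers
                this.items.forEach(item -> System.out.println("releasing: " + item));
            }
        }

        public static void main(String[] args) {
            Cache cache = new Cache();
            Flux.just(List.of("a", "b", "c"))
                    .concatMapIterable(cache::addAll)
                    .doOnNext(cache)
                    .doOnCancel(cache)
                    .take(1) // cancels after "a"; prints "releasing: b", "releasing: c"
                    .subscribe(System.out::println);
        }
    }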
StringDecoderTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -128,17 +128,30 @@ public void decodeNewLine() {
 	}
 
 	@Test
-	public void decodeNewLineWithLimit() {
+	public void maxInMemoryLimit() {
 		Flux<DataBuffer> input = Flux.just(
-				stringBuffer("abc\n"),
-				stringBuffer("defg\n"),
-				stringBuffer("hijkl\n")
-		);
+				stringBuffer("abc\n"), stringBuffer("defg\n"), stringBuffer("hijkl\n"));
+
 		this.decoder.setMaxInMemorySize(4);
 		testDecode(input, String.class, step ->
-				step.expectNext("abc", "defg")
-						.verifyError(DataBufferLimitException.class));
+				step.expectNext("abc", "defg").verifyError(DataBufferLimitException.class));
+	}
+
+	@Test // gh-24312
+	public void maxInMemoryLimitReleaseUnprocessedLinesFromCurrentBuffer() {
+		Flux<DataBuffer> input = Flux.just(
+				stringBuffer("TOO MUCH DATA\nanother line\n\nand another\n"));
+
+		this.decoder.setMaxInMemorySize(5);
+		testDecode(input, String.class, step -> step.verifyError(DataBufferLimitException.class));
+	}
+
+	@Test // gh-24339
+	public void maxInMemoryLimitReleaseUnprocessedLinesWhenUnlimited() {
+		Flux<DataBuffer> input = Flux.just(stringBuffer("Line 1\nLine 2\nLine 3\n"));
+
+		this.decoder.setMaxInMemorySize(-1);
+		testDecodeCancel(input, ResolvableType.forClass(String.class), null, Collections.emptyMap());
 	}
 
 	@Test
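The unlimited-case test relies on the shared testDecodeCancel harness to cancel mid-stream and then check, via the test suite's buffer leak tracking, that unconsumed buffers were released. Outside that harness, the cancellation step itself can be sketched directly with reactor-test's StepVerifier (illustrative only; the real test verifies buffer release, not values):

    import reactor.core.publisher.Flux;
    import reactor.test.StepVerifier;

    public class CancelSketch {

        public static void main(String[] args) {
            Flux<String> lines = Flux.just("Line 1", "Line 2", "Line 3");

            StepVerifier.create(lines)
                    .expectNext("Line 1")
                    .thenCancel() // cancel mid-stream, as testDecodeCancel does
                    .verify();
        }
    }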
