Skip to content

Commit

Permalink
Limits on input stream in codecs
Browse files Browse the repository at this point in the history
- Add maxInMemorySize property to Decoder and HttpMessageReader
  implementations that aggregate input to trigger
  DataBufferLimitException when reached.

- For codecs that call DataBufferUtils#join, there is now an overloaded
  variant with a maxInMemorySize extra argument. Internally, a custom
  LimitedDataBufferList is used to count and enforce the limit.

- Jackson2Tokenizer and XmlEventDecoder support those limits per
  streamed JSON object.

See gh-23884
  • Loading branch information
rstoyanchev committed Oct 29, 2019
1 parent ce0b012 commit 89d053d
Show file tree
Hide file tree
Showing 16 changed files with 672 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,39 @@
@SuppressWarnings("deprecation")
public abstract class AbstractDataBufferDecoder<T> extends AbstractDecoder<T> {

private int maxInMemorySize = 256 * 1024;


protected AbstractDataBufferDecoder(MimeType... supportedMimeTypes) {
super(supportedMimeTypes);
}


/**
* Configure a limit on the number of bytes that can be buffered whenever
* the input stream needs to be aggregated. This can be a result of
* decoding to a single {@code DataBuffer},
* {@link java.nio.ByteBuffer ByteBuffer}, {@code byte[]},
* {@link org.springframework.core.io.Resource Resource}, {@code String}, etc.
* It can also occur when splitting the input stream, e.g. delimited text,
* in which case the limit applies to data buffered between delimiters.
* <p>By default this is set to 256K.
* @param byteCount the max number of bytes to buffer, or -1 for unlimited
* @since 5.1.11
*/
public void setMaxInMemorySize(int byteCount) {
this.maxInMemorySize = byteCount;
}

/**
* Return the {@link #setMaxInMemorySize configured} byte count limit.
* @since 5.1.11
*/
public int getMaxInMemorySize() {
return this.maxInMemorySize;
}


@Override
public Flux<T> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Expand All @@ -65,7 +92,7 @@ public Flux<T> decode(Publisher<DataBuffer> input, ResolvableType elementType,
public Mono<T> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

return DataBufferUtils.join(input)
return DataBufferUtils.join(input, this.maxInMemorySize)
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DataBufferWrapper;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.LimitedDataBufferList;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.core.log.LogFormatUtils;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -91,12 +94,18 @@ public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementTy

byte[][] delimiterBytes = getDelimiterBytes(mimeType);

// TODO: Drop Consumer and use bufferUntil with Supplier<LimistedDataBufferList> (reactor-core#1925)
// TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)
LimitedDataBufferConsumer limiter = new LimitedDataBufferConsumer(getMaxInMemorySize());

Flux<DataBuffer> inputFlux = Flux.defer(() -> {
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes);
return Flux.from(input)
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher))
.doOnNext(limiter)
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
.map(buffers -> joinAndStrip(buffers, this.stripDelimiter))
.doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

});
Expand Down Expand Up @@ -279,4 +288,34 @@ public byte[] delimiter() {
}


/**
* Temporary measure for reactor-core#1925.
* Consumer that adds to a {@link LimitedDataBufferList} to enforce limits.
*/
private static class LimitedDataBufferConsumer implements Consumer<DataBuffer> {

private final LimitedDataBufferList bufferList;


public LimitedDataBufferConsumer(int maxInMemorySize) {
this.bufferList = new LimitedDataBufferList(maxInMemorySize);
}


@Override
public void accept(DataBuffer buffer) {
if (buffer instanceof EndFrameBuffer) {
this.bufferList.clear();
}
else {
try {
this.bufferList.add(buffer);
}
catch (DataBufferLimitException ex) {
DataBufferUtils.release(buffer);
throw ex;
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;

/**
* Exception that indicates the cumulative number of bytes consumed from a
* stream of {@link DataBuffer DataBuffer}'s exceeded some pre-configured limit.
* This can be raised when data buffers are cached and aggregated, e.g.
* {@link DataBufferUtils#join}. Or it could also be raised when data buffers
* have been released but a parsed representation is being aggregated, e.g. async
* parsing with Jackson.
*
* @author Rossen Stoyanchev
* @since 5.1.11
*/
@SuppressWarnings("serial")
public class DataBufferLimitException extends IllegalStateException {


public DataBufferLimitException(String message) {
super(message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -525,16 +525,35 @@ public static Consumer<DataBuffer> releaseConsumer() {
*/
@SuppressWarnings("unchecked")
public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
return join(dataBuffers, -1);
}

/**
* Variant of {@link #join(Publisher)} that behaves the same way up until
* the specified max number of bytes to buffer. Once the limit is exceeded,
* {@link DataBufferLimitException} is raised.
* @param buffers the data buffers that are to be composed
* @param maxByteCount the max number of bytes to buffer, or -1 for unlimited
* @return a buffer with the aggregated content, possibly an empty Mono if
* the max number of bytes to buffer is exceeded.
* @throws DataBufferLimitException if maxByteCount is exceeded
* @since 5.1.11
*/
@SuppressWarnings("unchecked")
public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> buffers, int maxByteCount) {
Assert.notNull(buffers, "'dataBuffers' must not be null");

if (dataBuffers instanceof Mono) {
return (Mono<DataBuffer>) dataBuffers;
if (buffers instanceof Mono) {
return (Mono<DataBuffer>) buffers;
}

return Flux.from(dataBuffers)
.collectList()
// TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)

return Flux.from(buffers)
.collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add)
.filter(list -> !list.isEmpty())
.map(list -> list.get(0).factory().join(list))
.doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.buffer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

import reactor.core.publisher.Flux;

/**
* Custom {@link List} to collect data buffers with and enforce a
* limit on the total number of bytes buffered. For use with "collect" or
* other buffering operators in declarative APIs, e.g. {@link Flux}.
*
* <p>Adding elements increases the byte count and if the limit is exceeded,
* {@link DataBufferLimitException} is raised. {@link #clear()} resets the
* count. Remove and set are not supported.
*
* <p><strong>Note:</strong> This class does not automatically release the
* buffers it contains. It is usually preferable to use hooks such as
* {@link Flux#doOnDiscard} that also take care of cancel and error signals,
* or otherwise {@link #releaseAndClear()} can be used.
*
* @author Rossen Stoyanchev
* @since 5.1.11
*/
@SuppressWarnings("serial")
public class LimitedDataBufferList extends ArrayList<DataBuffer> {

private final int maxByteCount;

private int byteCount;


public LimitedDataBufferList(int maxByteCount) {
this.maxByteCount = maxByteCount;
}


@Override
public boolean add(DataBuffer buffer) {
boolean result = super.add(buffer);
if (result) {
updateCount(buffer.readableByteCount());
}
return result;
}

@Override
public void add(int index, DataBuffer buffer) {
super.add(index, buffer);
updateCount(buffer.readableByteCount());
}

@Override
public boolean addAll(Collection<? extends DataBuffer> collection) {
boolean result = super.addAll(collection);
collection.forEach(buffer -> updateCount(buffer.readableByteCount()));
return result;
}

@Override
public boolean addAll(int index, Collection<? extends DataBuffer> collection) {
boolean result = super.addAll(index, collection);
collection.forEach(buffer -> updateCount(buffer.readableByteCount()));
return result;
}

private void updateCount(int bytesToAdd) {
if (this.maxByteCount < 0) {
return;
}
if (bytesToAdd > Integer.MAX_VALUE - this.byteCount) {
raiseLimitException();
}
else {
this.byteCount += bytesToAdd;
if (this.byteCount > this.maxByteCount) {
raiseLimitException();
}
}
}

private void raiseLimitException() {
// Do not release here, it's likely down via doOnDiscard..
throw new DataBufferLimitException(
"Exceeded limit on max bytes to buffer : " + this.maxByteCount);
}

@Override
public DataBuffer remove(int index) {
throw new UnsupportedOperationException();
}

@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}

@Override
protected void removeRange(int fromIndex, int toIndex) {
throw new UnsupportedOperationException();
}

@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean removeIf(Predicate<? super DataBuffer> filter) {
throw new UnsupportedOperationException();
}

@Override
public DataBuffer set(int index, DataBuffer element) {
throw new UnsupportedOperationException();
}

@Override
public void clear() {
this.byteCount = 0;
super.clear();
}

/**
* Shortcut to {@link DataBufferUtils#release release} all data buffers and
* then {@link #clear()}.
*/
public void releaseAndClear() {
forEach(buf -> {
try {
DataBufferUtils.release(buf);
}
catch (Throwable ex) {
// Keep going..
}
});
clear();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;

Expand Down Expand Up @@ -127,6 +128,20 @@ void decodeNewLine() {
.verify());
}

@Test
void decodeNewLineWithLimit() {
Flux<DataBuffer> input = Flux.just(
stringBuffer("abc\n"),
stringBuffer("defg\n"),
stringBuffer("hijkl\n")
);
this.decoder.setMaxInMemorySize(5);

testDecode(input, String.class, step ->
step.expectNext("abc", "defg")
.verifyError(DataBufferLimitException.class));
}

@Test
void decodeNewLineIncludeDelimiters() {
this.decoder = StringDecoder.allMimeTypes(StringDecoder.DEFAULT_DELIMITERS, false);
Expand Down
Loading

0 comments on commit 89d053d

Please sign in to comment.