Skip to content

Commit 69ccba3

Browse files
committed
Introduce write aggregator to DataBufferUtils
As a consequence of dropping CompositeByteBuf (see prior commit), DataBuffers fluxes that are aggregated with Flux.reduce(BiFunction) are now required to be released, as the composite no longer holds a reference to subsequent data buffers. For this purpose, DataBufferUtils now has a writeAggregator that can be used with Flux.reduce, and that released the subsequent buffers properly. Issue: SPR-16351
1 parent e6893da commit 69ccba3

File tree

2 files changed

+70
-40
lines changed

2 files changed

+70
-40
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.atomic.AtomicBoolean;
3232
import java.util.concurrent.atomic.AtomicLong;
3333
import java.util.function.BiFunction;
34+
import java.util.function.BinaryOperator;
3435
import java.util.function.Consumer;
3536

3637
import org.reactivestreams.Publisher;
@@ -55,6 +56,13 @@ public abstract class DataBufferUtils {
5556

5657
private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;
5758

59+
private static final BinaryOperator<DataBuffer> WRITE_AGGREGATOR =
60+
(dataBuffer1, dataBuffer2) -> {
61+
DataBuffer result = dataBuffer1.write(dataBuffer2);
62+
release(dataBuffer2);
63+
return result;
64+
};
65+
5866
//---------------------------------------------------------------------
5967
// Reading
6068
//---------------------------------------------------------------------
@@ -303,6 +311,10 @@ private static void closeChannel(@Nullable Channel channel) {
303311
}
304312
}
305313

314+
//---------------------------------------------------------------------
315+
// Various
316+
//---------------------------------------------------------------------
317+
306318
/**
307319
* Relay buffers from the given {@link Publisher} until the total
308320
* {@linkplain DataBuffer#readableByteCount() byte count} reaches
@@ -335,10 +347,6 @@ public static Flux<DataBuffer> takeUntilByteCount(Publisher<DataBuffer> publishe
335347
});
336348
}
337349

338-
//---------------------------------------------------------------------
339-
// Various
340-
//---------------------------------------------------------------------
341-
342350
/**
343351
* Skip buffers from the given {@link Publisher} until the total
344352
* {@linkplain DataBuffer#readableByteCount() byte count} reaches
@@ -403,13 +411,29 @@ public static boolean release(@Nullable DataBuffer dataBuffer) {
403411
}
404412

405413
/**
406-
* Returns a consumer that calls {@link #release(DataBuffer)} on all
414+
* Return a consumer that calls {@link #release(DataBuffer)} on all
407415
* passed data buffers.
408416
*/
409417
public static Consumer<DataBuffer> releaseConsumer() {
410418
return RELEASE_CONSUMER;
411419
}
412420

421+
/**
422+
* Return an aggregator function that can be used to {@linkplain Flux#reduce(BiFunction) reduce}
423+
* a {@code Flux} of data buffers into a single data buffer by writing all subsequent buffers
424+
* into the first buffer. All buffers except the first buffer are
425+
* {@linkplain #release(DataBuffer) released}.
426+
* <p>For example:
427+
* <pre class="code">
428+
* Flux&lt;DataBuffer&gt; flux = ...
429+
* Mono&lt;DataBuffer&gt; mono = flux.reduce(DataBufferUtils.writeAggregator());
430+
* </pre>
431+
* @see Flux#reduce(BiFunction)
432+
*/
433+
public static BinaryOperator<DataBuffer> writeAggregator() {
434+
return WRITE_AGGREGATOR;
435+
}
436+
413437

414438
private static class ReadableByteChannelGenerator
415439
implements BiFunction<ReadableByteChannel, SynchronousSink<DataBuffer>, ReadableByteChannel> {
@@ -452,6 +476,7 @@ public ReadableByteChannel apply(ReadableByteChannel channel,
452476
}
453477
}
454478

479+
455480
private static class AsynchronousFileChannelReadCompletionHandler
456481
implements CompletionHandler<Integer, DataBuffer> {
457482

@@ -504,6 +529,7 @@ public void failed(Throwable exc, DataBuffer dataBuffer) {
504529
}
505530
}
506531

532+
507533
private static class AsynchronousFileChannelWriteCompletionHandler
508534
extends BaseSubscriber<DataBuffer>
509535
implements CompletionHandler<Integer, ByteBuffer> {

spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,20 +24,23 @@
2424
import java.nio.channels.FileChannel;
2525
import java.nio.channels.ReadableByteChannel;
2626
import java.nio.channels.WritableByteChannel;
27+
import java.nio.charset.StandardCharsets;
2728
import java.nio.file.Files;
2829
import java.nio.file.Path;
2930
import java.nio.file.Paths;
3031
import java.nio.file.StandardOpenOption;
3132
import java.time.Duration;
3233
import java.util.stream.Collectors;
3334

35+
import io.netty.buffer.ByteBuf;
3436
import org.junit.Test;
3537
import org.mockito.stubbing.Answer;
3638
import reactor.core.publisher.Flux;
3739
import reactor.test.StepVerifier;
3840

3941
import org.springframework.core.io.ClassPathResource;
4042
import org.springframework.core.io.Resource;
43+
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
4144

4245
import static org.junit.Assert.*;
4346
import static org.mockito.ArgumentMatchers.any;
@@ -293,27 +296,50 @@ public void releaseConsumer() {
293296

294297
flux.subscribe(DataBufferUtils.releaseConsumer());
295298

296-
// AbstractDataBufferAllocatingTestCase.LeakDetector will assert the release of the buffers
299+
assertReleased(foo);
300+
assertReleased(bar);
301+
assertReleased(baz);
302+
}
303+
304+
private static void assertReleased(DataBuffer dataBuffer) {
305+
if (dataBuffer instanceof NettyDataBuffer) {
306+
ByteBuf byteBuf = ((NettyDataBuffer) dataBuffer).getNativeBuffer();
307+
assertEquals(0, byteBuf.refCnt());
308+
}
309+
}
310+
311+
@Test
312+
public void writeAggregator() {
313+
DataBuffer foo = stringBuffer("foo");
314+
DataBuffer bar = stringBuffer("bar");
315+
DataBuffer baz = stringBuffer("baz");
316+
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
317+
318+
DataBuffer result =
319+
flux.reduce(DataBufferUtils.writeAggregator()).block(Duration.ofSeconds(1));
320+
321+
assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8));
322+
323+
release(result);
297324
}
298325

299326
@Test
300327
public void SPR16070() throws Exception {
301328
ReadableByteChannel channel = mock(ReadableByteChannel.class);
302329
when(channel.read(any()))
303-
.thenAnswer(putByte(1))
304-
.thenAnswer(putByte(2))
305-
.thenAnswer(putByte(3))
330+
.thenAnswer(putByte('a'))
331+
.thenAnswer(putByte('b'))
332+
.thenAnswer(putByte('c'))
306333
.thenReturn(-1);
307334

308335
Flux<DataBuffer> read = DataBufferUtils.read(channel, this.bufferFactory, 1);
309336

310-
StepVerifier.create(
311-
read.reduce(DataBuffer::write)
312-
.map(this::dataBufferToBytes)
313-
.map(this::encodeHexString)
314-
)
315-
.expectNext("010203")
316-
.verifyComplete();
337+
StepVerifier.create(read)
338+
.consumeNextWith(stringConsumer("a"))
339+
.consumeNextWith(stringConsumer("b"))
340+
.consumeNextWith(stringConsumer("c"))
341+
.expectComplete()
342+
.verify(Duration.ofSeconds(5));
317343

318344
}
319345

@@ -325,27 +351,5 @@ private Answer<Integer> putByte(int b) {
325351
};
326352
}
327353

328-
private byte[] dataBufferToBytes(DataBuffer buffer) {
329-
try {
330-
int byteCount = buffer.readableByteCount();
331-
byte[] bytes = new byte[byteCount];
332-
buffer.read(bytes);
333-
return bytes;
334-
}
335-
finally {
336-
release(buffer);
337-
}
338-
}
339-
340-
private String encodeHexString(byte[] data) {
341-
StringBuilder builder = new StringBuilder();
342-
for (byte b : data) {
343-
builder.append((0xF0 & b) >>> 4);
344-
builder.append(0x0F & b);
345-
}
346-
return builder.toString();
347-
}
348-
349-
350354

351355
}

0 commit comments

Comments
 (0)