diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/ChunkedSliceOutput.java b/presto-orc/src/main/java/com/facebook/presto/orc/ChunkedSliceOutput.java index e5142bc7a25ec..3c2a8a8b013d5 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/ChunkedSliceOutput.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/ChunkedSliceOutput.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.orc; +import com.facebook.airlift.log.Logger; import io.airlift.slice.Slice; import io.airlift.slice.SliceOutput; import io.airlift.slice.Slices; @@ -61,9 +62,9 @@ public final class ChunkedSliceOutput */ private int bufferPosition; - public ChunkedSliceOutput(int minChunkSize, int maxChunkSize) + public ChunkedSliceOutput(int minChunkSize, int maxChunkSize, boolean resetOutputBuffer) { - this.chunkSupplier = new ChunkSupplier(minChunkSize, maxChunkSize); + this.chunkSupplier = new ChunkSupplier(minChunkSize, maxChunkSize, resetOutputBuffer); this.buffer = chunkSupplier.get(); this.slice = Slices.wrappedBuffer(buffer); @@ -344,14 +345,16 @@ private void closeChunk() // reusing the buffers. private static class ChunkSupplier { + private static final Logger log = Logger.get(ChunkSupplier.class); private final int maxChunkSize; + private boolean resetOutputBuffer; private final List bufferPool = new ArrayList<>(); private final List usedBuffers = new ArrayList<>(); private int currentSize; - public ChunkSupplier(int minChunkSize, int maxChunkSize) + public ChunkSupplier(int minChunkSize, int maxChunkSize, boolean resetOutputBuffer) { checkArgument(minChunkSize >= MINIMUM_CHUNK_SIZE, "minimum chunk size of " + MINIMUM_CHUNK_SIZE + " required"); checkArgument(maxChunkSize <= MAXIMUM_CHUNK_SIZE, "maximum chunk size of " + MAXIMUM_CHUNK_SIZE + " required"); @@ -359,10 +362,20 @@ public ChunkSupplier(int minChunkSize, int maxChunkSize) this.currentSize = minChunkSize; this.maxChunkSize = maxChunkSize; + this.resetOutputBuffer = resetOutputBuffer; } public void reset() { + if (!bufferPool.isEmpty() && resetOutputBuffer) { + log.info("Reset unused buffers, used %d chunks (%d bytes), unused %d chunks (%d bytes)", + usedBuffers.size(), + usedBuffers.stream().mapToInt(b -> b.length).sum(), + bufferPool.size(), + bufferPool.stream().mapToInt(b -> b.length).sum()); + bufferPool.clear(); + System.setProperty("RESET_OUTPUT_BUFFER", "RESET_OUTPUT_BUFFER"); + } bufferPool.addAll(0, usedBuffers); usedBuffers.clear(); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/ColumnWriterOptions.java b/presto-orc/src/main/java/com/facebook/presto/orc/ColumnWriterOptions.java index 86b3a6659364b..f8fd0f659a983 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/ColumnWriterOptions.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/ColumnWriterOptions.java @@ -28,6 +28,7 @@ import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_STRING_STATISTICS_LIMIT; import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MIN_OUTPUT_BUFFER_CHUNK_SIZE; import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_PRESERVE_DIRECT_ENCODING_STRIPE_COUNT; +import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_RESET_OUTPUT_BUFFER; import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.units.DataSize.Unit.BYTE; import static java.lang.Math.toIntExact; @@ -50,6 +51,7 @@ public class ColumnWriterOptions private final Set flattenedNodes; private final boolean mapStatisticsEnabled; private final int maxFlattenedMapKeyCount; + private final boolean resetOutputBuffer; public ColumnWriterOptions( CompressionKind compressionKind, @@ -66,7 +68,8 @@ public ColumnWriterOptions( CompressionBufferPool compressionBufferPool, Set flattenedNodes, boolean mapStatisticsEnabled, - int maxFlattenedMapKeyCount) + int maxFlattenedMapKeyCount, + boolean resetOutputBuffer) { checkArgument(maxFlattenedMapKeyCount > 0, "maxFlattenedMapKeyCount must be positive: %s", maxFlattenedMapKeyCount); requireNonNull(compressionMaxBufferSize, "compressionMaxBufferSize is null"); @@ -86,6 +89,7 @@ public ColumnWriterOptions( this.flattenedNodes = requireNonNull(flattenedNodes, "flattenedNodes is null"); this.mapStatisticsEnabled = mapStatisticsEnabled; this.maxFlattenedMapKeyCount = maxFlattenedMapKeyCount; + this.resetOutputBuffer = resetOutputBuffer; } public CompressionKind getCompressionKind() @@ -163,6 +167,10 @@ public int getMaxFlattenedMapKeyCount() return maxFlattenedMapKeyCount; } + public boolean isResetOutputBuffer() + { + return resetOutputBuffer; + } /** * Create a copy of this ColumnWriterOptions, but disable string and integer dictionary encodings. */ @@ -191,7 +199,8 @@ public Builder toBuilder() .setCompressionBufferPool(getCompressionBufferPool()) .setFlattenedNodes(getFlattenedNodes()) .setMapStatisticsEnabled(isMapStatisticsEnabled()) - .setMaxFlattenedMapKeyCount(getMaxFlattenedMapKeyCount()); + .setMaxFlattenedMapKeyCount(getMaxFlattenedMapKeyCount()) + .setResetOutputBuffer(resetOutputBuffer); } public static Builder builder() @@ -216,6 +225,7 @@ public static class Builder private Set flattenedNodes = ImmutableSet.of(); private boolean mapStatisticsEnabled; private int maxFlattenedMapKeyCount = DEFAULT_MAX_FLATTENED_MAP_KEY_COUNT; + private boolean resetOutputBuffer = DEFAULT_RESET_OUTPUT_BUFFER; private Builder() {} @@ -309,6 +319,12 @@ public Builder setMaxFlattenedMapKeyCount(int maxFlattenedMapKeyCount) return this; } + public Builder setResetOutputBuffer(boolean resetOutputBuffer) + { + this.resetOutputBuffer = resetOutputBuffer; + return this; + } + public ColumnWriterOptions build() { return new ColumnWriterOptions( @@ -326,7 +342,8 @@ public ColumnWriterOptions build() compressionBufferPool, flattenedNodes, mapStatisticsEnabled, - maxFlattenedMapKeyCount); + maxFlattenedMapKeyCount, + resetOutputBuffer); } } } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcOutputBuffer.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcOutputBuffer.java index 60b5dd97f955c..d542c47ae55fe 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcOutputBuffer.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcOutputBuffer.java @@ -57,6 +57,7 @@ public class OrcOutputBuffer private final int minOutputBufferChunkSize; private final int maxOutputBufferChunkSize; private final int minCompressibleSize; + private final boolean resetOutputBuffer; private final CompressionBufferPool compressionBufferPool; private final Optional dwrfEncryptor; @@ -87,6 +88,7 @@ public OrcOutputBuffer(ColumnWriterOptions columnWriterOptions, Optional validation.setCompression(compressionKind)); recordValidation(validation -> validation.setFlattenedNodes(flattenedNodes)); diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java index 37fa482aff7cb..e1fa4652beaa6 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java @@ -49,6 +49,7 @@ public class OrcWriterOptions public static final boolean DEFAULT_INTEGER_DICTIONARY_ENCODING_ENABLED = false; public static final boolean DEFAULT_STRING_DICTIONARY_ENCODING_ENABLED = true; public static final boolean DEFAULT_STRING_DICTIONARY_SORTING_ENABLED = true; + public static final boolean DEFAULT_RESET_OUTPUT_BUFFER = false; private final OrcWriterFlushPolicy flushPolicy; private final int rowGroupMaxRowCount; @@ -74,6 +75,7 @@ public class OrcWriterOptions private final int preserveDirectEncodingStripeCount; private final boolean mapStatisticsEnabled; private final int maxFlattenedMapKeyCount; + private final boolean resetOutputBuffer; /** * Contains indexes of columns (not nodes!) for which writer should use flattened encoding, e.g. flat maps. @@ -101,7 +103,8 @@ private OrcWriterOptions( int preserveDirectEncodingStripeCount, Set flattenedColumns, boolean mapStatisticsEnabled, - int maxFlattenedMapKeyCount) + int maxFlattenedMapKeyCount, + boolean resetOutputBuffer) { requireNonNull(flushPolicy, "flushPolicy is null"); checkArgument(rowGroupMaxRowCount >= 1, "rowGroupMaxRowCount must be at least 1"); @@ -139,6 +142,7 @@ private OrcWriterOptions( this.flattenedColumns = flattenedColumns; this.mapStatisticsEnabled = mapStatisticsEnabled; this.maxFlattenedMapKeyCount = maxFlattenedMapKeyCount; + this.resetOutputBuffer = resetOutputBuffer; } public OrcWriterFlushPolicy getFlushPolicy() @@ -246,6 +250,11 @@ public int getMaxFlattenedMapKeyCount() return maxFlattenedMapKeyCount; } + public boolean isResetOutputBuffer() + { + return resetOutputBuffer; + } + @Override public String toString() { @@ -269,6 +278,7 @@ public String toString() .add("flattenedColumns", flattenedColumns) .add("mapStatisticsEnabled", mapStatisticsEnabled) .add("maxFlattenedMapKeyCount", maxFlattenedMapKeyCount) + .add("resetOutputBuffer", resetOutputBuffer) .toString(); } @@ -307,6 +317,7 @@ public static class Builder private Set flattenedColumns = ImmutableSet.of(); private boolean mapStatisticsEnabled; private int maxFlattenedMapKeyCount = DEFAULT_MAX_FLATTENED_MAP_KEY_COUNT; + private boolean resetOutputBuffer = DEFAULT_RESET_OUTPUT_BUFFER; public Builder withFlushPolicy(OrcWriterFlushPolicy flushPolicy) { @@ -448,6 +459,12 @@ public Builder withMaxFlattenedMapKeyCount(int maxFlattenedMapKeyCount) return this; } + public Builder withResetOutputBuffer(boolean resetOutputBuffer) + { + this.resetOutputBuffer = resetOutputBuffer; + return this; + } + public OrcWriterOptions build() { Optional dwrfWriterOptions; @@ -479,7 +496,8 @@ public OrcWriterOptions build() preserveDirectEncodingStripeCount, flattenedColumns, mapStatisticsEnabled, - maxFlattenedMapKeyCount); + maxFlattenedMapKeyCount, + resetOutputBuffer); } } } diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java index 9544a275eb4e8..1c2c7a0a33d7d 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java @@ -160,6 +160,7 @@ public void testToString() int preserveDirectEncodingStripeCount = 0; boolean mapStatisticsEnabled = true; int maxFlattenedMapKeyCount = 27; + boolean resetOutputBuffer = false; OrcWriterOptions writerOptions = OrcWriterOptions.builder() .withFlushPolicy(DefaultOrcWriterFlushPolicy.builder() @@ -185,6 +186,7 @@ public void testToString() .withFlattenedColumns(ImmutableSet.of(4)) .withMapStatisticsEnabled(mapStatisticsEnabled) .withMaxFlattenedMapKeyCount(maxFlattenedMapKeyCount) + .withResetOutputBuffer(resetOutputBuffer) .build(); String expectedString = "OrcWriterOptions{flushPolicy=DefaultOrcWriterFlushPolicy{stripeMaxRowCount=1100000, " + @@ -195,7 +197,7 @@ public void testToString() "stringDictionarySortingEnabled=true, stringDictionaryEncodingEnabled=true, " + "dwrfWriterOptions=Optional[DwrfStripeCacheOptions{stripeCacheMode=INDEX_AND_FOOTER, stripeCacheMaxSize=4MB}], " + "ignoreDictionaryRowGroupSizes=false, preserveDirectEncodingStripeCount=0, flattenedColumns=[4], mapStatisticsEnabled=true, " + - "maxFlattenedMapKeyCount=27}"; + "maxFlattenedMapKeyCount=27, resetOutputBuffer=false}"; assertEquals(expectedString, writerOptions.toString()); } }