ChunkedSliceOutput.java
@@ -13,6 +13,7 @@
*/
package com.facebook.presto.orc;

import com.facebook.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
@@ -61,9 +62,9 @@ public final class ChunkedSliceOutput
*/
private int bufferPosition;

public ChunkedSliceOutput(int minChunkSize, int maxChunkSize)
public ChunkedSliceOutput(int minChunkSize, int maxChunkSize, boolean resetOutputBuffer)
{
this.chunkSupplier = new ChunkSupplier(minChunkSize, maxChunkSize);
this.chunkSupplier = new ChunkSupplier(minChunkSize, maxChunkSize, resetOutputBuffer);

this.buffer = chunkSupplier.get();
this.slice = Slices.wrappedBuffer(buffer);
@@ -344,25 +345,37 @@ private void closeChunk()
// reusing the buffers.
private static class ChunkSupplier
{
private static final Logger log = Logger.get(ChunkSupplier.class);
private final int maxChunkSize;
private boolean resetOutputBuffer;

private final List<byte[]> bufferPool = new ArrayList<>();
private final List<byte[]> usedBuffers = new ArrayList<>();

private int currentSize;

public ChunkSupplier(int minChunkSize, int maxChunkSize)
public ChunkSupplier(int minChunkSize, int maxChunkSize, boolean resetOutputBuffer)
{
checkArgument(minChunkSize >= MINIMUM_CHUNK_SIZE, "minimum chunk size of " + MINIMUM_CHUNK_SIZE + " required");
checkArgument(maxChunkSize <= MAXIMUM_CHUNK_SIZE, "maximum chunk size of " + MAXIMUM_CHUNK_SIZE + " required");
checkArgument(minChunkSize <= maxChunkSize, "minimum chunk size must be less than maximum chunk size");

this.currentSize = minChunkSize;
this.maxChunkSize = maxChunkSize;
this.resetOutputBuffer = resetOutputBuffer;
}

public void reset()
{
if (!bufferPool.isEmpty() && resetOutputBuffer) {
log.info("Reset unused buffers, used %d chunks (%d bytes), unused %d chunks (%d bytes)",
usedBuffers.size(),
usedBuffers.stream().mapToInt(b -> b.length).sum(),
bufferPool.size(),
bufferPool.stream().mapToInt(b -> b.length).sum());
bufferPool.clear();
Collaborator:

This is a tricky place. If you look at the ChunkSupplier.get method, you will see that ChunkSupplier scales chunks up from 256 bytes all the way to 16MB whenever it needs to produce a new chunk. If you reset the pool and start producing new chunks, you will eventually end up with only big 16MB chunks, which is probably the opposite of your goal.

I don't know what the best strategy is here, because it really depends on the data shapes, but I see a few options (a sketch of option 2 follows the list):

  1. Have a chunk supplier with no buffer pool that always produces smaller, fixed-size chunks. Best in terms of overhead, but regresses for small streams.
  2. Have a chunk supplier with no buffer pool that always produces scaled-up chunks, with reset() resetting the current size to the minimum. A good middle-ground option.
  3. Have a chunk supplier with no buffer pool that always produces scaled-up chunks, with reset() resetting the current size to the minimum AND a smaller maximum chunk size. Even better in terms of overhead, and won't regress as much as option 1 for small streams.
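
A minimal sketch of option 2, assuming the existing supplier's doubling growth policy; the class name and structure are hypothetical:

// Hypothetical pool-less supplier (option 2): chunk sizes scale up from
// minChunkSize toward maxChunkSize, and reset() restarts the scaling so
// frequent resets do not converge on maximum-size allocations.
private static class ScalingChunkSupplier
{
    private final int minChunkSize;
    private final int maxChunkSize;
    private int currentSize;

    public ScalingChunkSupplier(int minChunkSize, int maxChunkSize)
    {
        this.minChunkSize = minChunkSize;
        this.maxChunkSize = maxChunkSize;
        this.currentSize = minChunkSize;
    }

    public byte[] get()
    {
        byte[] chunk = new byte[currentSize];
        // double the next chunk size, capped at maxChunkSize (assumed growth policy)
        currentSize = Math.min(currentSize * 2, maxChunkSize);
        return chunk;
    }

    public void reset()
    {
        // no pooled buffers to manage; just restart the size scaling
        currentSize = minChunkSize;
    }
}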

}
bufferPool.addAll(0, usedBuffers);
Collaborator:

If you want to save on memory, do you really want to keep the last batch of chunks? I suppose you don't. (See the sketch below.)
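
A minimal sketch of the suggested behavior, reusing this class's fields; the control flow is hypothetical:

// Hypothetical reset(): when resetOutputBuffer is set, drop both the pooled
// chunks and the just-used batch so no old buffers are retained at all.
public void reset()
{
    if (resetOutputBuffer) {
        bufferPool.clear();
        usedBuffers.clear(); // discard the last batch instead of re-pooling it
        return;
    }
    bufferPool.addAll(0, usedBuffers);
    usedBuffers.clear();
}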

usedBuffers.clear();
}
ColumnWriterOptions.java
@@ -28,6 +28,7 @@
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_STRING_STATISTICS_LIMIT;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MIN_OUTPUT_BUFFER_CHUNK_SIZE;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_PRESERVE_DIRECT_ENCODING_STRIPE_COUNT;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_RESET_OUTPUT_BUFFER;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.BYTE;
import static java.lang.Math.toIntExact;
@@ -50,6 +51,7 @@ public class ColumnWriterOptions
private final Set<Integer> flattenedNodes;
private final boolean mapStatisticsEnabled;
private final int maxFlattenedMapKeyCount;
private final boolean resetOutputBuffer;

public ColumnWriterOptions(
CompressionKind compressionKind,
@@ -66,7 +68,8 @@ public ColumnWriterOptions(
CompressionBufferPool compressionBufferPool,
Set<Integer> flattenedNodes,
boolean mapStatisticsEnabled,
int maxFlattenedMapKeyCount)
int maxFlattenedMapKeyCount,
boolean resetOutputBuffer)
{
checkArgument(maxFlattenedMapKeyCount > 0, "maxFlattenedMapKeyCount must be positive: %s", maxFlattenedMapKeyCount);
requireNonNull(compressionMaxBufferSize, "compressionMaxBufferSize is null");
@@ -86,6 +89,7 @@
this.flattenedNodes = requireNonNull(flattenedNodes, "flattenedNodes is null");
this.mapStatisticsEnabled = mapStatisticsEnabled;
this.maxFlattenedMapKeyCount = maxFlattenedMapKeyCount;
this.resetOutputBuffer = resetOutputBuffer;
}

public CompressionKind getCompressionKind()
@@ -163,6 +167,10 @@ public int getMaxFlattenedMapKeyCount()
return maxFlattenedMapKeyCount;
}

public boolean isResetOutputBuffer()
{
return resetOutputBuffer;
}
/**
* Create a copy of this ColumnWriterOptions, but disable string and integer dictionary encodings.
*/
@@ -191,7 +199,8 @@ public Builder toBuilder()
.setCompressionBufferPool(getCompressionBufferPool())
.setFlattenedNodes(getFlattenedNodes())
.setMapStatisticsEnabled(isMapStatisticsEnabled())
.setMaxFlattenedMapKeyCount(getMaxFlattenedMapKeyCount());
.setMaxFlattenedMapKeyCount(getMaxFlattenedMapKeyCount())
.setResetOutputBuffer(resetOutputBuffer);
}

public static Builder builder()
@@ -216,6 +225,7 @@ public static class Builder
private Set<Integer> flattenedNodes = ImmutableSet.of();
private boolean mapStatisticsEnabled;
private int maxFlattenedMapKeyCount = DEFAULT_MAX_FLATTENED_MAP_KEY_COUNT;
private boolean resetOutputBuffer = DEFAULT_RESET_OUTPUT_BUFFER;

private Builder() {}

@@ -309,6 +319,12 @@ public Builder setMaxFlattenedMapKeyCount(int maxFlattenedMapKeyCount)
return this;
}

public Builder setResetOutputBuffer(boolean resetOutputBuffer)
{
this.resetOutputBuffer = resetOutputBuffer;
return this;
}

public ColumnWriterOptions build()
{
return new ColumnWriterOptions(
@@ -326,7 +342,8 @@ public ColumnWriterOptions build()
compressionBufferPool,
flattenedNodes,
mapStatisticsEnabled,
maxFlattenedMapKeyCount);
maxFlattenedMapKeyCount,
resetOutputBuffer);
}
}
}
OrcOutputBuffer.java
@@ -57,6 +57,7 @@ public class OrcOutputBuffer
private final int minOutputBufferChunkSize;
private final int maxOutputBufferChunkSize;
private final int minCompressibleSize;
private final boolean resetOutputBuffer;

private final CompressionBufferPool compressionBufferPool;
private final Optional<DwrfDataEncryptor> dwrfEncryptor;
@@ -87,6 +88,7 @@ public OrcOutputBuffer(ColumnWriterOptions columnWriterOptions, Optional<DwrfDat
this.maxBufferSize = compressionKind == CompressionKind.NONE ? maxBufferSize : maxBufferSize - PAGE_HEADER_SIZE;
this.minOutputBufferChunkSize = columnWriterOptions.getMinOutputBufferChunkSize();
this.maxOutputBufferChunkSize = columnWriterOptions.getMaxOutputBufferChunkSize();
this.resetOutputBuffer = columnWriterOptions.isResetOutputBuffer();
this.minCompressibleSize = compressionKind.getMinCompressibleSize();

this.buffer = new byte[INITIAL_BUFFER_SIZE];
@@ -473,7 +475,7 @@ private void flushBufferToOutputStream()
private void initCompressedOutputStream()
{
checkState(compressedOutputStream == null, "compressedOutputStream is already initialized");
compressedOutputStream = new ChunkedSliceOutput(minOutputBufferChunkSize, maxOutputBufferChunkSize);
compressedOutputStream = new ChunkedSliceOutput(minOutputBufferChunkSize, maxOutputBufferChunkSize, resetOutputBuffer);
}

private void writeChunkToOutputStream(byte[] chunk, int offset, int length)
OrcWriter.java
@@ -236,6 +236,7 @@ public OrcWriter(
.setFlattenedNodes(flattenedNodes)
.setMapStatisticsEnabled(options.isMapStatisticsEnabled())
.setMaxFlattenedMapKeyCount(options.getMaxFlattenedMapKeyCount())
.setResetOutputBuffer(options.isResetOutputBuffer())
.build();
recordValidation(validation -> validation.setCompression(compressionKind));
recordValidation(validation -> validation.setFlattenedNodes(flattenedNodes));
OrcWriterOptions.java
@@ -49,6 +49,7 @@ public class OrcWriterOptions
public static final boolean DEFAULT_INTEGER_DICTIONARY_ENCODING_ENABLED = false;
public static final boolean DEFAULT_STRING_DICTIONARY_ENCODING_ENABLED = true;
public static final boolean DEFAULT_STRING_DICTIONARY_SORTING_ENABLED = true;
public static final boolean DEFAULT_RESET_OUTPUT_BUFFER = false;

private final OrcWriterFlushPolicy flushPolicy;
private final int rowGroupMaxRowCount;
@@ -74,6 +75,7 @@ public class OrcWriterOptions
private final int preserveDirectEncodingStripeCount;
private final boolean mapStatisticsEnabled;
private final int maxFlattenedMapKeyCount;
private final boolean resetOutputBuffer;

/**
* Contains indexes of columns (not nodes!) for which writer should use flattened encoding, e.g. flat maps.
@@ -101,7 +103,8 @@ private OrcWriterOptions(
int preserveDirectEncodingStripeCount,
Set<Integer> flattenedColumns,
boolean mapStatisticsEnabled,
int maxFlattenedMapKeyCount)
int maxFlattenedMapKeyCount,
boolean resetOutputBuffer)
{
requireNonNull(flushPolicy, "flushPolicy is null");
checkArgument(rowGroupMaxRowCount >= 1, "rowGroupMaxRowCount must be at least 1");
@@ -139,6 +142,7 @@ private OrcWriterOptions(
this.flattenedColumns = flattenedColumns;
this.mapStatisticsEnabled = mapStatisticsEnabled;
this.maxFlattenedMapKeyCount = maxFlattenedMapKeyCount;
this.resetOutputBuffer = resetOutputBuffer;
}

public OrcWriterFlushPolicy getFlushPolicy()
@@ -246,6 +250,11 @@ public int getMaxFlattenedMapKeyCount()
return maxFlattenedMapKeyCount;
}

public boolean isResetOutputBuffer()
{
return resetOutputBuffer;
}

@Override
public String toString()
{
@@ -269,6 +278,7 @@ public String toString()
.add("flattenedColumns", flattenedColumns)
.add("mapStatisticsEnabled", mapStatisticsEnabled)
.add("maxFlattenedMapKeyCount", maxFlattenedMapKeyCount)
.add("resetOutputBuffer", resetOutputBuffer)
.toString();
}

@@ -307,6 +317,7 @@ public static class Builder
private Set<Integer> flattenedColumns = ImmutableSet.of();
private boolean mapStatisticsEnabled;
private int maxFlattenedMapKeyCount = DEFAULT_MAX_FLATTENED_MAP_KEY_COUNT;
private boolean resetOutputBuffer = DEFAULT_RESET_OUTPUT_BUFFER;

public Builder withFlushPolicy(OrcWriterFlushPolicy flushPolicy)
{
@@ -448,6 +459,12 @@ public Builder withMaxFlattenedMapKeyCount(int maxFlattenedMapKeyCount)
return this;
}

public Builder withResetOutputBuffer(boolean resetOutputBuffer)
{
this.resetOutputBuffer = resetOutputBuffer;
return this;
}

public OrcWriterOptions build()
{
Optional<DwrfStripeCacheOptions> dwrfWriterOptions;
@@ -479,7 +496,8 @@ public OrcWriterOptions build()
preserveDirectEncodingStripeCount,
flattenedColumns,
mapStatisticsEnabled,
maxFlattenedMapKeyCount);
maxFlattenedMapKeyCount,
resetOutputBuffer);
}
}
}
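
For reference, a minimal usage sketch of the option added above; writer construction is omitted and the surrounding code is hypothetical:

// Hypothetical usage: enable buffer resets so ChunkedSliceOutput releases
// its chunks on reset instead of keeping them pooled between stripes.
OrcWriterOptions options = OrcWriterOptions.builder()
        .withResetOutputBuffer(true)
        .build();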
TestOrcWriterOptions.java
@@ -160,6 +160,7 @@ public void testToString()
int preserveDirectEncodingStripeCount = 0;
boolean mapStatisticsEnabled = true;
int maxFlattenedMapKeyCount = 27;
boolean resetOutputBuffer = false;

OrcWriterOptions writerOptions = OrcWriterOptions.builder()
.withFlushPolicy(DefaultOrcWriterFlushPolicy.builder()
@@ -185,6 +186,7 @@
.withFlattenedColumns(ImmutableSet.of(4))
.withMapStatisticsEnabled(mapStatisticsEnabled)
.withMaxFlattenedMapKeyCount(maxFlattenedMapKeyCount)
.withResetOutputBuffer(resetOutputBuffer)
.build();

String expectedString = "OrcWriterOptions{flushPolicy=DefaultOrcWriterFlushPolicy{stripeMaxRowCount=1100000, " +
@@ -195,7 +197,7 @@
"stringDictionarySortingEnabled=true, stringDictionaryEncodingEnabled=true, " +
"dwrfWriterOptions=Optional[DwrfStripeCacheOptions{stripeCacheMode=INDEX_AND_FOOTER, stripeCacheMaxSize=4MB}], " +
"ignoreDictionaryRowGroupSizes=false, preserveDirectEncodingStripeCount=0, flattenedColumns=[4], mapStatisticsEnabled=true, " +
"maxFlattenedMapKeyCount=27}";
"maxFlattenedMapKeyCount=27, resetOutputBuffer=false}";
assertEquals(expectedString, writerOptions.toString());
}
}