Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class ChunkedSliceOutput
extends SliceOutput
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(ChunkedSliceOutput.class).instanceSize();
private static final int MINIMUM_CHUNK_SIZE = 4096;
private static final int MINIMUM_CHUNK_SIZE = 256;
private static final int MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
// This must not be larger than MINIMUM_CHUNK_SIZE/2
private static final int MAX_UNUSED_BUFFER_SIZE = 128;
Expand Down Expand Up @@ -371,8 +371,8 @@ public byte[] get()
{
byte[] buffer;
if (bufferPool.isEmpty()) {
currentSize = min(multiplyExact(currentSize, 2), maxChunkSize);
buffer = new byte[currentSize];
currentSize = min(multiplyExact(currentSize, 2), maxChunkSize);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this unnecessary change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous behaviour was to double currentSize before allocation, which means that even though the initial size was configured at 4 kB, the actual initial allocation was 8 kB.
I think this is a bug, and this PR doubles currentSize after allocation, so the initial allocation is based on minOutputBufferChunkSize.

}
else {
buffer = bufferPool.remove(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_COMPRESSION_BUFFER_SIZE;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_FLATTENED_MAP_KEY_COUNT;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_OUTPUT_BUFFER_CHUNK_SIZE;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_STRING_STATISTICS_LIMIT;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MIN_OUTPUT_BUFFER_CHUNK_SIZE;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_PRESERVE_DIRECT_ENCODING_STRIPE_COUNT;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.BYTE;
Expand All @@ -36,6 +38,8 @@ public class ColumnWriterOptions
private final CompressionKind compressionKind;
private final OptionalInt compressionLevel;
private final int compressionMaxBufferSize;
private final int minOutputBufferChunkSize;
private final int maxOutputBufferChunkSize;
private final DataSize stringStatisticsLimit;
private final boolean integerDictionaryEncodingEnabled;
private final boolean stringDictionarySortingEnabled;
Expand All @@ -51,6 +55,8 @@ public ColumnWriterOptions(
CompressionKind compressionKind,
OptionalInt compressionLevel,
DataSize compressionMaxBufferSize,
DataSize minOutputBufferChunkSize,
DataSize maxOutputBufferChunkSize,
DataSize stringStatisticsLimit,
boolean integerDictionaryEncodingEnabled,
boolean stringDictionarySortingEnabled,
Expand All @@ -68,6 +74,8 @@ public ColumnWriterOptions(
this.compressionKind = requireNonNull(compressionKind, "compressionKind is null");
this.compressionLevel = requireNonNull(compressionLevel, "compressionLevel is null");
this.compressionMaxBufferSize = toIntExact(compressionMaxBufferSize.toBytes());
this.minOutputBufferChunkSize = toIntExact(minOutputBufferChunkSize.toBytes());
this.maxOutputBufferChunkSize = toIntExact(maxOutputBufferChunkSize.toBytes());
this.stringStatisticsLimit = requireNonNull(stringStatisticsLimit, "stringStatisticsLimit is null");
this.integerDictionaryEncodingEnabled = integerDictionaryEncodingEnabled;
this.stringDictionarySortingEnabled = stringDictionarySortingEnabled;
Expand Down Expand Up @@ -95,6 +103,16 @@ public int getCompressionMaxBufferSize()
return compressionMaxBufferSize;
}

/**
 * Returns the minimum chunk size, in bytes, used for the compressed output buffer
 * (the starting allocation size of the chunked slice output).
 */
public int getMinOutputBufferChunkSize()
{
return minOutputBufferChunkSize;
}

/**
 * Returns the maximum chunk size, in bytes, used for the compressed output buffer
 * (the cap on chunk growth of the chunked slice output).
 */
public int getMaxOutputBufferChunkSize()
{
return maxOutputBufferChunkSize;
}

public int getStringStatisticsLimit()
{
return toIntExact(stringStatisticsLimit.toBytes());
Expand Down Expand Up @@ -162,6 +180,8 @@ public Builder toBuilder()
.setCompressionKind(getCompressionKind())
.setCompressionLevel(getCompressionLevel())
.setCompressionMaxBufferSize(new DataSize(getCompressionMaxBufferSize(), BYTE))
.setMinOutputBufferChunkSize(new DataSize(getMinOutputBufferChunkSize(), BYTE))
.setMaxOutputBufferChunkSize(new DataSize(getMaxOutputBufferChunkSize(), BYTE))
.setStringStatisticsLimit(new DataSize(getStringStatisticsLimit(), BYTE))
.setIntegerDictionaryEncodingEnabled(isIntegerDictionaryEncodingEnabled())
.setStringDictionarySortingEnabled(isStringDictionarySortingEnabled())
Expand All @@ -184,6 +204,8 @@ public static class Builder
private CompressionKind compressionKind;
private OptionalInt compressionLevel = OptionalInt.empty();
private DataSize compressionMaxBufferSize = DEFAULT_MAX_COMPRESSION_BUFFER_SIZE;
private DataSize minOutputBufferChunkSize = DEFAULT_MIN_OUTPUT_BUFFER_CHUNK_SIZE;
private DataSize maxOutputBufferChunkSize = DEFAULT_MAX_OUTPUT_BUFFER_CHUNK_SIZE;
private DataSize stringStatisticsLimit = DEFAULT_MAX_STRING_STATISTICS_LIMIT;
private boolean integerDictionaryEncodingEnabled;
private boolean stringDictionarySortingEnabled = true;
Expand Down Expand Up @@ -215,6 +237,18 @@ public Builder setCompressionMaxBufferSize(DataSize compressionMaxBufferSize)
return this;
}

/**
 * Sets the minimum output buffer chunk size (initial allocation of the chunked
 * compressed output buffer).
 *
 * @param minOutputBufferChunkSize minimum chunk size; must not be null
 * @return this builder for chaining
 */
public Builder setMinOutputBufferChunkSize(DataSize minOutputBufferChunkSize)
{
    // Fail fast with a descriptive message, matching OrcWriterOptions.Builder.withMinOutputBufferChunkSize,
    // instead of surfacing a bare NPE later in the ColumnWriterOptions constructor.
    this.minOutputBufferChunkSize = requireNonNull(minOutputBufferChunkSize, "minOutputBufferChunkSize is null");
    return this;
}

/**
 * Sets the maximum output buffer chunk size (cap on chunk growth of the chunked
 * compressed output buffer).
 *
 * @param maxOutputBufferChunkSize maximum chunk size; must not be null
 * @return this builder for chaining
 */
public Builder setMaxOutputBufferChunkSize(DataSize maxOutputBufferChunkSize)
{
    // Fail fast with a descriptive message, matching OrcWriterOptions.Builder.withMaxOutputBufferChunkSize,
    // instead of surfacing a bare NPE later in the ColumnWriterOptions constructor.
    this.maxOutputBufferChunkSize = requireNonNull(maxOutputBufferChunkSize, "maxOutputBufferChunkSize is null");
    return this;
}

public Builder setStringStatisticsLimit(DataSize stringStatisticsLimit)
{
this.stringStatisticsLimit = stringStatisticsLimit;
Expand Down Expand Up @@ -281,6 +315,8 @@ public ColumnWriterOptions build()
compressionKind,
compressionLevel,
compressionMaxBufferSize,
minOutputBufferChunkSize,
maxOutputBufferChunkSize,
stringStatisticsLimit,
integerDictionaryEncodingEnabled,
stringDictionarySortingEnabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ public class OrcOutputBuffer
private static final int INSTANCE_SIZE = ClassLayout.parseClass(OrcOutputBuffer.class).instanceSize();
private static final int PAGE_HEADER_SIZE = 3; // ORC spec 3 byte header
private static final int INITIAL_BUFFER_SIZE = 256;
private static final int MINIMUM_OUTPUT_BUFFER_CHUNK_SIZE = 4 * 1024;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused. You are claiming that the old min chunk size was 8kb, but here in the Orc compression output slice it's 4kb. Can you please clarify this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added more context in the Summary and in the comment on ChunkedSliceOutput (line 374).
The short answer is that previously the actual initial allocation (8 kB) was 2x the initial size (4 kB) due to a bug.

private static final int MAXIMUM_OUTPUT_BUFFER_CHUNK_SIZE = 1024 * 1024;

private final int maxBufferSize;
private final int minOutputBufferChunkSize;
private final int maxOutputBufferChunkSize;
private final int minCompressibleSize;

private final CompressionBufferPool compressionBufferPool;
Expand Down Expand Up @@ -86,6 +85,8 @@ public OrcOutputBuffer(ColumnWriterOptions columnWriterOptions, Optional<DwrfDat

CompressionKind compressionKind = columnWriterOptions.getCompressionKind();
this.maxBufferSize = compressionKind == CompressionKind.NONE ? maxBufferSize : maxBufferSize - PAGE_HEADER_SIZE;
this.minOutputBufferChunkSize = columnWriterOptions.getMinOutputBufferChunkSize();
this.maxOutputBufferChunkSize = columnWriterOptions.getMaxOutputBufferChunkSize();
this.minCompressibleSize = compressionKind.getMinCompressibleSize();

this.buffer = new byte[INITIAL_BUFFER_SIZE];
Expand Down Expand Up @@ -470,7 +471,7 @@ private void flushBufferToOutputStream()
/**
 * Lazily creates the compressed output stream, backed by a chunked slice output whose
 * chunks grow from minOutputBufferChunkSize up to maxOutputBufferChunkSize
 * (both taken from the ColumnWriterOptions passed to the constructor).
 * Fails via checkState if the stream has already been initialized.
 */
private void initCompressedOutputStream()
{
checkState(compressedOutputStream == null, "compressedOutputStream is already initialized");
compressedOutputStream = new ChunkedSliceOutput(minOutputBufferChunkSize, maxOutputBufferChunkSize);
}

private void writeChunkToOutputStream(byte[] chunk, int offset, int length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ public OrcWriter(
.setCompressionKind(compressionKind)
.setCompressionLevel(options.getCompressionLevel())
.setCompressionMaxBufferSize(options.getMaxCompressionBufferSize())
.setMinOutputBufferChunkSize(options.getMinOutputBufferChunkSize())
.setMaxOutputBufferChunkSize(options.getMaxOutputBufferChunkSize())
.setStringStatisticsLimit(options.getMaxStringStatisticsLimit())
.setIntegerDictionaryEncodingEnabled(options.isIntegerDictionaryEncodingEnabled())
.setStringDictionarySortingEnabled(options.isStringDictionarySortingEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class OrcWriterOptions
public static final DataSize DEFAULT_DICTIONARY_USEFUL_CHECK_COLUMN_SIZE = new DataSize(6, MEGABYTE);
public static final DataSize DEFAULT_MAX_STRING_STATISTICS_LIMIT = new DataSize(64, BYTE);
public static final DataSize DEFAULT_MAX_COMPRESSION_BUFFER_SIZE = new DataSize(256, KILOBYTE);
public static final DataSize DEFAULT_MIN_OUTPUT_BUFFER_CHUNK_SIZE = new DataSize(8, KILOBYTE);
public static final DataSize DEFAULT_MAX_OUTPUT_BUFFER_CHUNK_SIZE = new DataSize(1024, KILOBYTE);
public static final DataSize DEFAULT_DWRF_STRIPE_CACHE_MAX_SIZE = new DataSize(8, MEGABYTE);
public static final DwrfStripeCacheMode DEFAULT_DWRF_STRIPE_CACHE_MODE = INDEX_AND_FOOTER;
public static final int DEFAULT_PRESERVE_DIRECT_ENCODING_STRIPE_COUNT = 0;
Expand All @@ -56,6 +58,8 @@ public class OrcWriterOptions
private final DataSize dictionaryUsefulCheckColumnSize;
private final DataSize maxStringStatisticsLimit;
private final DataSize maxCompressionBufferSize;
private final DataSize minOutputBufferChunkSize;
private final DataSize maxOutputBufferChunkSize;
private final OptionalInt compressionLevel;
private final StreamLayoutFactory streamLayoutFactory;
private final boolean integerDictionaryEncodingEnabled;
Expand Down Expand Up @@ -85,6 +89,8 @@ private OrcWriterOptions(
DataSize dictionaryUsefulCheckColumnSize,
DataSize maxStringStatisticsLimit,
DataSize maxCompressionBufferSize,
DataSize minOutputBufferChunkSize,
DataSize maxOutputBufferChunkSize,
OptionalInt compressionLevel,
StreamLayoutFactory streamLayoutFactory,
boolean integerDictionaryEncodingEnabled,
Expand All @@ -104,6 +110,8 @@ private OrcWriterOptions(
requireNonNull(dictionaryUsefulCheckColumnSize, "dictionaryUsefulCheckColumnSize is null");
requireNonNull(maxStringStatisticsLimit, "maxStringStatisticsLimit is null");
requireNonNull(maxCompressionBufferSize, "maxCompressionBufferSize is null");
requireNonNull(minOutputBufferChunkSize, "minOutputBufferChunkSize is null");
requireNonNull(maxOutputBufferChunkSize, "maxOutputBufferChunkSize is null");
requireNonNull(compressionLevel, "compressionLevel is null");
requireNonNull(streamLayoutFactory, "streamLayoutFactory is null");
requireNonNull(dwrfWriterOptions, "dwrfWriterOptions is null");
Expand All @@ -118,6 +126,8 @@ private OrcWriterOptions(
this.dictionaryUsefulCheckColumnSize = dictionaryUsefulCheckColumnSize;
this.maxStringStatisticsLimit = maxStringStatisticsLimit;
this.maxCompressionBufferSize = maxCompressionBufferSize;
this.minOutputBufferChunkSize = minOutputBufferChunkSize;
this.maxOutputBufferChunkSize = maxOutputBufferChunkSize;
this.compressionLevel = compressionLevel;
this.streamLayoutFactory = streamLayoutFactory;
this.integerDictionaryEncodingEnabled = integerDictionaryEncodingEnabled;
Expand Down Expand Up @@ -171,6 +181,16 @@ public DataSize getMaxCompressionBufferSize()
return maxCompressionBufferSize;
}

/**
 * Returns the configured minimum output buffer chunk size
 * (defaults to DEFAULT_MIN_OUTPUT_BUFFER_CHUNK_SIZE).
 */
public DataSize getMinOutputBufferChunkSize()
{
return minOutputBufferChunkSize;
}

/**
 * Returns the configured maximum output buffer chunk size
 * (defaults to DEFAULT_MAX_OUTPUT_BUFFER_CHUNK_SIZE).
 */
public DataSize getMaxOutputBufferChunkSize()
{
return maxOutputBufferChunkSize;
}

public OptionalInt getCompressionLevel()
{
return compressionLevel;
Expand Down Expand Up @@ -272,6 +292,8 @@ public static class Builder
private DataSize dictionaryUsefulCheckColumnSize = DEFAULT_DICTIONARY_USEFUL_CHECK_COLUMN_SIZE;
private DataSize maxStringStatisticsLimit = DEFAULT_MAX_STRING_STATISTICS_LIMIT;
private DataSize maxCompressionBufferSize = DEFAULT_MAX_COMPRESSION_BUFFER_SIZE;
private DataSize minOutputBufferChunkSize = DEFAULT_MIN_OUTPUT_BUFFER_CHUNK_SIZE;
private DataSize maxOutputBufferChunkSize = DEFAULT_MAX_OUTPUT_BUFFER_CHUNK_SIZE;
private OptionalInt compressionLevel = OptionalInt.empty();
private StreamLayoutFactory streamLayoutFactory = new ColumnSizeLayoutFactory();
private boolean integerDictionaryEncodingEnabled = DEFAULT_INTEGER_DICTIONARY_ENCODING_ENABLED;
Expand Down Expand Up @@ -336,6 +358,18 @@ public Builder withMaxCompressionBufferSize(DataSize maxCompressionBufferSize)
return this;
}

/**
 * Sets the minimum output buffer chunk size used by the writer's compressed output buffers.
 *
 * @param minOutputBufferChunkSize minimum chunk size; must not be null
 * @return this builder for chaining
 */
public Builder withMinOutputBufferChunkSize(DataSize minOutputBufferChunkSize)
{
this.minOutputBufferChunkSize = requireNonNull(minOutputBufferChunkSize, "minOutputBufferChunkSize is null");
return this;
}

/**
 * Sets the maximum output buffer chunk size used by the writer's compressed output buffers.
 *
 * @param maxOutputBufferChunkSize maximum chunk size; must not be null
 * @return this builder for chaining
 */
public Builder withMaxOutputBufferChunkSize(DataSize maxOutputBufferChunkSize)
{
this.maxOutputBufferChunkSize = requireNonNull(maxOutputBufferChunkSize, "maxOutputBufferChunkSize is null");
return this;
}

public Builder withCompressionLevel(OptionalInt compressionLevel)
{
this.compressionLevel = requireNonNull(compressionLevel, "compressionLevel is null");
Expand Down Expand Up @@ -433,6 +467,8 @@ public OrcWriterOptions build()
dictionaryUsefulCheckColumnSize,
maxStringStatisticsLimit,
maxCompressionBufferSize,
minOutputBufferChunkSize,
maxOutputBufferChunkSize,
compressionLevel,
streamLayoutFactory,
integerDictionaryEncodingEnabled,
Expand Down