diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/ChunkedSliceOutput.java b/presto-orc/src/main/java/com/facebook/presto/orc/ChunkedSliceOutput.java index 3c2a8a8b013d5..7f36b2ee7ecb1 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/ChunkedSliceOutput.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/ChunkedSliceOutput.java @@ -36,7 +36,7 @@ import static java.lang.Math.toIntExact; public final class ChunkedSliceOutput - extends SliceOutput + extends SliceOutput implements OrcChunkedOutputBuffer { private static final int INSTANCE_SIZE = ClassLayout.parseClass(ChunkedSliceOutput.class).instanceSize(); private static final int MINIMUM_CHUNK_SIZE = 256; @@ -202,6 +202,20 @@ public void writeBytes(byte[] source, int sourceIndex, int length) } } + @Override + public void ensureAvailable(int minLength, int length) + { + ensureWritableBytes(minLength); + } + + @Override + public void writeHeader(int header) + { + write(header & 0x00_00FF); + write((header & 0x00_FF00) >> 8); + write((header & 0xFF_0000) >> 16); + } + @Override public void writeBytes(InputStream in, int length) throws IOException diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/ColumnWriterOptions.java b/presto-orc/src/main/java/com/facebook/presto/orc/ColumnWriterOptions.java index f8fd0f659a983..7b4cd44d37e7f 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/ColumnWriterOptions.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/ColumnWriterOptions.java @@ -22,6 +22,7 @@ import java.util.OptionalInt; import java.util.Set; +import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_LAZY_OUTPUT_BUFFER; import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_COMPRESSION_BUFFER_SIZE; import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_FLATTENED_MAP_KEY_COUNT; import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_OUTPUT_BUFFER_CHUNK_SIZE; @@ -52,6 +53,7 @@ public class ColumnWriterOptions private final boolean mapStatisticsEnabled; private final int maxFlattenedMapKeyCount; private final boolean resetOutputBuffer; + private final boolean lazyOutputBuffer; public ColumnWriterOptions( CompressionKind compressionKind, @@ -69,7 +71,8 @@ public ColumnWriterOptions( Set flattenedNodes, boolean mapStatisticsEnabled, int maxFlattenedMapKeyCount, - boolean resetOutputBuffer) + boolean resetOutputBuffer, + boolean lazyOutputBuffer) { checkArgument(maxFlattenedMapKeyCount > 0, "maxFlattenedMapKeyCount must be positive: %s", maxFlattenedMapKeyCount); requireNonNull(compressionMaxBufferSize, "compressionMaxBufferSize is null"); @@ -90,6 +93,7 @@ public ColumnWriterOptions( this.mapStatisticsEnabled = mapStatisticsEnabled; this.maxFlattenedMapKeyCount = maxFlattenedMapKeyCount; this.resetOutputBuffer = resetOutputBuffer; + this.lazyOutputBuffer = lazyOutputBuffer; } public CompressionKind getCompressionKind() @@ -171,6 +175,11 @@ public boolean isResetOutputBuffer() { return resetOutputBuffer; } + + public boolean isLazyOutputBuffer() + { + return lazyOutputBuffer; + } /** * Create a copy of this ColumnWriterOptions, but disable string and integer dictionary encodings. */ @@ -200,7 +209,8 @@ public Builder toBuilder() .setFlattenedNodes(getFlattenedNodes()) .setMapStatisticsEnabled(isMapStatisticsEnabled()) .setMaxFlattenedMapKeyCount(getMaxFlattenedMapKeyCount()) - .setResetOutputBuffer(resetOutputBuffer); + .setResetOutputBuffer(resetOutputBuffer) + .setLazyOutputBuffer(lazyOutputBuffer); } public static Builder builder() @@ -226,6 +236,7 @@ public static class Builder private boolean mapStatisticsEnabled; private int maxFlattenedMapKeyCount = DEFAULT_MAX_FLATTENED_MAP_KEY_COUNT; private boolean resetOutputBuffer = DEFAULT_RESET_OUTPUT_BUFFER; + private boolean lazyOutputBuffer = DEFAULT_LAZY_OUTPUT_BUFFER; private Builder() {} @@ -325,6 +336,12 @@ public Builder setResetOutputBuffer(boolean resetOutputBuffer) return this; } + public Builder setLazyOutputBuffer(boolean lazyOutputBuffer) + { + this.lazyOutputBuffer = lazyOutputBuffer; + return this; + } + public ColumnWriterOptions build() { return new ColumnWriterOptions( @@ -343,7 +360,8 @@ public ColumnWriterOptions build() flattenedNodes, mapStatisticsEnabled, maxFlattenedMapKeyCount, - resetOutputBuffer); + resetOutputBuffer, + lazyOutputBuffer); } } } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcChunkedOutputBuffer.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcChunkedOutputBuffer.java new file mode 100644 index 0000000000000..799b949fb9d58 --- /dev/null +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcChunkedOutputBuffer.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.orc; + +import io.airlift.slice.SliceOutput; + +public interface OrcChunkedOutputBuffer +{ + void writeTo(SliceOutput outputStream); + + void reset(); + + int size(); + + long getRetainedSize(); + + // need to be called before writing + void ensureAvailable(int minLength, int length); + + void writeHeader(int value); + + void writeBytes(byte[] source, int sourceIndex, int length); + + String toString(); +} diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcLazyChunkedOutputBuffer.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcLazyChunkedOutputBuffer.java new file mode 100644 index 0000000000000..09cda2285799a --- /dev/null +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcLazyChunkedOutputBuffer.java @@ -0,0 +1,145 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.orc; + +import io.airlift.slice.SliceOutput; +import org.openjdk.jol.info.ClassLayout; + +import java.util.ArrayList; +import java.util.List; + +import static java.lang.Math.min; +import static java.lang.Math.toIntExact; + +public class OrcLazyChunkedOutputBuffer + implements OrcChunkedOutputBuffer +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(ChunkedSliceOutput.class).instanceSize(); + private byte[] buffer; + private final List closedBuffers = new ArrayList<>(); + private final List closedBufferLengths = new ArrayList<>(); + private long closedBuffersRetainedSize; + + /** + * Offset of buffer within stream. + */ + private long streamOffset; + + /** + * Current position for writing in buffer. + */ + private int bufferPosition; + + @Override + public void writeTo(SliceOutput outputStream) + { + for (int i = 0; i < closedBuffers.size(); i++) { + outputStream.writeBytes(closedBuffers.get(i), 0, closedBufferLengths.get(i)); + } + if (bufferPosition > 0) { + outputStream.writeBytes(buffer, 0, bufferPosition); + } + } + + @Override + public void reset() + { + closedBuffers.clear(); + closedBufferLengths.clear(); + closedBuffersRetainedSize = 0; + streamOffset = 0; + bufferPosition = 0; + } + + @Override + public int size() + { + return toIntExact(streamOffset + bufferPosition); + } + + @Override + public long getRetainedSize() + { + return buffer.length + closedBuffersRetainedSize + INSTANCE_SIZE; + } + + // need to be called before writing + @Override + public void ensureAvailable(int minLength, int length) + { + if (buffer == null) { + buffer = new byte[length]; + bufferPosition = 0; + } + // no room for minLength + if (bufferPosition + minLength > buffer.length) { + closeChunk(length); + } + } + + @Override + public void writeBytes(byte[] source, int sourceIndex, int length) + { + while (length > 0) { + int batch = ensureBatchSize(length); + System.arraycopy(source, sourceIndex, buffer, bufferPosition, batch); + bufferPosition += batch; + sourceIndex += batch; + length -= batch; + } + } + + @Override + public void writeHeader(int value) + { + buffer[bufferPosition] = (byte) (value & 0x00_00FF); + bufferPosition += 1; + buffer[bufferPosition] = (byte) ((value & 0x00_FF00) >> 8); + bufferPosition += 1; + buffer[bufferPosition] = (byte) ((value & 0xFF_0000) >> 16); + bufferPosition += 1; + } + + @Override + public String toString() + { + StringBuilder builder = new StringBuilder("OrcLazyChunkedOutputBuffer{"); + builder.append("position=").append(size()); + builder.append('}'); + return builder.toString(); + } + + private int ensureBatchSize(int length) + { + // no room + if (bufferPosition >= buffer.length) { + closeChunk(length); + } + return min(length, buffer.length - bufferPosition); + } + + private void closeChunk(int length) + { + // add trimmed view of slice to closed slices + closedBuffers.add(buffer); + closedBufferLengths.add(bufferPosition); + closedBuffersRetainedSize += buffer.length; + + // create a new buffer + buffer = new byte[length]; + + streamOffset += bufferPosition; + bufferPosition = 0; + } +} diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcOutputBuffer.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcOutputBuffer.java index d542c47ae55fe..f1ac0f465b71b 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcOutputBuffer.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcOutputBuffer.java @@ -58,13 +58,13 @@ public class OrcOutputBuffer private final int maxOutputBufferChunkSize; private final int minCompressibleSize; private final boolean resetOutputBuffer; - + private final boolean lazyOutputBuffer; private final CompressionBufferPool compressionBufferPool; private final Optional dwrfEncryptor; @Nullable private final Compressor compressor; - private ChunkedSliceOutput compressedOutputStream; + private OrcChunkedOutputBuffer compressedOutputStream; private Slice slice; private byte[] buffer; @@ -89,11 +89,12 @@ public OrcOutputBuffer(ColumnWriterOptions columnWriterOptions, Optional 0) { @@ -475,7 +498,12 @@ private void flushBufferToOutputStream() private void initCompressedOutputStream() { checkState(compressedOutputStream == null, "compressedOutputStream is already initialized"); - compressedOutputStream = new ChunkedSliceOutput(minOutputBufferChunkSize, maxOutputBufferChunkSize, resetOutputBuffer); + if (!lazyOutputBuffer) { + compressedOutputStream = new ChunkedSliceOutput(minOutputBufferChunkSize, maxOutputBufferChunkSize, resetOutputBuffer); + } + else { + compressedOutputStream = new OrcLazyChunkedOutputBuffer(); + } } private void writeChunkToOutputStream(byte[] chunk, int offset, int length) @@ -485,7 +513,8 @@ private void writeChunkToOutputStream(byte[] chunk, int offset, int length) } if (compressor == null && !dwrfEncryptor.isPresent()) { - compressedOutputStream.write(chunk, offset, length); + compressedOutputStream.ensureAvailable(1, length); + compressedOutputStream.writeBytes(chunk, offset, length); return; } @@ -527,9 +556,8 @@ private void writeChunkToOutputStream(byte[] chunk, int offset, int length) private void writeChunkedOutput(byte[] chunk, int offset, int length, int header) { - compressedOutputStream.write(header & 0x00_00FF); - compressedOutputStream.write((header & 0x00_FF00) >> 8); - compressedOutputStream.write((header & 0xFF_0000) >> 16); + compressedOutputStream.ensureAvailable(3, length + 3); + compressedOutputStream.writeHeader(header); compressedOutputStream.writeBytes(chunk, offset, length); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java index 81a6bb5de90b1..3a9db53789f18 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java @@ -237,6 +237,7 @@ public OrcWriter( .setMapStatisticsEnabled(options.isMapStatisticsEnabled()) .setMaxFlattenedMapKeyCount(options.getMaxFlattenedMapKeyCount()) .setResetOutputBuffer(options.isResetOutputBuffer()) + .setLazyOutputBuffer(options.isLazyOutputBuffer()) .build(); recordValidation(validation -> validation.setCompression(compressionKind)); recordValidation(validation -> validation.setFlattenedNodes(flattenedNodes)); diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java index e1fa4652beaa6..e051d3434a6f6 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriterOptions.java @@ -50,7 +50,7 @@ public class OrcWriterOptions public static final boolean DEFAULT_STRING_DICTIONARY_ENCODING_ENABLED = true; public static final boolean DEFAULT_STRING_DICTIONARY_SORTING_ENABLED = true; public static final boolean DEFAULT_RESET_OUTPUT_BUFFER = false; - + public static final boolean DEFAULT_LAZY_OUTPUT_BUFFER = false; private final OrcWriterFlushPolicy flushPolicy; private final int rowGroupMaxRowCount; private final DataSize dictionaryMaxMemory; @@ -76,6 +76,7 @@ public class OrcWriterOptions private final boolean mapStatisticsEnabled; private final int maxFlattenedMapKeyCount; private final boolean resetOutputBuffer; + private final boolean lazyOutputBuffer; /** * Contains indexes of columns (not nodes!) for which writer should use flattened encoding, e.g. flat maps. @@ -104,7 +105,8 @@ private OrcWriterOptions( Set flattenedColumns, boolean mapStatisticsEnabled, int maxFlattenedMapKeyCount, - boolean resetOutputBuffer) + boolean resetOutputBuffer, + boolean lazyOutputBuffer) { requireNonNull(flushPolicy, "flushPolicy is null"); checkArgument(rowGroupMaxRowCount >= 1, "rowGroupMaxRowCount must be at least 1"); @@ -143,6 +145,7 @@ private OrcWriterOptions( this.mapStatisticsEnabled = mapStatisticsEnabled; this.maxFlattenedMapKeyCount = maxFlattenedMapKeyCount; this.resetOutputBuffer = resetOutputBuffer; + this.lazyOutputBuffer = lazyOutputBuffer; } public OrcWriterFlushPolicy getFlushPolicy() @@ -255,6 +258,11 @@ public boolean isResetOutputBuffer() return resetOutputBuffer; } + public boolean isLazyOutputBuffer() + { + return lazyOutputBuffer; + } + @Override public String toString() { @@ -279,6 +287,7 @@ public String toString() .add("mapStatisticsEnabled", mapStatisticsEnabled) .add("maxFlattenedMapKeyCount", maxFlattenedMapKeyCount) .add("resetOutputBuffer", resetOutputBuffer) + .add("lazyOutputBuffer", lazyOutputBuffer) .toString(); } @@ -318,6 +327,7 @@ public static class Builder private boolean mapStatisticsEnabled; private int maxFlattenedMapKeyCount = DEFAULT_MAX_FLATTENED_MAP_KEY_COUNT; private boolean resetOutputBuffer = DEFAULT_RESET_OUTPUT_BUFFER; + private boolean lazyOutputBuffer = DEFAULT_LAZY_OUTPUT_BUFFER; public Builder withFlushPolicy(OrcWriterFlushPolicy flushPolicy) { @@ -465,6 +475,12 @@ public Builder withResetOutputBuffer(boolean resetOutputBuffer) return this; } + public Builder withLazyOutputBuffer(boolean lazyOutputBuffer) + { + this.lazyOutputBuffer = lazyOutputBuffer; + return this; + } + public OrcWriterOptions build() { Optional dwrfWriterOptions; @@ -497,7 +513,8 @@ public OrcWriterOptions build() flattenedColumns, mapStatisticsEnabled, maxFlattenedMapKeyCount, - resetOutputBuffer); + resetOutputBuffer, + lazyOutputBuffer); } } } diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java index 1c2c7a0a33d7d..8b02f9c55f801 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcWriterOptions.java @@ -161,6 +161,7 @@ public void testToString() boolean mapStatisticsEnabled = true; int maxFlattenedMapKeyCount = 27; boolean resetOutputBuffer = false; + boolean lazyOutputBuffer = false; OrcWriterOptions writerOptions = OrcWriterOptions.builder() .withFlushPolicy(DefaultOrcWriterFlushPolicy.builder() @@ -187,6 +188,7 @@ public void testToString() .withMapStatisticsEnabled(mapStatisticsEnabled) .withMaxFlattenedMapKeyCount(maxFlattenedMapKeyCount) .withResetOutputBuffer(resetOutputBuffer) + .withLazyOutputBuffer(lazyOutputBuffer) .build(); String expectedString = "OrcWriterOptions{flushPolicy=DefaultOrcWriterFlushPolicy{stripeMaxRowCount=1100000, " + @@ -197,7 +199,7 @@ public void testToString() "stringDictionarySortingEnabled=true, stringDictionaryEncodingEnabled=true, " + "dwrfWriterOptions=Optional[DwrfStripeCacheOptions{stripeCacheMode=INDEX_AND_FOOTER, stripeCacheMaxSize=4MB}], " + "ignoreDictionaryRowGroupSizes=false, preserveDirectEncodingStripeCount=0, flattenedColumns=[4], mapStatisticsEnabled=true, " + - "maxFlattenedMapKeyCount=27, resetOutputBuffer=false}"; + "maxFlattenedMapKeyCount=27, resetOutputBuffer=false, lazyOutputBuffer=false}"; assertEquals(expectedString, writerOptions.toString()); } }