Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import static java.lang.Math.toIntExact;

public final class ChunkedSliceOutput
extends SliceOutput
extends SliceOutput implements OrcChunkedOutputBuffer
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(ChunkedSliceOutput.class).instanceSize();
private static final int MINIMUM_CHUNK_SIZE = 256;
Expand Down Expand Up @@ -202,6 +202,20 @@ public void writeBytes(byte[] source, int sourceIndex, int length)
}
}

/**
 * Makes sure at least {@code minLength} more bytes can be written by delegating to
 * {@code ensureWritableBytes(minLength)}.
 *
 * @param minLength number of bytes the caller is about to write
 * @param length not used by this implementation; only {@code minLength} is forwarded
 */
@Override
public void ensureAvailable(int minLength, int length)
{
ensureWritableBytes(minLength);
}

/**
 * Emits the three low-order bytes of {@code header}, least-significant byte first.
 * Bits above the low 24 are ignored.
 *
 * @param header value whose low 24 bits are written
 */
@Override
public void writeHeader(int header)
{
    // one write per byte: shifts of 0, 8 and 16 select bytes 0, 1 and 2
    for (int shift = 0; shift < 24; shift += 8) {
        write((header >> shift) & 0xFF);
    }
}

@Override
public void writeBytes(InputStream in, int length)
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.OptionalInt;
import java.util.Set;

import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_LAZY_OUTPUT_BUFFER;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_COMPRESSION_BUFFER_SIZE;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_FLATTENED_MAP_KEY_COUNT;
import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_OUTPUT_BUFFER_CHUNK_SIZE;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class ColumnWriterOptions
private final boolean mapStatisticsEnabled;
private final int maxFlattenedMapKeyCount;
private final boolean resetOutputBuffer;
private final boolean lazyOutputBuffer;

public ColumnWriterOptions(
CompressionKind compressionKind,
Expand All @@ -69,7 +71,8 @@ public ColumnWriterOptions(
Set<Integer> flattenedNodes,
boolean mapStatisticsEnabled,
int maxFlattenedMapKeyCount,
boolean resetOutputBuffer)
boolean resetOutputBuffer,
boolean lazyOutputBuffer)
{
checkArgument(maxFlattenedMapKeyCount > 0, "maxFlattenedMapKeyCount must be positive: %s", maxFlattenedMapKeyCount);
requireNonNull(compressionMaxBufferSize, "compressionMaxBufferSize is null");
Expand All @@ -90,6 +93,7 @@ public ColumnWriterOptions(
this.mapStatisticsEnabled = mapStatisticsEnabled;
this.maxFlattenedMapKeyCount = maxFlattenedMapKeyCount;
this.resetOutputBuffer = resetOutputBuffer;
this.lazyOutputBuffer = lazyOutputBuffer;
}

public CompressionKind getCompressionKind()
Expand Down Expand Up @@ -171,6 +175,11 @@ public boolean isResetOutputBuffer()
{
return resetOutputBuffer;
}

/**
 * Returns the {@code lazyOutputBuffer} flag this instance was constructed with.
 * NOTE(review): presumably tells the writer to use a lazily allocated output buffer;
 * confirm at the place where this option is consumed.
 */
public boolean isLazyOutputBuffer()
{
return lazyOutputBuffer;
}
/**
* Create a copy of this ColumnWriterOptions, but disable string and integer dictionary encodings.
*/
Expand Down Expand Up @@ -200,7 +209,8 @@ public Builder toBuilder()
.setFlattenedNodes(getFlattenedNodes())
.setMapStatisticsEnabled(isMapStatisticsEnabled())
.setMaxFlattenedMapKeyCount(getMaxFlattenedMapKeyCount())
.setResetOutputBuffer(resetOutputBuffer);
.setResetOutputBuffer(resetOutputBuffer)
.setLazyOutputBuffer(lazyOutputBuffer);
}

public static Builder builder()
Expand All @@ -226,6 +236,7 @@ public static class Builder
private boolean mapStatisticsEnabled;
private int maxFlattenedMapKeyCount = DEFAULT_MAX_FLATTENED_MAP_KEY_COUNT;
private boolean resetOutputBuffer = DEFAULT_RESET_OUTPUT_BUFFER;
private boolean lazyOutputBuffer = DEFAULT_LAZY_OUTPUT_BUFFER;

private Builder() {}

Expand Down Expand Up @@ -325,6 +336,12 @@ public Builder setResetOutputBuffer(boolean resetOutputBuffer)
return this;
}

/**
 * Sets the {@code lazyOutputBuffer} flag that {@code build()} passes to the
 * {@code ColumnWriterOptions} constructor. When never called, the builder keeps
 * {@code DEFAULT_LAZY_OUTPUT_BUFFER}.
 *
 * @param lazyOutputBuffer value for the flag
 * @return this builder, to allow call chaining
 */
public Builder setLazyOutputBuffer(boolean lazyOutputBuffer)
{
this.lazyOutputBuffer = lazyOutputBuffer;
return this;
}

public ColumnWriterOptions build()
{
return new ColumnWriterOptions(
Expand All @@ -343,7 +360,8 @@ public ColumnWriterOptions build()
flattenedNodes,
mapStatisticsEnabled,
maxFlattenedMapKeyCount,
resetOutputBuffer);
resetOutputBuffer,
lazyOutputBuffer);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.orc;

import io.airlift.slice.SliceOutput;

/**
 * Append-only byte buffer that accumulates output in chunks and can later replay
 * it into a {@link SliceOutput}.
 *
 * Callers are expected to call {@link #ensureAvailable(int, int)} before writing.
 */
public interface OrcChunkedOutputBuffer
{
/**
 * Writes the buffered bytes to {@code outputStream}.
 */
void writeTo(SliceOutput outputStream);

/**
 * Discards the buffered content so the buffer can be filled again.
 */
void reset();

/**
 * Returns the number of bytes currently buffered.
 */
int size();

/**
 * Returns the memory retained by this buffer, in bytes.
 */
long getRetainedSize();

/**
 * Prepares the buffer for an upcoming write; needs to be called before writing.
 *
 * @param minLength number of bytes that must be writable after this call
 * @param length size hint for a newly allocated chunk; an implementation that manages
 * its own chunk sizes may ignore it
 */
void ensureAvailable(int minLength, int length);

/**
 * Writes the three low-order bytes of {@code value}, least-significant byte first.
 * NOTE(review): presumably the 3-byte ORC compressed-chunk header; confirm with the caller.
 */
void writeHeader(int value);

/**
 * Appends {@code length} bytes of {@code source}, starting at {@code sourceIndex}.
 */
void writeBytes(byte[] source, int sourceIndex, int length);

// Redeclares Object#toString; implementations in this package report their write position.
String toString();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.orc;

import io.airlift.slice.SliceOutput;
import org.openjdk.jol.info.ClassLayout;

import java.util.ArrayList;
import java.util.List;

import static java.lang.Math.min;
import static java.lang.Math.toIntExact;

public class OrcLazyChunkedOutputBuffer
implements OrcChunkedOutputBuffer
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(ChunkedSliceOutput.class).instanceSize();
private byte[] buffer;
private final List<byte[]> closedBuffers = new ArrayList<>();
private final List<Integer> closedBufferLengths = new ArrayList<>();
private long closedBuffersRetainedSize;

/**
* Offset of buffer within stream.
*/
private long streamOffset;

/**
* Current position for writing in buffer.
*/
private int bufferPosition;

@Override
public void writeTo(SliceOutput outputStream)
{
for (int i = 0; i < closedBuffers.size(); i++) {
outputStream.writeBytes(closedBuffers.get(i), 0, closedBufferLengths.get(i));
}
if (bufferPosition > 0) {
outputStream.writeBytes(buffer, 0, bufferPosition);
}
}

@Override
public void reset()
{
closedBuffers.clear();
closedBufferLengths.clear();
closedBuffersRetainedSize = 0;
streamOffset = 0;
bufferPosition = 0;
}

@Override
public int size()
{
return toIntExact(streamOffset + bufferPosition);
}

@Override
public long getRetainedSize()
{
return buffer.length + closedBuffersRetainedSize + INSTANCE_SIZE;
}

// need to be called before writing
@Override
public void ensureAvailable(int minLength, int length)
{
if (buffer == null) {
buffer = new byte[length];
bufferPosition = 0;
}
// no room for minLength
if (bufferPosition + minLength > buffer.length) {
closeChunk(length);
}
}

@Override
public void writeBytes(byte[] source, int sourceIndex, int length)
{
while (length > 0) {
int batch = ensureBatchSize(length);
System.arraycopy(source, sourceIndex, buffer, bufferPosition, batch);
bufferPosition += batch;
sourceIndex += batch;
length -= batch;
}
}

@Override
public void writeHeader(int value)
{
buffer[bufferPosition] = (byte) (value & 0x00_00FF);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change to a more concise:

buffer[bufferPosition++] = (byte) (value & 0x00_00FF);
buffer[bufferPosition++] = (byte) ((value & 0x00_FF00) >> 8);
buffer[bufferPosition++] = (byte) ((value & 0xFF_0000) >> 16);

bufferPosition += 1;
buffer[bufferPosition] = (byte) ((value & 0x00_FF00) >> 8);
bufferPosition += 1;
buffer[bufferPosition] = (byte) ((value & 0xFF_0000) >> 16);
bufferPosition += 1;
}

@Override
public String toString()
{
StringBuilder builder = new StringBuilder("OrcLazyChunkedOutputBuffer{");
builder.append("position=").append(size());
builder.append('}');
return builder.toString();
}

private int ensureBatchSize(int length)
{
// no room
if (bufferPosition >= buffer.length) {
closeChunk(length);
}
return min(length, buffer.length - bufferPosition);
}

private void closeChunk(int length)
{
// add trimmed view of slice to closed slices
closedBuffers.add(buffer);
closedBufferLengths.add(bufferPosition);
closedBuffersRetainedSize += buffer.length;

// create a new buffer
buffer = new byte[length];

streamOffset += bufferPosition;
bufferPosition = 0;
}
}
Loading
Loading