Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import static com.facebook.presto.orc.WriterStats.FlushReason.MAX_BYTES;
import static com.facebook.presto.orc.WriterStats.FlushReason.MAX_ROWS;
import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DIRECT;
import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID;
import static com.facebook.presto.orc.metadata.DwrfMetadataWriter.toFileStatistics;
import static com.facebook.presto.orc.metadata.DwrfMetadataWriter.toStripeEncryptionGroup;
import static com.facebook.presto.orc.metadata.PostScript.MAGIC;
Expand Down Expand Up @@ -241,7 +240,6 @@ public OrcWriter(
Type fieldType = types.get(fieldId);
ColumnWriter columnWriter = createColumnWriter(
fieldColumnIndex,
DEFAULT_SEQUENCE_ID,
orcTypes,
fieldType,
columnWriterOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,10 @@ private static Stream toStream(DwrfProto.Stream stream)
{
return new Stream(
stream.getColumn(),
stream.getSequence(),
toStreamKind(stream.getKind()),
toIntExact(stream.getLength()),
stream.getUseVInts(),
stream.getSequence(),
stream.hasOffset() ? Optional.of(stream.getOffset()) : Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ private static DwrfProto.Stream toStream(Stream stream)
{
DwrfProto.Stream.Builder streamBuilder = DwrfProto.Stream.newBuilder()
.setColumn(stream.getColumn())
.setSequence(stream.getSequence())
.setKind(toStreamKind(stream.getStreamKind()))
.setLength(stream.getLength())
.setUseVInts(stream.isUseVInts());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.List;
import java.util.Map.Entry;

import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.toIntExact;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -284,10 +283,6 @@ public int writeStripeFooter(SliceOutput output, StripeFooter footer)

private static OrcProto.Stream toStream(Stream stream)
{
checkArgument(
stream.getSequence() == DEFAULT_SEQUENCE_ID,
"Writing streams with non-zero sequence IDs is not supported in ORC : {} ",
stream);
return OrcProto.Stream.newBuilder()
.setColumn(stream.getColumn())
.setKind(toStreamKind(stream.getStreamKind()))
Expand Down Expand Up @@ -320,8 +315,7 @@ private static OrcProto.ColumnEncoding toColumnEncoding(ColumnEncoding columnEnc
{
checkArgument(
!columnEncodings.getAdditionalSequenceEncodings().isPresent(),
"Writing columns with non-zero sequence IDs is not supported in ORC: {}",
columnEncodings);
"Writing columns with non-zero sequence IDs is not supported in ORC: " + columnEncodings);

return OrcProto.ColumnEncoding.newBuilder()
.setKind(toColumnEncoding(columnEncodings.getColumnEncodingKind()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import java.util.Optional;

import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -65,15 +64,10 @@ public StreamArea getStreamArea()

public Stream(int column, StreamKind streamKind, int length, boolean useVInts)
{
this(column, DEFAULT_SEQUENCE_ID, streamKind, length, useVInts, Optional.empty());
this(column, streamKind, length, useVInts, ColumnEncoding.DEFAULT_SEQUENCE_ID, Optional.empty());
}

public Stream(int column, int sequence, StreamKind streamKind, int length, boolean useVInts)
{
this(column, sequence, streamKind, length, useVInts, Optional.empty());
}

public Stream(int column, int sequence, StreamKind streamKind, int length, boolean useVInts, Optional<Long> offset)
public Stream(int column, StreamKind streamKind, int length, boolean useVInts, int sequence, Optional<Long> offset)
{
this.column = column;
this.streamKind = requireNonNull(streamKind, "streamKind is null");
Expand Down Expand Up @@ -130,10 +124,10 @@ public Stream withOffset(long offset)
{
return new Stream(
this.column,
this.sequence,
this.streamKind,
this.length,
this.useVInts,
this.sequence,
Optional.of(offset));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ public List<BooleanStreamCheckpoint> getCheckpoints()
}

@Override
public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence)
public StreamDataOutput getStreamDataOutput(int column)
{
checkState(closed);
return byteOutputStream.getStreamDataOutput(column, dwrfSequence);
return byteOutputStream.getStreamDataOutput(column);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ public List<ByteArrayStreamCheckpoint> getCheckpoints()
}

@Override
public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence)
public StreamDataOutput getStreamDataOutput(int column)
{
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, streamKind, toIntExact(buffer.getOutputDataSize()), false));
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, streamKind, toIntExact(buffer.getOutputDataSize()), false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ public List<ByteStreamCheckpoint> getCheckpoints()
}

@Override
public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence)
public StreamDataOutput getStreamDataOutput(int column)
{
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, DATA, toIntExact(buffer.getOutputDataSize()), false));
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, DATA, toIntExact(buffer.getOutputDataSize()), false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ public List<DecimalStreamCheckpoint> getCheckpoints()
}

@Override
public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence)
public StreamDataOutput getStreamDataOutput(int column)
{
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, DATA, toIntExact(buffer.getOutputDataSize()), true));
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, DATA, toIntExact(buffer.getOutputDataSize()), true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public List<DoubleStreamCheckpoint> getCheckpoints()
}

@Override
public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence)
public StreamDataOutput getStreamDataOutput(int column)
{
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, DATA, toIntExact(buffer.getOutputDataSize()), false));
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, DATA, toIntExact(buffer.getOutputDataSize()), false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public List<FloatStreamCheckpoint> getCheckpoints()
}

@Override
public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence)
public StreamDataOutput getStreamDataOutput(int column)
{
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, DATA, toIntExact(buffer.getOutputDataSize()), false));
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, DATA, toIntExact(buffer.getOutputDataSize()), false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ public List<LongStreamCheckpoint> getCheckpoints()
}

@Override
public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence)
public StreamDataOutput getStreamDataOutput(int column)
{
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, streamKind, toIntExact(buffer.getOutputDataSize()), true));
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, streamKind, toIntExact(buffer.getOutputDataSize()), true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ public List<LongStreamCheckpoint> getCheckpoints()
}

@Override
public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence)
public StreamDataOutput getStreamDataOutput(int column)
{
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, streamKind, toIntExact(buffer.getOutputDataSize()), true));
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, streamKind, toIntExact(buffer.getOutputDataSize()), true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,9 +748,9 @@ public List<LongStreamCheckpoint> getCheckpoints()
}

@Override
public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence)
public StreamDataOutput getStreamDataOutput(int column)
{
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, streamKind, toIntExact(buffer.getOutputDataSize()), true));
return new StreamDataOutput(buffer::writeDataTo, new Stream(column, streamKind, toIntExact(buffer.getOutputDataSize()), true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ public Optional<List<BooleanStreamCheckpoint>> getCheckpoints()
return Optional.of(booleanOutputStream.getCheckpoints());
}

public Optional<StreamDataOutput> getStreamDataOutput(int column, int dwrfSequence)
public Optional<StreamDataOutput> getStreamDataOutput(int column)
{
checkArgument(closed);
if (booleanOutputStream == null) {
return Optional.empty();
}
StreamDataOutput streamDataOutput = booleanOutputStream.getStreamDataOutput(column, dwrfSequence);
StreamDataOutput streamDataOutput = booleanOutputStream.getStreamDataOutput(column);
// rewrite the DATA stream created by the boolean output stream to a PRESENT stream
Stream stream = new Stream(column, dwrfSequence, PRESENT, toIntExact(streamDataOutput.size()), streamDataOutput.getStream().isUseVInts());
Stream stream = new Stream(column, PRESENT, toIntExact(streamDataOutput.size()), streamDataOutput.getStream().isUseVInts());
return Optional.of(new StreamDataOutput(
sliceOutput -> {
streamDataOutput.writeData(sliceOutput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface ValueOutputStream<C extends StreamCheckpoint>

List<C> getCheckpoints();

StreamDataOutput getStreamDataOutput(int column, int dwrfSequence);
StreamDataOutput getStreamDataOutput(int column);

/**
* This method returns the size of the flushed data plus any unflushed data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public class BooleanColumnWriter
private static final ColumnEncoding COLUMN_ENCODING = new ColumnEncoding(DIRECT, 0);

private final int column;
private final int dwrfSequence;
private final Type type;
private final boolean compressed;
private final BooleanOutputStream dataStream;
Expand All @@ -69,19 +68,16 @@ public class BooleanColumnWriter

public BooleanColumnWriter(
int column,
int dwrfSequence,
Type type,
ColumnWriterOptions columnWriterOptions,
Optional<DwrfDataEncryptor> dwrfEncryptor,
MetadataWriter metadataWriter)
{
checkArgument(column >= 0, "column is negative");
checkArgument(dwrfSequence >= 0, "sequence is negative");
requireNonNull(columnWriterOptions, "columnWriterOptions is null");
requireNonNull(dwrfEncryptor, "dwrfEncryptor is null");
requireNonNull(metadataWriter, "metadataWriter is null");
this.column = column;
this.dwrfSequence = dwrfSequence;
this.type = requireNonNull(type, "type is null");
this.compressed = columnWriterOptions.getCompressionKind() != NONE;
this.dataStream = new BooleanOutputStream(columnWriterOptions, dwrfEncryptor);
Expand Down Expand Up @@ -169,7 +165,7 @@ public List<StreamDataOutput> getIndexStreams()
}

Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build());
Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false);
Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false);
return ImmutableList.of(new StreamDataOutput(slice, stream));
}

Expand All @@ -190,8 +186,8 @@ public List<StreamDataOutput> getDataStreams()
checkState(closed);

ImmutableList.Builder<StreamDataOutput> outputDataStreams = ImmutableList.builder();
presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add);
outputDataStreams.add(dataStream.getStreamDataOutput(column, dwrfSequence));
presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add);
outputDataStreams.add(dataStream.getStreamDataOutput(column));
return outputDataStreams.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public class ByteColumnWriter
private static final ColumnEncoding COLUMN_ENCODING = new ColumnEncoding(DIRECT, 0);

private final int column;
private final int dwrfSequence;
private final Type type;
private final boolean compressed;
private final ByteOutputStream dataStream;
Expand All @@ -67,15 +66,13 @@ public class ByteColumnWriter

private boolean closed;

public ByteColumnWriter(int column, int dwrfSequence, Type type, ColumnWriterOptions columnWriterOptions, Optional<DwrfDataEncryptor> dwrfEncryptor, MetadataWriter metadataWriter)
public ByteColumnWriter(int column, Type type, ColumnWriterOptions columnWriterOptions, Optional<DwrfDataEncryptor> dwrfEncryptor, MetadataWriter metadataWriter)
{
checkArgument(column >= 0, "column is negative");
checkArgument(dwrfSequence >= 0, "sequence is negative");
requireNonNull(columnWriterOptions, "columnWriterOptions is null");
requireNonNull(dwrfEncryptor, "dwrfEncryptor is null");
requireNonNull(metadataWriter, "metadataWriter is null");
this.column = column;
this.dwrfSequence = dwrfSequence;
this.type = requireNonNull(type, "type is null");
this.compressed = columnWriterOptions.getCompressionKind() != NONE;
this.dataStream = new ByteOutputStream(columnWriterOptions, dwrfEncryptor);
Expand Down Expand Up @@ -162,7 +159,7 @@ public List<StreamDataOutput> getIndexStreams()
}

Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build());
Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false);
Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false);
return ImmutableList.of(new StreamDataOutput(slice, stream));
}

Expand All @@ -183,8 +180,8 @@ public List<StreamDataOutput> getDataStreams()
checkState(closed);

ImmutableList.Builder<StreamDataOutput> outputDataStreams = ImmutableList.builder();
presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add);
outputDataStreams.add(dataStream.getStreamDataOutput(column, dwrfSequence));
presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add);
outputDataStreams.add(dataStream.getStreamDataOutput(column));
return outputDataStreams.build();
}

Expand Down
Loading