diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java index 3f155982f9072..811d06e4a57c8 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java @@ -76,6 +76,7 @@ import static com.facebook.presto.orc.WriterStats.FlushReason.MAX_BYTES; import static com.facebook.presto.orc.WriterStats.FlushReason.MAX_ROWS; import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DIRECT; +import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID; import static com.facebook.presto.orc.metadata.DwrfMetadataWriter.toFileStatistics; import static com.facebook.presto.orc.metadata.DwrfMetadataWriter.toStripeEncryptionGroup; import static com.facebook.presto.orc.metadata.PostScript.MAGIC; @@ -240,6 +241,7 @@ public OrcWriter( Type fieldType = types.get(fieldId); ColumnWriter columnWriter = createColumnWriter( fieldColumnIndex, + DEFAULT_SEQUENCE_ID, orcTypes, fieldType, columnWriterOptions, diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/metadata/DwrfMetadataReader.java b/presto-orc/src/main/java/com/facebook/presto/orc/metadata/DwrfMetadataReader.java index 26f0ce7a7c275..2a7d34afd0838 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/metadata/DwrfMetadataReader.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/metadata/DwrfMetadataReader.java @@ -299,10 +299,10 @@ private static Stream toStream(DwrfProto.Stream stream) { return new Stream( stream.getColumn(), + stream.getSequence(), toStreamKind(stream.getKind()), toIntExact(stream.getLength()), stream.getUseVInts(), - stream.getSequence(), stream.hasOffset() ? Optional.of(stream.getOffset()) : Optional.empty()); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/metadata/DwrfMetadataWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/metadata/DwrfMetadataWriter.java index c8c651f43aaa7..06f4ddd837885 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/metadata/DwrfMetadataWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/metadata/DwrfMetadataWriter.java @@ -252,6 +252,7 @@ private static DwrfProto.Stream toStream(Stream stream) { DwrfProto.Stream.Builder streamBuilder = DwrfProto.Stream.newBuilder() .setColumn(stream.getColumn()) + .setSequence(stream.getSequence()) .setKind(toStreamKind(stream.getStreamKind())) .setLength(stream.getLength()) .setUseVInts(stream.isUseVInts()); diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/metadata/OrcMetadataWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/metadata/OrcMetadataWriter.java index 2b415c0094c1d..197e0313a532d 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/metadata/OrcMetadataWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/metadata/OrcMetadataWriter.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map.Entry; +import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID; import static com.google.common.base.Preconditions.checkArgument; import static java.lang.Math.toIntExact; import static java.util.stream.Collectors.toList; @@ -283,6 +284,10 @@ public int writeStripeFooter(SliceOutput output, StripeFooter footer) private static OrcProto.Stream toStream(Stream stream) { + checkArgument( + stream.getSequence() == DEFAULT_SEQUENCE_ID, + "Writing streams with non-zero sequence IDs is not supported in ORC : {} ", + stream); return OrcProto.Stream.newBuilder() .setColumn(stream.getColumn()) .setKind(toStreamKind(stream.getStreamKind())) @@ -315,7 +320,8 @@ private static OrcProto.ColumnEncoding toColumnEncoding(ColumnEncoding columnEnc { checkArgument( !columnEncodings.getAdditionalSequenceEncodings().isPresent(), - "Writing columns with non-zero sequence IDs is not supported in ORC: " + columnEncodings); + "Writing columns with non-zero sequence IDs is not supported in ORC: {}", + columnEncodings); return OrcProto.ColumnEncoding.newBuilder() .setKind(toColumnEncoding(columnEncodings.getColumnEncodingKind())) diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/metadata/Stream.java b/presto-orc/src/main/java/com/facebook/presto/orc/metadata/Stream.java index 3280cff2403eb..5e39b09a0234f 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/metadata/Stream.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/metadata/Stream.java @@ -15,6 +15,7 @@ import java.util.Optional; +import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID; import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; @@ -47,10 +48,15 @@ public enum StreamKind public Stream(int column, StreamKind streamKind, int length, boolean useVInts) { - this(column, streamKind, length, useVInts, ColumnEncoding.DEFAULT_SEQUENCE_ID, Optional.empty()); + this(column, DEFAULT_SEQUENCE_ID, streamKind, length, useVInts, Optional.empty()); } - public Stream(int column, StreamKind streamKind, int length, boolean useVInts, int sequence, Optional offset) + public Stream(int column, int sequence, StreamKind streamKind, int length, boolean useVInts) + { + this(column, sequence, streamKind, length, useVInts, Optional.empty()); + } + + public Stream(int column, int sequence, StreamKind streamKind, int length, boolean useVInts, Optional offset) { this.column = column; this.streamKind = requireNonNull(streamKind, "streamKind is null"); @@ -107,10 +113,10 @@ public Stream withOffset(long offset) { return new Stream( this.column, + this.sequence, this.streamKind, this.length, this.useVInts, - this.sequence, Optional.of(offset)); } } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/stream/BooleanOutputStream.java b/presto-orc/src/main/java/com/facebook/presto/orc/stream/BooleanOutputStream.java index a025733b89b88..b20b66dc486f4 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/stream/BooleanOutputStream.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/stream/BooleanOutputStream.java @@ -157,10 +157,10 @@ public List getCheckpoints() } @Override - public StreamDataOutput getStreamDataOutput(int column) + public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence) { checkState(closed); - return byteOutputStream.getStreamDataOutput(column); + return byteOutputStream.getStreamDataOutput(column, dwrfSequence); } @Override diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/stream/ByteArrayOutputStream.java b/presto-orc/src/main/java/com/facebook/presto/orc/stream/ByteArrayOutputStream.java index 096054988ff63..4dbfa981ae4ba 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/stream/ByteArrayOutputStream.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/stream/ByteArrayOutputStream.java @@ -89,9 +89,9 @@ public List getCheckpoints() } @Override - public StreamDataOutput getStreamDataOutput(int column) + public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence) { - return new StreamDataOutput(buffer::writeDataTo, new Stream(column, streamKind, toIntExact(buffer.getOutputDataSize()), false)); + return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, streamKind, toIntExact(buffer.getOutputDataSize()), false)); } @Override diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/stream/ByteOutputStream.java b/presto-orc/src/main/java/com/facebook/presto/orc/stream/ByteOutputStream.java index 714ef7836b597..9e2f1c3cb3caa 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/stream/ByteOutputStream.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/stream/ByteOutputStream.java @@ -149,9 +149,9 @@ public List getCheckpoints() } @Override - public StreamDataOutput getStreamDataOutput(int column) + public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence) { - return new StreamDataOutput(buffer::writeDataTo, new Stream(column, DATA, toIntExact(buffer.getOutputDataSize()), false)); + return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, DATA, toIntExact(buffer.getOutputDataSize()), false)); } @Override diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/stream/DecimalOutputStream.java b/presto-orc/src/main/java/com/facebook/presto/orc/stream/DecimalOutputStream.java index 105bd5fc91315..9ea1a46653bc4 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/stream/DecimalOutputStream.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/stream/DecimalOutputStream.java @@ -110,9 +110,9 @@ public List getCheckpoints() } @Override - public StreamDataOutput getStreamDataOutput(int column) + public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence) { - return new StreamDataOutput(buffer::writeDataTo, new Stream(column, DATA, toIntExact(buffer.getOutputDataSize()), true)); + return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, DATA, toIntExact(buffer.getOutputDataSize()), true)); } @Override diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/stream/DoubleOutputStream.java b/presto-orc/src/main/java/com/facebook/presto/orc/stream/DoubleOutputStream.java index f54f6852403de..08497602be09c 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/stream/DoubleOutputStream.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/stream/DoubleOutputStream.java @@ -71,9 +71,9 @@ public List getCheckpoints() } @Override - public StreamDataOutput getStreamDataOutput(int column) + public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence) { - return new StreamDataOutput(buffer::writeDataTo, new Stream(column, DATA, toIntExact(buffer.getOutputDataSize()), false)); + return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, DATA, toIntExact(buffer.getOutputDataSize()), false)); } @Override diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/stream/FloatOutputStream.java b/presto-orc/src/main/java/com/facebook/presto/orc/stream/FloatOutputStream.java index 36677db5c5a14..0f111e849e2dd 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/stream/FloatOutputStream.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/stream/FloatOutputStream.java @@ -71,9 +71,9 @@ public List getCheckpoints() } @Override - public StreamDataOutput getStreamDataOutput(int column) + public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence) { - return new StreamDataOutput(buffer::writeDataTo, new Stream(column, DATA, toIntExact(buffer.getOutputDataSize()), false)); + return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, DATA, toIntExact(buffer.getOutputDataSize()), false)); } @Override diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/stream/LongOutputStreamDwrf.java b/presto-orc/src/main/java/com/facebook/presto/orc/stream/LongOutputStreamDwrf.java index aef02d2229813..cec1e81b5c623 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/stream/LongOutputStreamDwrf.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/stream/LongOutputStreamDwrf.java @@ -80,9 +80,9 @@ public List getCheckpoints() } @Override - public StreamDataOutput getStreamDataOutput(int column) + public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence) { - return new StreamDataOutput(buffer::writeDataTo, new Stream(column, streamKind, toIntExact(buffer.getOutputDataSize()), true)); + return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, streamKind, toIntExact(buffer.getOutputDataSize()), true)); } @Override diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/stream/LongOutputStreamV1.java b/presto-orc/src/main/java/com/facebook/presto/orc/stream/LongOutputStreamV1.java index 7cd065f125dab..dfe91c95cf462 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/stream/LongOutputStreamV1.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/stream/LongOutputStreamV1.java @@ -190,9 +190,9 @@ public List getCheckpoints() } @Override - public StreamDataOutput getStreamDataOutput(int column) + public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence) { - return new StreamDataOutput(buffer::writeDataTo, new Stream(column, streamKind, toIntExact(buffer.getOutputDataSize()), true)); + return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, streamKind, toIntExact(buffer.getOutputDataSize()), true)); } @Override diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/stream/LongOutputStreamV2.java b/presto-orc/src/main/java/com/facebook/presto/orc/stream/LongOutputStreamV2.java index 12944810d5c53..f976c923dc694 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/stream/LongOutputStreamV2.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/stream/LongOutputStreamV2.java @@ -748,9 +748,9 @@ public List getCheckpoints() } @Override - public StreamDataOutput getStreamDataOutput(int column) + public StreamDataOutput getStreamDataOutput(int column, int dwrfSequence) { - return new StreamDataOutput(buffer::writeDataTo, new Stream(column, streamKind, toIntExact(buffer.getOutputDataSize()), true)); + return new StreamDataOutput(buffer::writeDataTo, new Stream(column, dwrfSequence, streamKind, toIntExact(buffer.getOutputDataSize()), true)); } @Override diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/stream/PresentOutputStream.java b/presto-orc/src/main/java/com/facebook/presto/orc/stream/PresentOutputStream.java index 37889240a622b..e0429dfb55d34 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/stream/PresentOutputStream.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/stream/PresentOutputStream.java @@ -102,15 +102,15 @@ public Optional> getCheckpoints() return Optional.of(booleanOutputStream.getCheckpoints()); } - public Optional getStreamDataOutput(int column) + public Optional getStreamDataOutput(int column, int dwrfSequence) { checkArgument(closed); if (booleanOutputStream == null) { return Optional.empty(); } - StreamDataOutput streamDataOutput = booleanOutputStream.getStreamDataOutput(column); + StreamDataOutput streamDataOutput = booleanOutputStream.getStreamDataOutput(column, dwrfSequence); // rewrite the DATA stream created by the boolean output stream to a PRESENT stream - Stream stream = new Stream(column, PRESENT, toIntExact(streamDataOutput.size()), streamDataOutput.getStream().isUseVInts()); + Stream stream = new Stream(column, dwrfSequence, PRESENT, toIntExact(streamDataOutput.size()), streamDataOutput.getStream().isUseVInts()); return Optional.of(new StreamDataOutput( sliceOutput -> { streamDataOutput.writeData(sliceOutput); diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/stream/ValueOutputStream.java b/presto-orc/src/main/java/com/facebook/presto/orc/stream/ValueOutputStream.java index f433b76ff6e05..af35e649ea122 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/stream/ValueOutputStream.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/stream/ValueOutputStream.java @@ -25,7 +25,7 @@ public interface ValueOutputStream List getCheckpoints(); - StreamDataOutput getStreamDataOutput(int column); + StreamDataOutput getStreamDataOutput(int column, int dwrfSequence); /** * This method returns the size of the flushed data plus any unflushed data. diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/BooleanColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/BooleanColumnWriter.java index e270230200f2b..0aa12cec78a5e 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/BooleanColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/BooleanColumnWriter.java @@ -53,6 +53,7 @@ public class BooleanColumnWriter private static final ColumnEncoding COLUMN_ENCODING = new ColumnEncoding(DIRECT, 0); private final int column; + private final int dwrfSequence; private final Type type; private final boolean compressed; private final BooleanOutputStream dataStream; @@ -68,16 +69,19 @@ public class BooleanColumnWriter public BooleanColumnWriter( int column, + int dwrfSequence, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, MetadataWriter metadataWriter) { checkArgument(column >= 0, "column is negative"); + checkArgument(dwrfSequence >= 0, "sequence is negative"); requireNonNull(columnWriterOptions, "columnWriterOptions is null"); requireNonNull(dwrfEncryptor, "dwrfEncryptor is null"); requireNonNull(metadataWriter, "metadataWriter is null"); this.column = column; + this.dwrfSequence = dwrfSequence; this.type = requireNonNull(type, "type is null"); this.compressed = columnWriterOptions.getCompressionKind() != NONE; this.dataStream = new BooleanOutputStream(columnWriterOptions, dwrfEncryptor); @@ -165,7 +169,7 @@ public List getIndexStreams() } Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build()); - Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false); + Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false); return ImmutableList.of(new StreamDataOutput(slice, stream)); } @@ -186,8 +190,8 @@ public List getDataStreams() checkState(closed); ImmutableList.Builder outputDataStreams = ImmutableList.builder(); - presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add); - outputDataStreams.add(dataStream.getStreamDataOutput(column)); + presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add); + outputDataStreams.add(dataStream.getStreamDataOutput(column, dwrfSequence)); return outputDataStreams.build(); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/ByteColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/ByteColumnWriter.java index e8b4821b239b3..2b5a4c0a89924 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/ByteColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/ByteColumnWriter.java @@ -53,6 +53,7 @@ public class ByteColumnWriter private static final ColumnEncoding COLUMN_ENCODING = new ColumnEncoding(DIRECT, 0); private final int column; + private final int dwrfSequence; private final Type type; private final boolean compressed; private final ByteOutputStream dataStream; @@ -66,13 +67,15 @@ public class ByteColumnWriter private boolean closed; - public ByteColumnWriter(int column, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, MetadataWriter metadataWriter) + public ByteColumnWriter(int column, int dwrfSequence, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, MetadataWriter metadataWriter) { checkArgument(column >= 0, "column is negative"); + checkArgument(dwrfSequence >= 0, "sequence is negative"); requireNonNull(columnWriterOptions, "columnWriterOptions is null"); requireNonNull(dwrfEncryptor, "dwrfEncryptor is null"); requireNonNull(metadataWriter, "metadataWriter is null"); this.column = column; + this.dwrfSequence = dwrfSequence; this.type = requireNonNull(type, "type is null"); this.compressed = columnWriterOptions.getCompressionKind() != NONE; this.dataStream = new ByteOutputStream(columnWriterOptions, dwrfEncryptor); @@ -159,7 +162,7 @@ public List getIndexStreams() } Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build()); - Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false); + Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false); return ImmutableList.of(new StreamDataOutput(slice, stream)); } @@ -180,8 +183,8 @@ public List getDataStreams() checkState(closed); ImmutableList.Builder outputDataStreams = ImmutableList.builder(); - presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add); - outputDataStreams.add(dataStream.getStreamDataOutput(column)); + presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add); + outputDataStreams.add(dataStream.getStreamDataOutput(column, dwrfSequence)); return outputDataStreams.build(); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/ColumnWriters.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/ColumnWriters.java index 22afe06dc75eb..4ef16981d4db6 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/ColumnWriters.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/ColumnWriters.java @@ -39,6 +39,7 @@ private ColumnWriters() {} public static ColumnWriter createColumnWriter( int columnIndex, + int dwrfSequence, List orcTypes, Type type, ColumnWriterOptions columnWriterOptions, @@ -52,53 +53,54 @@ public static ColumnWriter createColumnWriter( Optional dwrfEncryptor = dwrfEncryptors.getEncryptorByNodeId(columnIndex); switch (orcType.getOrcTypeKind()) { case BOOLEAN: - return new BooleanColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, metadataWriter); + return new BooleanColumnWriter(columnIndex, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, metadataWriter); case FLOAT: - return new FloatColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, metadataWriter); + return new FloatColumnWriter(columnIndex, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, metadataWriter); case DOUBLE: - return new DoubleColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, metadataWriter); + return new DoubleColumnWriter(columnIndex, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, metadataWriter); case BYTE: - return new ByteColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, metadataWriter); + return new ByteColumnWriter(columnIndex, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, metadataWriter); case DATE: checkArgument(orcEncoding != DWRF, "DWRF does not support %s type", type); - return new LongColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, orcEncoding, DateStatisticsBuilder::new, metadataWriter); + return new LongColumnWriter(columnIndex, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, orcEncoding, DateStatisticsBuilder::new, metadataWriter); case SHORT: - return new LongColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, orcEncoding, IntegerStatisticsBuilder::new, metadataWriter); + return new LongColumnWriter(columnIndex, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, orcEncoding, IntegerStatisticsBuilder::new, metadataWriter); case INT: case LONG: if (columnWriterOptions.isIntegerDictionaryEncodingEnabled() && orcEncoding == DWRF) { // ORC V1 does not support Integer Dictionary encoding. DWRF supports Integer dictionary encoding. - return new LongDictionaryColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, orcEncoding, metadataWriter); + return new LongDictionaryColumnWriter(columnIndex, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, orcEncoding, metadataWriter); } - return new LongColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, orcEncoding, IntegerStatisticsBuilder::new, metadataWriter); + return new LongColumnWriter(columnIndex, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, orcEncoding, IntegerStatisticsBuilder::new, metadataWriter); case DECIMAL: checkArgument(orcEncoding != DWRF, "DWRF does not support %s type", type); - return new DecimalColumnWriter(columnIndex, type, columnWriterOptions, orcEncoding, metadataWriter); + return new DecimalColumnWriter(columnIndex, dwrfSequence, type, columnWriterOptions, orcEncoding, metadataWriter); case TIMESTAMP: - return new TimestampColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, orcEncoding, hiveStorageTimeZone, metadataWriter); + return new TimestampColumnWriter(columnIndex, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, orcEncoding, hiveStorageTimeZone, metadataWriter); case BINARY: - return new SliceDirectColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, orcEncoding, BinaryStatisticsBuilder::new, metadataWriter); + return new SliceDirectColumnWriter(columnIndex, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, orcEncoding, BinaryStatisticsBuilder::new, metadataWriter); case CHAR: checkArgument(orcEncoding != DWRF, "DWRF does not support %s type", type); // fall through case VARCHAR: case STRING: - return new SliceDictionaryColumnWriter(columnIndex, type, columnWriterOptions, dwrfEncryptor, orcEncoding, metadataWriter); + return new SliceDictionaryColumnWriter(columnIndex, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, orcEncoding, metadataWriter); case LIST: { int fieldColumnIndex = orcType.getFieldTypeIndex(0); Type fieldType = type.getTypeParameters().get(0); ColumnWriter elementWriter = createColumnWriter( fieldColumnIndex, + dwrfSequence, orcTypes, fieldType, columnWriterOptions, @@ -106,12 +108,12 @@ public static ColumnWriter createColumnWriter( hiveStorageTimeZone, dwrfEncryptors, metadataWriter); - return new ListColumnWriter(columnIndex, columnWriterOptions, dwrfEncryptor, orcEncoding, elementWriter, metadataWriter); + return new ListColumnWriter(columnIndex, dwrfSequence, columnWriterOptions, dwrfEncryptor, orcEncoding, elementWriter, metadataWriter); } - case MAP: { ColumnWriter keyWriter = createColumnWriter( orcType.getFieldTypeIndex(0), + dwrfSequence, orcTypes, type.getTypeParameters().get(0), columnWriterOptions, @@ -121,6 +123,7 @@ public static ColumnWriter createColumnWriter( metadataWriter); ColumnWriter valueWriter = createColumnWriter( orcType.getFieldTypeIndex(1), + dwrfSequence, orcTypes, type.getTypeParameters().get(1), columnWriterOptions, @@ -128,9 +131,8 @@ public static ColumnWriter createColumnWriter( hiveStorageTimeZone, dwrfEncryptors, metadataWriter); - return new MapColumnWriter(columnIndex, columnWriterOptions, dwrfEncryptor, orcEncoding, keyWriter, valueWriter, metadataWriter); + return new MapColumnWriter(columnIndex, dwrfSequence, columnWriterOptions, dwrfEncryptor, orcEncoding, keyWriter, valueWriter, metadataWriter); } - case STRUCT: { ImmutableList.Builder fieldWriters = ImmutableList.builder(); for (int fieldId = 0; fieldId < orcType.getFieldCount(); fieldId++) { @@ -138,6 +140,7 @@ public static ColumnWriter createColumnWriter( Type fieldType = type.getTypeParameters().get(fieldId); fieldWriters.add(createColumnWriter( fieldColumnIndex, + dwrfSequence, orcTypes, fieldType, columnWriterOptions, @@ -146,7 +149,7 @@ public static ColumnWriter createColumnWriter( dwrfEncryptors, metadataWriter)); } - return new StructColumnWriter(columnIndex, columnWriterOptions, dwrfEncryptor, fieldWriters.build(), metadataWriter); + return new StructColumnWriter(columnIndex, dwrfSequence, columnWriterOptions, dwrfEncryptor, fieldWriters.build(), metadataWriter); } } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/DecimalColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/DecimalColumnWriter.java index 0230d9a5245a0..9c74323ee0e74 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/DecimalColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/DecimalColumnWriter.java @@ -61,6 +61,7 @@ public class DecimalColumnWriter { private static final int INSTANCE_SIZE = ClassLayout.parseClass(DecimalColumnWriter.class).instanceSize(); private final int column; + private final int dwrfSequence; private final DecimalType type; private final ColumnEncoding columnEncoding; private final boolean compressed; @@ -77,13 +78,15 @@ public class DecimalColumnWriter private boolean closed; - public DecimalColumnWriter(int column, Type type, ColumnWriterOptions columnWriterOptions, OrcEncoding orcEncoding, MetadataWriter metadataWriter) + public DecimalColumnWriter(int column, int dwrfSequence, Type type, ColumnWriterOptions columnWriterOptions, OrcEncoding orcEncoding, MetadataWriter metadataWriter) { checkArgument(column >= 0, "column is negative"); + checkArgument(dwrfSequence >= 0, "sequence is negative"); requireNonNull(columnWriterOptions, "columnWriterOptions is null"); checkArgument(orcEncoding != DWRF, "DWRF does not support %s type", type); requireNonNull(metadataWriter, "metadataWriter is null"); this.column = column; + this.dwrfSequence = dwrfSequence; this.type = (DecimalType) requireNonNull(type, "type is null"); this.compressed = columnWriterOptions.getCompressionKind() != NONE; this.columnEncoding = new ColumnEncoding(DIRECT_V2, 0); @@ -208,7 +211,7 @@ public List getIndexStreams() } Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build()); - Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false); + Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false); return ImmutableList.of(new StreamDataOutput(slice, stream)); } @@ -231,9 +234,9 @@ public List getDataStreams() checkState(closed); ImmutableList.Builder outputDataStreams = ImmutableList.builder(); - presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add); - outputDataStreams.add(dataStream.getStreamDataOutput(column)); - outputDataStreams.add(scaleStream.getStreamDataOutput(column)); + presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add); + outputDataStreams.add(dataStream.getStreamDataOutput(column, dwrfSequence)); + outputDataStreams.add(scaleStream.getStreamDataOutput(column, dwrfSequence)); return outputDataStreams.build(); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/DictionaryColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/DictionaryColumnWriter.java index 2167e6af84479..26b15926909bb 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/DictionaryColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/DictionaryColumnWriter.java @@ -63,6 +63,7 @@ public abstract class DictionaryColumnWriter implements ColumnWriter, DictionaryColumn { protected final int column; + protected final int dwrfSequence; protected final Type type; protected final ColumnWriterOptions columnWriterOptions; protected final Optional dwrfEncryptor; @@ -86,6 +87,7 @@ public abstract class DictionaryColumnWriter public DictionaryColumnWriter( int column, + int dwrfSequence, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, @@ -93,7 +95,9 @@ public DictionaryColumnWriter( MetadataWriter metadataWriter) { checkArgument(column >= 0, "column is negative"); + checkArgument(dwrfSequence >= 0, "sequence is negative"); this.column = column; + this.dwrfSequence = dwrfSequence; this.type = requireNonNull(type, "type is null"); this.columnWriterOptions = requireNonNull(columnWriterOptions, "columnWriterOptions is null"); this.dwrfEncryptor = requireNonNull(dwrfEncryptor, "dwrfEncryptor is null"); @@ -365,7 +369,7 @@ public List getIndexStreams() } Slice slice = compressedMetadataWriter.writeRowIndexes(rowGroupIndexes.build()); - Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false); + Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false); return ImmutableList.of(new StreamDataOutput(slice, stream)); } @@ -390,8 +394,8 @@ public List getDataStreams() // actually write data ImmutableList.Builder outputDataStreams = ImmutableList.builder(); - presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add); - outputDataStreams.add(dataStream.getStreamDataOutput(column)); + presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add); + outputDataStreams.add(dataStream.getStreamDataOutput(column, dwrfSequence)); outputDataStreams.addAll(getDictionaryStreams(column)); return outputDataStreams.build(); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/DoubleColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/DoubleColumnWriter.java index d2c0ce66a0bad..75688cda85a1d 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/DoubleColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/DoubleColumnWriter.java @@ -54,6 +54,7 @@ public class DoubleColumnWriter private static final ColumnEncoding COLUMN_ENCODING = new ColumnEncoding(DIRECT, 0); private final int column; + private final int dwrfSequence; private final Type type; private final boolean compressed; private final DoubleOutputStream dataStream; @@ -67,13 +68,15 @@ public class DoubleColumnWriter private boolean closed; - public DoubleColumnWriter(int column, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, MetadataWriter metadataWriter) + public DoubleColumnWriter(int column, int dwrfSequence, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, MetadataWriter metadataWriter) { checkArgument(column >= 0, "column is negative"); + checkArgument(dwrfSequence >= 0, "sequence is negative"); requireNonNull(columnWriterOptions, "columnWriterOptions is null"); requireNonNull(dwrfEncryptor, "dwrfEncryptor is null"); requireNonNull(metadataWriter, "metadataWriter is null"); this.column = column; + this.dwrfSequence = dwrfSequence; this.type = requireNonNull(type, "type is null"); this.compressed = columnWriterOptions.getCompressionKind() != NONE; this.dataStream = new DoubleOutputStream(columnWriterOptions, dwrfEncryptor); @@ -161,7 +164,7 @@ public List getIndexStreams() } Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build()); - Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false); + Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false); return ImmutableList.of(new StreamDataOutput(slice, stream)); } @@ -182,8 +185,8 @@ public List getDataStreams() checkState(closed); ImmutableList.Builder outputDataStreams = ImmutableList.builder(); - presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add); - outputDataStreams.add(dataStream.getStreamDataOutput(column)); + presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add); + outputDataStreams.add(dataStream.getStreamDataOutput(column, dwrfSequence)); return outputDataStreams.build(); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/FloatColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/FloatColumnWriter.java index 82c1c8c2c6385..0ee37e85f9367 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/FloatColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/FloatColumnWriter.java @@ -55,6 +55,7 @@ public class FloatColumnWriter private static final ColumnEncoding COLUMN_ENCODING = new ColumnEncoding(DIRECT, 0); private final int column; + private final int dwrfSequence; private final Type type; private final boolean compressed; private final FloatOutputStream dataStream; @@ -68,13 +69,15 @@ public class FloatColumnWriter private boolean closed; - public FloatColumnWriter(int column, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, MetadataWriter metadataWriter) + public FloatColumnWriter(int column, int dwrfSequence, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, MetadataWriter metadataWriter) { checkArgument(column >= 0, "column is negative"); + checkArgument(dwrfSequence >= 0, "sequence is negative"); requireNonNull(columnWriterOptions, "columnWriterOptions is null"); requireNonNull(dwrfEncryptor, "dwrfEncryptor is null"); requireNonNull(metadataWriter, "metadataWriter is null"); this.column = column; + this.dwrfSequence = dwrfSequence; this.type = requireNonNull(type, "type is null"); this.compressed = columnWriterOptions.getCompressionKind() != NONE; this.dataStream = new FloatOutputStream(columnWriterOptions, dwrfEncryptor); @@ -163,7 +166,7 @@ public List getIndexStreams() } Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build()); - Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false); + Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false); return ImmutableList.of(new StreamDataOutput(slice, stream)); } @@ -184,8 +187,8 @@ public List getDataStreams() checkState(closed); ImmutableList.Builder outputDataStreams = ImmutableList.builder(); - presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add); - outputDataStreams.add(dataStream.getStreamDataOutput(column)); + presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add); + outputDataStreams.add(dataStream.getStreamDataOutput(column, dwrfSequence)); return outputDataStreams.build(); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/ListColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/ListColumnWriter.java index 1c37067c10882..1c227564b95fb 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/ListColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/ListColumnWriter.java @@ -56,6 +56,7 @@ public class ListColumnWriter { private static final int INSTANCE_SIZE = ClassLayout.parseClass(ListColumnWriter.class).instanceSize(); private final int column; + private final int dwrfSequence; private final boolean compressed; private final ColumnEncoding columnEncoding; private final LongOutputStream lengthStream; @@ -70,11 +71,13 @@ public class ListColumnWriter private boolean closed; - public ListColumnWriter(int column, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, OrcEncoding orcEncoding, ColumnWriter elementWriter, MetadataWriter metadataWriter) + public ListColumnWriter(int column, int dwrfSequence, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, OrcEncoding orcEncoding, ColumnWriter elementWriter, MetadataWriter metadataWriter) { checkArgument(column >= 0, "column is negative"); + checkArgument(dwrfSequence >= 0, "sequence is negative"); requireNonNull(columnWriterOptions, "columnWriterOptions is null"); this.column = column; + this.dwrfSequence = dwrfSequence; this.compressed = columnWriterOptions.getCompressionKind() != NONE; this.columnEncoding = new ColumnEncoding(orcEncoding == DWRF ? DIRECT : DIRECT_V2, 0); this.elementWriter = requireNonNull(elementWriter, "elementWriter is null"); @@ -194,7 +197,7 @@ public List getIndexStreams() } Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build()); - Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false); + Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false); ImmutableList.Builder indexStreams = ImmutableList.builder(); indexStreams.add(new StreamDataOutput(slice, stream)); @@ -219,8 +222,8 @@ public List getDataStreams() checkState(closed); ImmutableList.Builder outputDataStreams = ImmutableList.builder(); - presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add); - outputDataStreams.add(lengthStream.getStreamDataOutput(column)); + presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add); + outputDataStreams.add(lengthStream.getStreamDataOutput(column, dwrfSequence)); outputDataStreams.addAll(elementWriter.getDataStreams()); return outputDataStreams.build(); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/LongColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/LongColumnWriter.java index f8e3cdf24fdf0..a5f8b760e0bb6 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/LongColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/LongColumnWriter.java @@ -59,6 +59,7 @@ public class LongColumnWriter { private static final int INSTANCE_SIZE = ClassLayout.parseClass(LongColumnWriter.class).instanceSize(); private final int column; + private final int dwrfSequence; private final Type type; private final boolean compressed; private final ColumnEncoding columnEncoding; @@ -76,6 +77,7 @@ public class LongColumnWriter public LongColumnWriter( int column, + int dwrfSequence, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, @@ -84,10 +86,12 @@ public LongColumnWriter( MetadataWriter metadataWriter) { checkArgument(column >= 0, "column is negative"); + checkArgument(dwrfSequence >= 0, "sequence is negative"); requireNonNull(columnWriterOptions, "columnWriterOptions is null"); requireNonNull(dwrfEncryptor, "dwrfEncryptor is null"); requireNonNull(metadataWriter, "metadataWriter is null"); this.column = column; + this.dwrfSequence = dwrfSequence; this.type = requireNonNull(type, "type is null"); this.compressed = columnWriterOptions.getCompressionKind() != NONE; if (orcEncoding == DWRF) { @@ -184,7 +188,7 @@ public List getIndexStreams() } Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build()); - Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false); + Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false); return ImmutableList.of(new StreamDataOutput(slice, stream)); } @@ -205,8 +209,8 @@ public List getDataStreams() checkState(closed); ImmutableList.Builder outputDataStreams = ImmutableList.builder(); - presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add); - outputDataStreams.add(dataStream.getStreamDataOutput(column)); + presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add); + outputDataStreams.add(dataStream.getStreamDataOutput(column, dwrfSequence)); return outputDataStreams.build(); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/LongDictionaryColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/LongDictionaryColumnWriter.java index 3ac94bd39b5b2..a520f0a39425e 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/LongDictionaryColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/LongDictionaryColumnWriter.java @@ -62,13 +62,14 @@ public class LongDictionaryColumnWriter public LongDictionaryColumnWriter( int column, + int dwrfSequence, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, OrcEncoding orcEncoding, MetadataWriter metadataWriter) { - super(column, type, columnWriterOptions, dwrfEncryptor, orcEncoding, metadataWriter); + super(column, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, orcEncoding, metadataWriter); checkArgument(orcEncoding == DWRF, "Long dictionary encoding is only supported in DWRF"); checkArgument(type instanceof FixedWidthType, "Not a fixed width type"); this.dictionaryDataStream = new LongOutputStreamDwrf(columnWriterOptions, dwrfEncryptor, true, DICTIONARY_DATA); @@ -93,7 +94,7 @@ public int getDictionaryBytes() protected ColumnWriter createDirectColumnWriter() { if (directColumnWriter == null) { - directColumnWriter = new LongColumnWriter(column, type, columnWriterOptions, dwrfEncryptor, orcEncoding, IntegerStatisticsBuilder::new, metadataWriter); + directColumnWriter = new LongColumnWriter(column, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, orcEncoding, IntegerStatisticsBuilder::new, metadataWriter); } return directColumnWriter; } @@ -203,7 +204,7 @@ protected void writePresentAndDataStreams( @Override protected List getDictionaryStreams(int column) { - return ImmutableList.of(dictionaryDataStream.getStreamDataOutput(column)); + return ImmutableList.of(dictionaryDataStream.getStreamDataOutput(column, dwrfSequence)); } @Override diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/MapColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/MapColumnWriter.java index fd6965589d071..e2026a5bdb66c 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/MapColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/MapColumnWriter.java @@ -56,6 +56,7 @@ public class MapColumnWriter { private static final int INSTANCE_SIZE = ClassLayout.parseClass(MapColumnWriter.class).instanceSize(); private final int column; + private final int dwrfSequence; private final boolean compressed; private final ColumnEncoding columnEncoding; private final LongOutputStream lengthStream; @@ -71,11 +72,13 @@ public class MapColumnWriter private boolean closed; - public MapColumnWriter(int column, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, OrcEncoding orcEncoding, ColumnWriter keyWriter, ColumnWriter valueWriter, MetadataWriter metadataWriter) + public MapColumnWriter(int column, int dwrfSequence, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, OrcEncoding orcEncoding, ColumnWriter keyWriter, ColumnWriter valueWriter, MetadataWriter metadataWriter) { checkArgument(column >= 0, "column is negative"); + checkArgument(dwrfSequence >= 0, "sequence is negative"); requireNonNull(columnWriterOptions, "columnWriterOptions is null"); this.column = column; + this.dwrfSequence = dwrfSequence; this.compressed = columnWriterOptions.getCompressionKind() != NONE; this.columnEncoding = new ColumnEncoding(orcEncoding == DWRF ? DIRECT : DIRECT_V2, 0); this.keyWriter = requireNonNull(keyWriter, "keyWriter is null"); @@ -204,7 +207,7 @@ public List getIndexStreams() } Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build()); - Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false); + Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false); ImmutableList.Builder indexStreams = ImmutableList.builder(); indexStreams.add(new StreamDataOutput(slice, stream)); @@ -230,8 +233,8 @@ public List getDataStreams() checkState(closed); ImmutableList.Builder outputDataStreams = ImmutableList.builder(); - presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add); - outputDataStreams.add(lengthStream.getStreamDataOutput(column)); + presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add); + outputDataStreams.add(lengthStream.getStreamDataOutput(column, dwrfSequence)); outputDataStreams.addAll(keyWriter.getDataStreams()); outputDataStreams.addAll(valueWriter.getDataStreams()); return outputDataStreams.build(); diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/SliceDictionaryColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/SliceDictionaryColumnWriter.java index 46ebe69cea488..9d7b2368cd2a3 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/SliceDictionaryColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/SliceDictionaryColumnWriter.java @@ -62,13 +62,14 @@ public class SliceDictionaryColumnWriter public SliceDictionaryColumnWriter( int column, + int dwrfSequence, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, OrcEncoding orcEncoding, MetadataWriter metadataWriter) { - super(column, type, columnWriterOptions, dwrfEncryptor, orcEncoding, metadataWriter); + super(column, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, orcEncoding, metadataWriter); this.dictionaryDataStream = new ByteArrayOutputStream(columnWriterOptions, dwrfEncryptor, Stream.StreamKind.DICTIONARY_DATA); this.dictionaryLengthStream = createLengthOutputStream(columnWriterOptions, dwrfEncryptor, orcEncoding); this.stringStatisticsLimitInBytes = toIntExact(columnWriterOptions.getStringStatisticsLimit().toBytes()); @@ -253,7 +254,7 @@ private StringStatisticsBuilder newStringStatisticsBuilder() protected ColumnWriter createDirectColumnWriter() { if (directColumnWriter == null) { - directColumnWriter = new SliceDirectColumnWriter(column, type, columnWriterOptions, dwrfEncryptor, orcEncoding, this::newStringStatisticsBuilder, metadataWriter); + directColumnWriter = new SliceDirectColumnWriter(column, dwrfSequence, type, columnWriterOptions, dwrfEncryptor, orcEncoding, this::newStringStatisticsBuilder, metadataWriter); } return directColumnWriter; } @@ -268,6 +269,6 @@ protected ColumnWriter getDirectColumnWriter() @Override protected List getDictionaryStreams(int column) { - return ImmutableList.of(dictionaryLengthStream.getStreamDataOutput(column), dictionaryDataStream.getStreamDataOutput(column)); + return ImmutableList.of(dictionaryLengthStream.getStreamDataOutput(column, dwrfSequence), dictionaryDataStream.getStreamDataOutput(column, dwrfSequence)); } } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/SliceDirectColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/SliceDirectColumnWriter.java index 5e3ed4c99e2fa..426089fe5ce01 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/SliceDirectColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/SliceDirectColumnWriter.java @@ -59,6 +59,7 @@ public class SliceDirectColumnWriter { private static final int INSTANCE_SIZE = ClassLayout.parseClass(SliceDirectColumnWriter.class).instanceSize(); private final int column; + private final int dwrfSequence; private final Type type; private final boolean compressed; private final ColumnEncoding columnEncoding; @@ -77,6 +78,7 @@ public class SliceDirectColumnWriter public SliceDirectColumnWriter( int column, + int dwrfSequence, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, @@ -85,10 +87,12 @@ public SliceDirectColumnWriter( MetadataWriter metadataWriter) { checkArgument(column >= 0, "column is negative"); + checkArgument(dwrfSequence >= 0, "sequence is negative"); requireNonNull(columnWriterOptions, "columnWriterOptions is null"); requireNonNull(dwrfEncryptor, "dwrfEncryptor is null"); requireNonNull(metadataWriter, "metadataWriter is null"); this.column = column; + this.dwrfSequence = dwrfSequence; this.type = requireNonNull(type, "type is null"); this.compressed = columnWriterOptions.getCompressionKind() != NONE; this.columnEncoding = new ColumnEncoding(orcEncoding == DWRF ? DIRECT : DIRECT_V2, 0); @@ -204,7 +208,7 @@ public List getIndexStreams() } Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build()); - Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false); + Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false); return ImmutableList.of(new StreamDataOutput(slice, stream)); } @@ -227,9 +231,9 @@ public List getDataStreams() checkState(closed); ImmutableList.Builder outputDataStreams = ImmutableList.builder(); - presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add); - outputDataStreams.add(lengthStream.getStreamDataOutput(column)); - outputDataStreams.add(dataStream.getStreamDataOutput(column)); + presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add); + outputDataStreams.add(lengthStream.getStreamDataOutput(column, dwrfSequence)); + outputDataStreams.add(dataStream.getStreamDataOutput(column, dwrfSequence)); return outputDataStreams.build(); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/StructColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/StructColumnWriter.java index e0209431005ee..220dbbc2aecd4 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/StructColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/StructColumnWriter.java @@ -52,6 +52,7 @@ public class StructColumnWriter private static final ColumnEncoding COLUMN_ENCODING = new ColumnEncoding(DIRECT, 0); private final int column; + private final int dwrfSequence; private final boolean compressed; private final PresentOutputStream presentStream; private final CompressedMetadataWriter metadataWriter; @@ -64,11 +65,13 @@ public class StructColumnWriter private boolean closed; - public StructColumnWriter(int column, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, List structFields, MetadataWriter metadataWriter) + public StructColumnWriter(int column, int dwrfSequence, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, List structFields, MetadataWriter metadataWriter) { checkArgument(column >= 0, "column is negative"); + checkArgument(dwrfSequence >= 0, "sequence is negative"); requireNonNull(columnWriterOptions, "columnWriterOptions is null"); this.column = column; + this.dwrfSequence = dwrfSequence; this.compressed = columnWriterOptions.getCompressionKind() != NONE; this.structFields = ImmutableList.copyOf(requireNonNull(structFields, "structFields is null")); this.presentStream = new PresentOutputStream(columnWriterOptions, dwrfEncryptor); @@ -192,7 +195,7 @@ public List getIndexStreams() } Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build()); - Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false); + Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false); ImmutableList.Builder indexStreams = ImmutableList.builder(); indexStreams.add(new StreamDataOutput(slice, stream)); @@ -217,7 +220,7 @@ public List getDataStreams() checkState(closed); ImmutableList.Builder outputDataStreams = ImmutableList.builder(); - presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add); + presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add); for (ColumnWriter structField : structFields) { outputDataStreams.addAll(structField.getDataStreams()); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/writer/TimestampColumnWriter.java b/presto-orc/src/main/java/com/facebook/presto/orc/writer/TimestampColumnWriter.java index 78ad325ce0c60..44eb90128d6ee 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/writer/TimestampColumnWriter.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/writer/TimestampColumnWriter.java @@ -63,6 +63,7 @@ public class TimestampColumnWriter private static final int MILLIS_TO_NANOS_TRAILING_ZEROS = 5; private final int column; + private final int dwrfSequence; private final Type type; private final boolean compressed; private final ColumnEncoding columnEncoding; @@ -80,13 +81,15 @@ public class TimestampColumnWriter private boolean closed; - public TimestampColumnWriter(int column, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, OrcEncoding orcEncoding, DateTimeZone hiveStorageTimeZone, MetadataWriter metadataWriter) + public TimestampColumnWriter(int column, int dwrfSequence, Type type, ColumnWriterOptions columnWriterOptions, Optional dwrfEncryptor, OrcEncoding orcEncoding, DateTimeZone hiveStorageTimeZone, MetadataWriter metadataWriter) { checkArgument(column >= 0, "column is negative"); + checkArgument(dwrfSequence >= 0, "sequence is negative"); requireNonNull(columnWriterOptions, "compression is null"); requireNonNull(dwrfEncryptor, "dwrfEncryptor is null"); requireNonNull(metadataWriter, "metadataWriter is null"); this.column = column; + this.dwrfSequence = dwrfSequence; this.type = requireNonNull(type, "type is null"); this.compressed = columnWriterOptions.getCompressionKind() != NONE; if (orcEncoding == DWRF) { @@ -215,7 +218,7 @@ public List getIndexStreams() } Slice slice = metadataWriter.writeRowIndexes(rowGroupIndexes.build()); - Stream stream = new Stream(column, StreamKind.ROW_INDEX, slice.length(), false); + Stream stream = new Stream(column, dwrfSequence, StreamKind.ROW_INDEX, slice.length(), false); return ImmutableList.of(new StreamDataOutput(slice, stream)); } @@ -238,9 +241,10 @@ public List getDataStreams() checkState(closed); ImmutableList.Builder outputDataStreams = ImmutableList.builder(); - presentStream.getStreamDataOutput(column).ifPresent(outputDataStreams::add); - outputDataStreams.add(secondsStream.getStreamDataOutput(column)); - outputDataStreams.add(nanosStream.getStreamDataOutput(column)); + presentStream.getStreamDataOutput(column, dwrfSequence).ifPresent(outputDataStreams::add); + outputDataStreams.add(secondsStream.getStreamDataOutput(column, dwrfSequence)); + outputDataStreams.add(nanosStream.getStreamDataOutput(column, dwrfSequence)); + return outputDataStreams.build(); } diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/BenchmarkDictionaryWriter.java b/presto-orc/src/test/java/com/facebook/presto/orc/BenchmarkDictionaryWriter.java index e5c88c5bb2db4..894da37c61056 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/BenchmarkDictionaryWriter.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/BenchmarkDictionaryWriter.java @@ -53,6 +53,7 @@ import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.orc.OrcEncoding.DWRF; import static com.facebook.presto.orc.OrcWriterOptions.DEFAULT_MAX_STRING_STATISTICS_LIMIT; +import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID; import static com.google.common.base.Preconditions.checkState; import static io.airlift.slice.Slices.utf8Slice; import static io.airlift.units.DataSize.Unit.MEGABYTE; @@ -96,10 +97,10 @@ public void writeDirect(BenchmarkData data) ColumnWriter columnWriter; Type type = data.getType(); if (type.equals(VARCHAR)) { - columnWriter = new SliceDirectColumnWriter(COLUMN_INDEX, type, columnWriterOptions, Optional.empty(), DWRF, this::newStringStatisticsBuilder, DWRF.createMetadataWriter()); + columnWriter = new SliceDirectColumnWriter(COLUMN_INDEX, DEFAULT_SEQUENCE_ID, type, columnWriterOptions, Optional.empty(), DWRF, this::newStringStatisticsBuilder, DWRF.createMetadataWriter()); } else { - columnWriter = new LongColumnWriter(COLUMN_INDEX, type, columnWriterOptions, Optional.empty(), DWRF, IntegerStatisticsBuilder::new, DWRF.createMetadataWriter()); + columnWriter = new LongColumnWriter(COLUMN_INDEX, DEFAULT_SEQUENCE_ID, type, columnWriterOptions, Optional.empty(), DWRF, IntegerStatisticsBuilder::new, DWRF.createMetadataWriter()); } for (Block block : data.getBlocks()) { columnWriter.beginRowGroup(); @@ -144,10 +145,10 @@ private DictionaryColumnWriter getDictionaryColumnWriter(BenchmarkData data) DictionaryColumnWriter columnWriter; Type type = data.getType(); if (type.equals(VARCHAR)) { - columnWriter = new SliceDictionaryColumnWriter(COLUMN_INDEX, type, columnWriterOptions, Optional.empty(), DWRF, DWRF.createMetadataWriter()); + columnWriter = new SliceDictionaryColumnWriter(COLUMN_INDEX, DEFAULT_SEQUENCE_ID, type, columnWriterOptions, Optional.empty(), DWRF, DWRF.createMetadataWriter()); } else { - columnWriter = new LongDictionaryColumnWriter(COLUMN_INDEX, type, columnWriterOptions, Optional.empty(), DWRF, DWRF.createMetadataWriter()); + columnWriter = new LongDictionaryColumnWriter(COLUMN_INDEX, DEFAULT_SEQUENCE_ID, type, columnWriterOptions, Optional.empty(), DWRF, DWRF.createMetadataWriter()); } return columnWriter; } diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestDecryption.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestDecryption.java index 1693e35832351..2bc58ccb824b4 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestDecryption.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestDecryption.java @@ -208,22 +208,22 @@ public void testGetStripeDecryptionKeysUnencrypted() public void testGetDiskRanges() { List unencryptedStreams = ImmutableList.of( - new Stream(3, ROW_INDEX, 5, true, DEFAULT_SEQUENCE_ID, Optional.of(15L)), - new Stream(4, ROW_INDEX, 5, true), - new Stream(3, DATA, 5, true, DEFAULT_SEQUENCE_ID, Optional.of(45L)), - new Stream(4, DATA, 5, true)); + new Stream(3, DEFAULT_SEQUENCE_ID, ROW_INDEX, 5, true, Optional.of(15L)), + new Stream(4, DEFAULT_SEQUENCE_ID, ROW_INDEX, 5, true), + new Stream(3, DEFAULT_SEQUENCE_ID, DATA, 5, true, Optional.of(45L)), + new Stream(4, DEFAULT_SEQUENCE_ID, DATA, 5, true)); List group1Streams = ImmutableList.of( - new Stream(0, ROW_INDEX, 5, true), - new Stream(5, ROW_INDEX, 5, true, DEFAULT_SEQUENCE_ID, Optional.of(25L)), - new Stream(0, DATA, 5, true, DEFAULT_SEQUENCE_ID, Optional.of(30L)), - new Stream(5, DATA, 5, true, DEFAULT_SEQUENCE_ID, Optional.of(55L))); + new Stream(0, DEFAULT_SEQUENCE_ID, ROW_INDEX, 5, true), + new Stream(5, DEFAULT_SEQUENCE_ID, ROW_INDEX, 5, true, Optional.of(25L)), + new Stream(0, DEFAULT_SEQUENCE_ID, DATA, 5, true, Optional.of(30L)), + new Stream(5, DEFAULT_SEQUENCE_ID, DATA, 5, true, Optional.of(55L))); List group2Streams = ImmutableList.of( - new Stream(1, ROW_INDEX, 5, true, DEFAULT_SEQUENCE_ID, Optional.of(5L)), - new Stream(2, ROW_INDEX, 5, true), - new Stream(1, DATA, 5, true, DEFAULT_SEQUENCE_ID, Optional.of(35L)), - new Stream(2, DATA, 5, true)); + new Stream(1, DEFAULT_SEQUENCE_ID, ROW_INDEX, 5, true, Optional.of(5L)), + new Stream(2, DEFAULT_SEQUENCE_ID, ROW_INDEX, 5, true), + new Stream(1, DEFAULT_SEQUENCE_ID, DATA, 5, true, Optional.of(35L)), + new Stream(2, DEFAULT_SEQUENCE_ID, DATA, 5, true)); Map actual = getDiskRanges(ImmutableList.of(unencryptedStreams, group1Streams, group2Streams)); diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryColumnWriter.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryColumnWriter.java index bc193ba5aff04..0a18aac5731c8 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryColumnWriter.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestDictionaryColumnWriter.java @@ -52,6 +52,7 @@ import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DIRECT; import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DIRECT_V2; import static com.facebook.presto.orc.metadata.ColumnEncoding.ColumnEncodingKind.DWRF_DIRECT; +import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID; import static com.facebook.presto.orc.metadata.CompressionKind.ZSTD; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Iterables.cycle; @@ -90,6 +91,7 @@ public void testStringDirectConversion() ColumnWriterOptions columnWriterOptions = ColumnWriterOptions.builder().setCompressionKind(CompressionKind.NONE).build(); DictionaryColumnWriter writer = new SliceDictionaryColumnWriter( 0, + DEFAULT_SEQUENCE_ID, VARCHAR, columnWriterOptions, Optional.empty(), diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestStreamLayout.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestStreamLayout.java index d51a5a47350b5..3d2f56ae29be4 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestStreamLayout.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestStreamLayout.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; +import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -33,7 +34,7 @@ public class TestStreamLayout { private static StreamDataOutput createStream(int column, StreamKind streamKind, int length) { - Stream stream = new Stream(column, streamKind, length, true); + Stream stream = new Stream(column, DEFAULT_SEQUENCE_ID, streamKind, length, true); return new StreamDataOutput(Slices.allocate(1024), stream); } diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/stream/AbstractTestValueStream.java b/presto-orc/src/test/java/com/facebook/presto/orc/stream/AbstractTestValueStream.java index 94a281e6318a4..63bf809710194 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/stream/AbstractTestValueStream.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/stream/AbstractTestValueStream.java @@ -29,6 +29,7 @@ import java.util.Optional; import static com.facebook.presto.orc.OrcDecompressor.createOrcDecompressor; +import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID; import static com.facebook.presto.orc.metadata.CompressionKind.SNAPPY; import static io.airlift.units.DataSize.Unit.KILOBYTE; import static java.lang.Math.toIntExact; @@ -57,7 +58,7 @@ protected void testWriteValue(List> groups) outputStream.close(); DynamicSliceOutput sliceOutput = new DynamicSliceOutput(1000); - StreamDataOutput streamDataOutput = outputStream.getStreamDataOutput(33); + StreamDataOutput streamDataOutput = outputStream.getStreamDataOutput(33, DEFAULT_SEQUENCE_ID); streamDataOutput.writeData(sliceOutput); Stream stream = streamDataOutput.getStream(); assertEquals(stream.getStreamKind(), StreamKind.DATA); diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/stream/TestBooleanStream.java b/presto-orc/src/test/java/com/facebook/presto/orc/stream/TestBooleanStream.java index cb050ef5e9e66..fdbe4c9e6b539 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/stream/TestBooleanStream.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/stream/TestBooleanStream.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Optional; +import static com.facebook.presto.orc.metadata.ColumnEncoding.DEFAULT_SEQUENCE_ID; import static org.testng.Assert.assertEquals; public class TestBooleanStream @@ -181,7 +182,7 @@ private BooleanInputStream createValueStream(BooleanOutputStream outputStream) throws OrcCorruptionException { DynamicSliceOutput sliceOutput = new DynamicSliceOutput(1000); - StreamDataOutput streamDataOutput = outputStream.getStreamDataOutput(33); + StreamDataOutput streamDataOutput = outputStream.getStreamDataOutput(33, DEFAULT_SEQUENCE_ID); streamDataOutput.writeData(sliceOutput); Stream stream = streamDataOutput.getStream(); assertEquals(stream.getStreamKind(), StreamKind.DATA);