From df54132972d48399d736c51065444392da696c33 Mon Sep 17 00:00:00 2001
From: Josh Howard
Date: Mon, 11 Oct 2021 14:03:58 -0400
Subject: [PATCH] Change native parquet writer to write v1 parquet files

---
 .../trino/parquet/writer/ParquetWriter.java        |   4 +-
 .../trino/parquet/writer/ParquetWriters.java       |   4 +-
 .../parquet/writer/PrimitiveColumnWriter.java      | 107 +++++++-----
 .../product/hive/TestHiveCompression.java          |  17 +--
 .../hive/TestHiveSparkCompatibility.java           |   8 +-
 5 files changed, 49 insertions(+), 91 deletions(-)

diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
index 101879833880..a26bdf99dcdd 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
@@ -50,7 +50,7 @@
 import static java.lang.Math.toIntExact;
 import static java.nio.charset.StandardCharsets.US_ASCII;
 import static java.util.Objects.requireNonNull;
-import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
 
 public class ParquetWriter
         implements Closeable
@@ -90,7 +90,7 @@ public ParquetWriter(
         requireNonNull(compressionCodecName, "compressionCodecName is null");
 
         ParquetProperties parquetProperties = ParquetProperties.builder()
-                .withWriterVersion(PARQUET_2_0)
+                .withWriterVersion(PARQUET_1_0)
                 .withPageSize(writerOption.getMaxPageSize())
                 .build();
 
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriters.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriters.java
index 203c2868c92f..0d49a4c6a9fb 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriters.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriters.java
@@ -148,8 +148,8 @@ public ColumnWriter primitive(PrimitiveType primitive)
             return new PrimitiveColumnWriter(
                     columnDescriptor,
                     getValueWriter(parquetProperties.newValuesWriter(columnDescriptor), trinoType, columnDescriptor.getPrimitiveType()),
-                    parquetProperties.newDefinitionLevelEncoder(columnDescriptor),
-                    parquetProperties.newRepetitionLevelEncoder(columnDescriptor),
+                    parquetProperties.newDefinitionLevelWriter(columnDescriptor),
+                    parquetProperties.newRepetitionLevelWriter(columnDescriptor),
                     compressionCodecName,
                     parquetProperties.getPageSizeThreshold());
         }
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java
index f9abaaeaf3c0..cca70680901d 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java
@@ -25,7 +25,7 @@
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.format.ColumnMetaData;
 import org.apache.parquet.format.PageEncodingStats;
 import org.apache.parquet.format.PageType;
@@ -63,8 +63,8 @@ public class PrimitiveColumnWriter
     private final CompressionCodecName compressionCodec;
 
     private final PrimitiveValueWriter primitiveValueWriter;
-    private final RunLengthBitPackingHybridEncoder definitionLevelEncoder;
-    private final RunLengthBitPackingHybridEncoder repetitionLevelEncoder;
+    private final ValuesWriter definitionLevelWriter;
+    private final ValuesWriter repetitionLevelWriter;
 
     private final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
 
@@ -72,9 +72,8 @@ public class PrimitiveColumnWriter
     private boolean getDataStreamsCalled;
 
     // current page stats
-    private int currentPageRows;
+    private int valueCount;
     private int currentPageNullCounts;
-    private int currentPageRowCount;
 
     // column meta data stats
     private final Set<Encoding> encodings = new HashSet<>();
@@ -82,7 +81,7 @@
     private final Map<org.apache.parquet.format.Encoding, Integer> dictionaryPagesWithEncoding = new HashMap<>();
     private long totalCompressedSize;
    private long totalUnCompressedSize;
-    private long totalRows;
+    private long totalValues;
     private Statistics<?> columnStatistics;
 
     private final int maxDefinitionLevel;
@@ -94,18 +93,16 @@
 
     private final int pageSizeThreshold;
 
-    public PrimitiveColumnWriter(ColumnDescriptor columnDescriptor, PrimitiveValueWriter primitiveValueWriter, RunLengthBitPackingHybridEncoder definitionLevelEncoder, RunLengthBitPackingHybridEncoder repetitionLevelEncoder, CompressionCodecName compressionCodecName, int pageSizeThreshold)
+    public PrimitiveColumnWriter(ColumnDescriptor columnDescriptor, PrimitiveValueWriter primitiveValueWriter, ValuesWriter definitionLevelWriter, ValuesWriter repetitionLevelWriter, CompressionCodecName compressionCodecName, int pageSizeThreshold)
     {
         this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null");
         this.maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
-
-        this.definitionLevelEncoder = requireNonNull(definitionLevelEncoder, "definitionLevelEncoder is null");
-        this.repetitionLevelEncoder = requireNonNull(repetitionLevelEncoder, "repetitionLevelEncoder is null");
+        this.definitionLevelWriter = requireNonNull(definitionLevelWriter, "definitionLevelWriter is null");
+        this.repetitionLevelWriter = requireNonNull(repetitionLevelWriter, "repetitionLevelWriter is null");
         this.primitiveValueWriter = requireNonNull(primitiveValueWriter, "primitiveValueWriter is null");
         this.compressionCodec = requireNonNull(compressionCodecName, "compressionCodecName is null");
         this.compressor = getCompressor(compressionCodecName);
         this.pageSizeThreshold = pageSizeThreshold;
-
         this.columnStatistics = Statistics.createStats(columnDescriptor.getPrimitiveType());
     }
 
@@ -132,21 +129,18 @@ public void writeBlock(ColumnChunk columnChunk)
         Iterator<Integer> defIterator = DefLevelIterables.getIterator(current.getDefLevelIterables());
         while (defIterator.hasNext()) {
             int next = defIterator.next();
-            definitionLevelEncoder.writeInt(next);
+            definitionLevelWriter.writeInteger(next);
             if (next != maxDefinitionLevel) {
                 currentPageNullCounts++;
             }
-            currentPageRows++;
+            valueCount++;
         }
 
         // write repetition levels
         Iterator<Integer> repIterator = getIterator(current.getRepLevelIterables());
         while (repIterator.hasNext()) {
             int next = repIterator.next();
-            repetitionLevelEncoder.writeInt(next);
-            if (next == 0) {
-                currentPageRowCount++;
-            }
+            repetitionLevelWriter.writeInteger(next);
         }
 
         if (getBufferedBytes() >= pageSizeThreshold) {
@@ -178,14 +172,14 @@ private ColumnMetaData getColumnMetaData()
                 encodings.stream().map(parquetMetadataConverter::getEncoding).collect(toImmutableList()),
                 ImmutableList.copyOf(columnDescriptor.getPath()),
                 compressionCodec.getParquetCompressionCodec(),
-                totalRows,
+                totalValues,
                 totalUnCompressedSize,
                 totalCompressedSize,
                 -1);
         columnMetaData.setStatistics(ParquetMetadataConverter.toParquetStatistics(columnStatistics));
         ImmutableList.Builder<PageEncodingStats> pageEncodingStats = ImmutableList.builder();
         dataPagesWithEncoding.entrySet().stream()
-                .map(encodingAndCount -> new PageEncodingStats(PageType.DATA_PAGE_V2, encodingAndCount.getKey(), encodingAndCount.getValue()))
+                .map(encodingAndCount -> new PageEncodingStats(PageType.DATA_PAGE, encodingAndCount.getKey(), encodingAndCount.getValue()))
                 .forEach(pageEncodingStats::add);
         dictionaryPagesWithEncoding.entrySet().stream()
                 .map(encodingAndCount -> new PageEncodingStats(PageType.DICTIONARY_PAGE, encodingAndCount.getKey(), encodingAndCount.getValue()))
                 .forEach(pageEncodingStats::add);
@@ -203,68 +197,53 @@ private void flushCurrentPageToBuffer()
     {
         ImmutableList.Builder<ParquetDataOutput> outputDataStreams = ImmutableList.builder();
 
-        BytesInput bytes = primitiveValueWriter.getBytes();
-        ParquetDataOutput repetitions = createDataOutput(copy(repetitionLevelEncoder.toBytes()));
-        ParquetDataOutput definitions = createDataOutput(copy(definitionLevelEncoder.toBytes()));
-
-        // Add encoding should be called after primitiveValueWriter.getBytes() and before primitiveValueWriter.reset()
-        encodings.add(primitiveValueWriter.getEncoding());
-
-        long uncompressedSize = bytes.size() + repetitions.size() + definitions.size();
-
-        ParquetDataOutput data;
-        long compressedSize;
-        if (compressor != null) {
-            data = compressor.compress(bytes);
-            compressedSize = data.size() + repetitions.size() + definitions.size();
-        }
-        else {
-            data = createDataOutput(copy(bytes));
-            compressedSize = uncompressedSize;
-        }
+        BytesInput bytesInput = BytesInput.concat(copy(repetitionLevelWriter.getBytes()),
+                copy(definitionLevelWriter.getBytes()),
+                copy(primitiveValueWriter.getBytes()));
+        ParquetDataOutput pageData = (compressor != null) ? compressor.compress(bytesInput) : createDataOutput(bytesInput);
+        long uncompressedSize = bytesInput.size();
+        long compressedSize = pageData.size();
 
         ByteArrayOutputStream pageHeaderOutputStream = new ByteArrayOutputStream();
 
         Statistics<?> statistics = primitiveValueWriter.getStatistics();
         statistics.incrementNumNulls(currentPageNullCounts);
-
         columnStatistics.mergeStatistics(statistics);
 
-        parquetMetadataConverter.writeDataPageV2Header((int) uncompressedSize,
+        parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize,
                 (int) compressedSize,
-                currentPageRows,
-                currentPageNullCounts,
-                currentPageRowCount,
-                statistics,
+                valueCount,
+                repetitionLevelWriter.getEncoding(),
+                definitionLevelWriter.getEncoding(),
                 primitiveValueWriter.getEncoding(),
-                (int) repetitions.size(),
-                (int) definitions.size(),
                 pageHeaderOutputStream);
 
         ParquetDataOutput pageHeader = createDataOutput(Slices.wrappedBuffer(pageHeaderOutputStream.toByteArray()));
         outputDataStreams.add(pageHeader);
-        outputDataStreams.add(repetitions);
-        outputDataStreams.add(definitions);
-        outputDataStreams.add(data);
+        outputDataStreams.add(pageData);
 
         List<ParquetDataOutput> dataOutputs = outputDataStreams.build();
 
-        dataPagesWithEncoding.merge(new ParquetMetadataConverter().getEncoding(primitiveValueWriter.getEncoding()), 1, Integer::sum);
+        dataPagesWithEncoding.merge(parquetMetadataConverter.getEncoding(primitiveValueWriter.getEncoding()), 1, Integer::sum);
 
         // update total stats
-        totalCompressedSize += pageHeader.size() + compressedSize;
         totalUnCompressedSize += pageHeader.size() + uncompressedSize;
-        totalRows += currentPageRows;
+        totalCompressedSize += pageHeader.size() + compressedSize;
+        totalValues += valueCount;
 
         pageBuffer.addAll(dataOutputs);
 
+        // Add encoding should be called after ValuesWriter#getBytes() and before ValuesWriter#reset()
+        encodings.add(repetitionLevelWriter.getEncoding());
+        encodings.add(definitionLevelWriter.getEncoding());
+        encodings.add(primitiveValueWriter.getEncoding());
+
         // reset page stats
-        currentPageRows = 0;
+        valueCount = 0;
         currentPageNullCounts = 0;
-        currentPageRowCount = 0;
 
-        definitionLevelEncoder.reset();
-        repetitionLevelEncoder.reset();
+        repetitionLevelWriter.reset();
+        definitionLevelWriter.reset();
         primitiveValueWriter.reset();
     }
 
@@ -272,7 +251,7 @@ private List<ParquetDataOutput> getDataStreams()
             throws IOException
     {
         List<ParquetDataOutput> dictPage = new ArrayList<>();
-        if (currentPageRows > 0) {
+        if (valueCount > 0) {
             flushCurrentPageToBuffer();
         }
         // write dict page if possible
@@ -314,8 +293,8 @@ private List<ParquetDataOutput> getDataStreams()
     public long getBufferedBytes()
     {
         return pageBuffer.stream().mapToLong(ParquetDataOutput::size).sum() +
-                definitionLevelEncoder.getBufferedSize() +
-                repetitionLevelEncoder.getBufferedSize() +
+                definitionLevelWriter.getBufferedSize() +
+                repetitionLevelWriter.getBufferedSize() +
                 primitiveValueWriter.getBufferedSize();
     }
 
@@ -324,22 +303,22 @@ public long getRetainedBytes()
     {
         return INSTANCE_SIZE +
                 primitiveValueWriter.getAllocatedSize() +
-                definitionLevelEncoder.getAllocatedSize() +
-                repetitionLevelEncoder.getAllocatedSize();
+                definitionLevelWriter.getAllocatedSize() +
+                repetitionLevelWriter.getAllocatedSize();
     }
 
     @Override
     public void reset()
     {
-        definitionLevelEncoder.reset();
-        repetitionLevelEncoder.reset();
+        definitionLevelWriter.reset();
+        repetitionLevelWriter.reset();
         primitiveValueWriter.reset();
         pageBuffer.clear();
         closed = false;
 
         totalCompressedSize = 0;
         totalUnCompressedSize = 0;
-        totalRows = 0;
+        totalValues = 0;
         encodings.clear();
         dataPagesWithEncoding.clear();
         dictionaryPagesWithEncoding.clear();
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveCompression.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveCompression.java
index 734b2230be69..66c34e28e87d 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveCompression.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveCompression.java
@@ -16,7 +16,6 @@
 import io.trino.tempto.Requirement;
 import io.trino.tempto.RequirementsProvider;
 import io.trino.tempto.configuration.Configuration;
-import org.assertj.core.api.InstanceOfAssertFactories;
 import org.intellij.lang.annotations.Language;
 import org.testng.annotations.Test;
 
@@ -32,7 +31,6 @@
 import static io.trino.tests.product.utils.QueryExecutors.onTrino;
 import static java.lang.String.format;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 public class TestHiveCompression
         extends HiveProductTest
@@ -91,20 +89,7 @@ public void testSnappyCompressedParquetTableCreatedInTrino()
     @Test(groups = HIVE_COMPRESSION)
     public void testSnappyCompressedParquetTableCreatedInTrinoWithNativeWriter()
     {
-        if (getHiveVersionMajor() >= 2) {
-            testSnappyCompressedParquetTableCreatedInTrino(true);
-            return;
-        }
-
-        // TODO (https://github.com/trinodb/trino/issues/6377) Native Parquet writer creates files that cannot be read by Hive
-        assertThatThrownBy(() -> testSnappyCompressedParquetTableCreatedInTrino(true))
-                .hasStackTraceContaining("at org.apache.hive.jdbc.HiveQueryResultSet.next") // comes via Hive JDBC
-                .extracting(Throwable::toString, InstanceOfAssertFactories.STRING)
-                // There are a few cases here each of which are downstream:
-                // - HDP 2 and CDH 5 cannot read Parquet V2 files and throw "org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file"
-                // - CDH 5 Parquet uses parquet.* packages, while HDP 2 uses org.apache.parquet.* packages
-                // - HDP 3 throws java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveVarcharWritable
-                .matches("\\Qio.trino.tempto.query.QueryExecutionException: java.sql.SQLException: java.io.IOException:\\E (org.apache.)?parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file .*");
+        testSnappyCompressedParquetTableCreatedInTrino(true);
     }
 
     private void testSnappyCompressedParquetTableCreatedInTrino(boolean optimizedParquetWriter)
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java
index facb1c6ad462..48fa47aa411b 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java
@@ -14,7 +14,6 @@
 package io.trino.tests.product.hive;
 
 import io.trino.tempto.ProductTest;
-import org.assertj.core.api.InstanceOfAssertFactories;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -36,7 +35,6 @@
 import static java.lang.String.join;
 import static java.util.Collections.nCopies;
 import static java.util.Locale.ENGLISH;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 public class TestHiveSparkCompatibility
         extends ProductTest
@@ -215,11 +213,7 @@ public void testReadTrinoCreatedParquetTable()
     public void testReadTrinoCreatedParquetTableWithNativeWriter()
     {
         onTrino().executeQuery("SET SESSION " + TRINO_CATALOG + ".experimental_parquet_optimized_writer_enabled = true");
-        // TODO (https://github.com/trinodb/trino/issues/6377) Native Parquet Writer writes Parquet V2 files that are not compatible with Spark's vectorized reader, see https://github.com/trinodb/trino/issues/7953 for more details
-        assertThatThrownBy(() -> testReadTrinoCreatedTable("using_native_parquet", "PARQUET"))
-                .hasStackTraceContaining("at org.apache.hive.jdbc.HiveStatement.execute")
-                .extracting(Throwable::toString, InstanceOfAssertFactories.STRING)
-                .matches("\\Qio.trino.tempto.query.QueryExecutionException: java.sql.SQLException: Error running query: java.lang.UnsupportedOperationException: Unsupported encoding: RLE\\E");
+        testReadTrinoCreatedTable("using_native_parquet", "PARQUET");
    }
 
     private void testReadTrinoCreatedTable(String tableName, String tableFormat)
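
Note (not part of the patch): a minimal standalone sketch of the v1 page layout this change produces, assuming only parquet-mr on the classpath. The class and method names here (ParquetV1PageSketch, v1WriterProperties, writeV1PageHeader) are illustrative and do not exist in Trino; the library calls mirror the ones the patch uses above. A v1 data page concatenates repetition levels, definition levels, and values into one body and records each section's encoding in the page header, whereas a v2 page stores the levels separately and uncompressed, which is what older Hive and Spark readers could not decode.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;

import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;

public class ParquetV1PageSketch
{
    // Mirrors the ParquetWriter change: ask parquet-mr for v1 writer behavior.
    public static ParquetProperties v1WriterProperties(int maxPageSize)
    {
        return ParquetProperties.builder()
                .withWriterVersion(PARQUET_1_0)
                .withPageSize(maxPageSize)
                .build();
    }

    // Mirrors PrimitiveColumnWriter.flushCurrentPageToBuffer(): levels and values
    // form a single page body, and the v1 header names the encoding of each section.
    public static byte[] writeV1PageHeader(
            ValuesWriter repetitionLevelWriter,
            ValuesWriter definitionLevelWriter,
            ValuesWriter valuesWriter,
            int valueCount)
            throws IOException
    {
        BytesInput pageBody = BytesInput.concat(
                repetitionLevelWriter.getBytes(),
                definitionLevelWriter.getBytes(),
                valuesWriter.getBytes());

        ByteArrayOutputStream header = new ByteArrayOutputStream();
        new ParquetMetadataConverter().writeDataPageV1Header(
                (int) pageBody.size(),   // uncompressed size
                (int) pageBody.size(),   // compressed size (no compressor in this sketch)
                valueCount,
                repetitionLevelWriter.getEncoding(),
                definitionLevelWriter.getEncoding(),
                valuesWriter.getEncoding(),
                header);
        return header.toByteArray();
    }
}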