From 9678dbba536beaaa8db993c77da83bf363e25c38 Mon Sep 17 00:00:00 2001
From: Josh Howard
Date: Mon, 4 Oct 2021 14:14:47 -0400
Subject: [PATCH 1/2] Added toggle for Parquet V1 and V2 formats

---
 .../trino/parquet/writer/ParquetWriter.java   |  3 +--
 .../parquet/writer/ParquetWriterOptions.java  | 20 +++++++++++++++++--
 .../plugin/hive/HiveSessionProperties.java    | 13 ++++++++++++
 .../parquet/ParquetFileWriterFactory.java     |  1 +
 .../iceberg/IcebergFileWriterFactory.java     |  2 ++
 .../iceberg/IcebergSessionProperties.java     | 13 ++++++++++++
 .../product/hive/TestHiveCompression.java     |  8 +++-----
 .../hive/TestHiveSparkCompatibility.java      | 10 +++-------
 8 files changed, 54 insertions(+), 16 deletions(-)
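A minimal usage sketch of the new option (not part of this patch; the class and
methods are the ones added in the diffs below, and the PARQUET_1_0 choice is
arbitrary):

    import io.trino.parquet.writer.ParquetWriterOptions;
    import org.apache.parquet.column.ParquetProperties.WriterVersion;

    // Request the widely compatible V1 format; omitting setWriterVersion
    // keeps the existing PARQUET_2_0 default.
    ParquetWriterOptions options = ParquetWriterOptions.builder()
            .setWriterVersion(WriterVersion.PARQUET_1_0)
            .build();

On the SQL side the same toggle is exposed as a session property on both the
Hive and Iceberg connectors, e.g. SET SESSION hive.parquet_writer_version =
'PARQUET_1_0' (the catalog name here is a placeholder).
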
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
index 53502e40f89e..b7347bc7d01b 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
@@ -50,7 +50,6 @@
 import static java.lang.Math.toIntExact;
 import static java.nio.charset.StandardCharsets.US_ASCII;
 import static java.util.Objects.requireNonNull;
-import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
 
 public class ParquetWriter
         implements Closeable
@@ -90,7 +89,7 @@ public ParquetWriter(
         requireNonNull(compressionCodecName, "compressionCodecName is null");
 
         ParquetProperties parquetProperties = ParquetProperties.builder()
-                .withWriterVersion(PARQUET_2_0)
+                .withWriterVersion(writerOption.getWriterVersion())
                 .withPageSize(writerOption.getMaxPageSize())
                 .build();
 
diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriterOptions.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriterOptions.java
index 3d31e31737ef..f211324878dd 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriterOptions.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriterOptions.java
@@ -14,6 +14,7 @@
 package io.trino.parquet.writer;
 
 import io.airlift.units.DataSize;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.ParquetWriter;
 
 import static java.lang.Math.toIntExact;
@@ -24,6 +25,7 @@ public class ParquetWriterOptions
     private static final DataSize DEFAULT_MAX_ROW_GROUP_SIZE = DataSize.ofBytes(ParquetWriter.DEFAULT_BLOCK_SIZE);
     private static final DataSize DEFAULT_MAX_PAGE_SIZE = DataSize.ofBytes(ParquetWriter.DEFAULT_PAGE_SIZE);
     public static final int DEFAULT_BATCH_SIZE = 10_000;
+    private static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_2_0;
 
     public static ParquetWriterOptions.Builder builder()
     {
@@ -33,12 +35,14 @@
     private final int maxRowGroupSize;
     private final int maxPageSize;
     private final int batchSize;
+    private final WriterVersion writerVersion;
 
-    private ParquetWriterOptions(DataSize maxBlockSize, DataSize maxPageSize, int batchSize)
+    private ParquetWriterOptions(DataSize maxBlockSize, DataSize maxPageSize, int batchSize, WriterVersion writerVersion)
     {
         this.maxRowGroupSize = toIntExact(requireNonNull(maxBlockSize, "maxBlockSize is null").toBytes());
         this.maxPageSize = toIntExact(requireNonNull(maxPageSize, "maxPageSize is null").toBytes());
         this.batchSize = batchSize;
+        this.writerVersion = requireNonNull(writerVersion, "writerVersion is null");
     }
 
     public long getMaxRowGroupSize()
     {
@@ -56,11 +60,17 @@ public int getBatchSize()
         return batchSize;
     }
 
+    public WriterVersion getWriterVersion()
+    {
+        return writerVersion;
+    }
+
     public static class Builder
     {
         private DataSize maxBlockSize = DEFAULT_MAX_ROW_GROUP_SIZE;
         private DataSize maxPageSize = DEFAULT_MAX_PAGE_SIZE;
         private int batchSize = DEFAULT_BATCH_SIZE;
+        private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;
 
         public Builder setMaxBlockSize(DataSize maxBlockSize)
         {
@@ -80,9 +90,15 @@ public Builder setBatchSize(int batchSize)
             return this;
         }
 
+        public Builder setWriterVersion(WriterVersion writerVersion)
+        {
+            this.writerVersion = writerVersion;
+            return this;
+        }
+
         public ParquetWriterOptions build()
         {
-            return new ParquetWriterOptions(maxBlockSize, maxPageSize, batchSize);
+            return new ParquetWriterOptions(maxBlockSize, maxPageSize, batchSize, writerVersion);
         }
     }
 }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java
index 2c6c006ac3af..8b8f28cb1e3d 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java
@@ -26,6 +26,7 @@
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.session.PropertyMetadata;
 import io.trino.spi.type.ArrayType;
+import org.apache.parquet.column.ParquetProperties;
 
 import javax.inject.Inject;
 
@@ -87,6 +88,7 @@ public final class HiveSessionProperties
     private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
     private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
     private static final String PARQUET_WRITER_BATCH_SIZE = "parquet_writer_batch_size";
+    private static final String PARQUET_WRITER_VERSION = "parquet_writer_version";
     private static final String MAX_SPLIT_SIZE = "max_split_size";
     private static final String MAX_INITIAL_SPLIT_SIZE = "max_initial_split_size";
     private static final String RCFILE_OPTIMIZED_WRITER_VALIDATE = "rcfile_optimized_writer_validate";
@@ -331,6 +333,12 @@ public HiveSessionProperties(
                         "Parquet: Maximum number of rows passed to the writer in each batch",
                         parquetWriterConfig.getBatchSize(),
                         false),
+                enumProperty(
+                        PARQUET_WRITER_VERSION,
+                        "Parquet: Writer version",
+                        ParquetProperties.WriterVersion.class,
+                        ParquetProperties.WriterVersion.PARQUET_2_0,
+                        false),
                 dataSizeProperty(
                         MAX_SPLIT_SIZE,
                         "Max split size",
@@ -644,6 +652,11 @@ public static int getParquetBatchSize(ConnectorSession session)
         return session.getProperty(PARQUET_WRITER_BATCH_SIZE, Integer.class);
     }
 
+    public static ParquetProperties.WriterVersion getParquetWriterVersion(ConnectorSession session)
+    {
+        return session.getProperty(PARQUET_WRITER_VERSION, ParquetProperties.WriterVersion.class);
+    }
+
     public static DataSize getMaxSplitSize(ConnectorSession session)
     {
         return session.getProperty(MAX_SPLIT_SIZE, DataSize.class);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java
index 1f55a88aa088..aa866ad0c4cb 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java
@@ -93,6 +93,7 @@ public Optional<FileWriter> createFileWriter(
                 .setMaxPageSize(HiveSessionProperties.getParquetWriterPageSize(session))
                 .setMaxBlockSize(HiveSessionProperties.getParquetWriterBlockSize(session))
                 .setBatchSize(HiveSessionProperties.getParquetBatchSize(session))
+                .setWriterVersion(HiveSessionProperties.getParquetWriterVersion(session))
                 .build();
 
         CompressionCodecName compressionCodecName = getCompression(conf);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
index 435cdf7586fc..0a67a26878cb 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
@@ -65,6 +65,7 @@
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterBatchSize;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterBlockSize;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageSize;
+import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterVersion;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcWriterValidate;
 import static io.trino.plugin.iceberg.TypeConverter.toOrcType;
 import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
@@ -148,6 +149,7 @@ private IcebergFileWriter createParquetWriter(
                 .setMaxPageSize(getParquetWriterPageSize(session))
                 .setMaxBlockSize(getParquetWriterBlockSize(session))
                 .setBatchSize(getParquetWriterBatchSize(session))
+                .setWriterVersion(getParquetWriterVersion(session))
                 .build();
 
         return new IcebergParquetFileWriter(
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
index d94a68e9b024..5050afde219f 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
@@ -25,6 +25,7 @@
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.session.PropertyMetadata;
+import org.apache.parquet.column.ParquetProperties;
 
 import javax.inject.Inject;
 
@@ -64,6 +65,7 @@ public final class IcebergSessionProperties
     private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
     private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
     private static final String PARQUET_WRITER_BATCH_SIZE = "parquet_writer_batch_size";
+    private static final String PARQUET_WRITER_VERSION = "parquet_writer_version";
 
     private final List<PropertyMetadata<?>> sessionProperties;
     @Inject
@@ -190,6 +192,12 @@ public IcebergSessionProperties(
                         "Parquet: Maximum number of rows passed to the writer in each batch",
                         parquetWriterConfig.getBatchSize(),
                         false))
+                .add(enumProperty(
+                        PARQUET_WRITER_VERSION,
+                        "Parquet: Writer version",
+                        ParquetProperties.WriterVersion.class,
+                        ParquetProperties.WriterVersion.PARQUET_2_0,
+                        false))
                 .build();
     }
 
@@ -310,4 +318,9 @@ public static int getParquetWriterBatchSize(ConnectorSession session)
     {
         return session.getProperty(PARQUET_WRITER_BATCH_SIZE, Integer.class);
     }
+
+    public static ParquetProperties.WriterVersion getParquetWriterVersion(ConnectorSession session)
+    {
+        return session.getProperty(PARQUET_WRITER_VERSION, ParquetProperties.WriterVersion.class);
+    }
 }
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveCompression.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveCompression.java
index d6760b7f4c35..2ba27e762096 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveCompression.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveCompression.java
@@ -96,11 +96,7 @@ public void testSnappyCompressedParquetTableCreatedInTrinoWithNativeWriter()
         assertThatThrownBy(() -> testSnappyCompressedParquetTableCreatedInTrino(true))
                 .hasStackTraceContaining("at org.apache.hive.jdbc.HiveQueryResultSet.next") // comes via Hive JDBC
                 .extracting(Throwable::toString, InstanceOfAssertFactories.STRING)
-                // There are a few cases here each of which are downstream:
-                // - HDP 2 and CDH 5 cannot read Parquet V2 files and throw "org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file"
-                // - CDH 5 Parquet uses parquet.* packages, while HDP 2 uses org.apache.parquet.* packages
-                // - HDP 3 throws java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveVarcharWritable
-                .matches("\\Qio.trino.tempto.query.QueryExecutionException: java.sql.SQLException: java.io.IOException:\\E ((org.apache.)?parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file .*|org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveVarcharWritable)");
+                .matches("\\Qio.trino.tempto.query.QueryExecutionException: java.sql.SQLException: java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveVarcharWritable\\E");
     }
 
     private void testSnappyCompressedParquetTableCreatedInTrino(boolean optimizedParquetWriter)
@@ -117,6 +113,8 @@ private void testSnappyCompressedParquetTableCreatedInTrino(boolean optimizedPar
         String catalog = (String) getOnlyElement(getOnlyElement(onTrino().executeQuery("SELECT CURRENT_CATALOG").rows()));
         onTrino().executeQuery("SET SESSION " + catalog + ".compression_codec = 'SNAPPY'");
         onTrino().executeQuery("SET SESSION " + catalog + ".experimental_parquet_optimized_writer_enabled = " + optimizedParquetWriter);
+        // TODO (https://github.com/trinodb/trino/issues/6377) The native Parquet writer writes Parquet V2 files by default, which are not compatible with Hive; see https://github.com/trinodb/trino/issues/7953 for more details
+        onTrino().executeQuery("SET SESSION " + catalog + ".parquet_writer_version = 'PARQUET_1_0'");
         onTrino().executeQuery(format("INSERT INTO %s VALUES(1, 'test data')", tableName));
 
         assertThat(onTrino().executeQuery("SELECT * FROM " + tableName)).containsExactlyInOrder(row(1, "test data"));
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java
index facb1c6ad462..86b95b2d8371 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java
@@ -14,7 +14,6 @@
 package io.trino.tests.product.hive;
 
 import io.trino.tempto.ProductTest;
-import org.assertj.core.api.InstanceOfAssertFactories;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -36,7 +35,6 @@
 import static java.lang.String.join;
 import static java.util.Collections.nCopies;
 import static java.util.Locale.ENGLISH;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 public class TestHiveSparkCompatibility
         extends ProductTest
@@ -215,11 +213,9 @@ public void testReadTrinoCreatedParquetTable()
     public void testReadTrinoCreatedParquetTableWithNativeWriter()
     {
         onTrino().executeQuery("SET SESSION " + TRINO_CATALOG + ".experimental_parquet_optimized_writer_enabled = true");
-        // TODO (https://github.com/trinodb/trino/issues/6377) Native Parquet Writer writes Parquet V2 files that are not compatible with Spark's vectorized reader, see https://github.com/trinodb/trino/issues/7953 for more details
-        assertThatThrownBy(() -> testReadTrinoCreatedTable("using_native_parquet", "PARQUET"))
-                .hasStackTraceContaining("at org.apache.hive.jdbc.HiveStatement.execute")
-                .extracting(Throwable::toString, InstanceOfAssertFactories.STRING)
-                .matches("\\Qio.trino.tempto.query.QueryExecutionException: java.sql.SQLException: Error running query: java.lang.UnsupportedOperationException: Unsupported encoding: RLE\\E");
+        // TODO (https://github.com/trinodb/trino/issues/6377) The native Parquet writer writes Parquet V2 files by default, which are not compatible with Spark's vectorized reader; see https://github.com/trinodb/trino/issues/7953 for more details
+        onTrino().executeQuery("SET SESSION " + TRINO_CATALOG + ".parquet_writer_version = 'PARQUET_1_0'");
+        testReadTrinoCreatedTable("using_native_parquet", "PARQUET");
     }
 
     private void testReadTrinoCreatedTable(String tableName, String tableFormat)
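The patch keeps PARQUET_2_0 as the default, so only sessions that explicitly
opt in produce V1 files. A minimal TestNG sketch of how that default could be
pinned down in a unit test (hypothetical, not part of this patch, though it
only uses the builder method and getter added above):

    import static org.assertj.core.api.Assertions.assertThat;

    import io.trino.parquet.writer.ParquetWriterOptions;
    import org.apache.parquet.column.ParquetProperties.WriterVersion;
    import org.testng.annotations.Test;

    public class TestParquetWriterOptionsDefaults
    {
        @Test
        public void testWriterVersionDefaultsToParquetV2()
        {
            // No setWriterVersion(...) call, so DEFAULT_WRITER_VERSION applies
            ParquetWriterOptions options = ParquetWriterOptions.builder().build();
            assertThat(options.getWriterVersion()).isEqualTo(WriterVersion.PARQUET_2_0);
        }
    }
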
From 0716e0e6dc2f634afe04619207fcf20566f76320 Mon Sep 17 00:00:00 2001
From: Piotr Findeisen
Date: Tue, 5 Oct 2021 13:22:48 +0200
Subject: [PATCH 2/2] empty