ParquetWriter.java

@@ -50,7 +50,6 @@
 import static java.lang.Math.toIntExact;
 import static java.nio.charset.StandardCharsets.US_ASCII;
 import static java.util.Objects.requireNonNull;
-import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;

 public class ParquetWriter
         implements Closeable
@@ -90,7 +89,7 @@ public ParquetWriter(
         requireNonNull(compressionCodecName, "compressionCodecName is null");

         ParquetProperties parquetProperties = ParquetProperties.builder()
-                .withWriterVersion(PARQUET_2_0)
+                .withWriterVersion(writerOption.getWriterVersion())
                 .withPageSize(writerOption.getMaxPageSize())
                 .build();

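For background (an illustration, not part of the diff): in parquet-mr the writer version selects the data-page format. PARQUET_1_0 writes V1 data pages with the classic plain/dictionary encodings, while PARQUET_2_0 writes V2 data pages and may pick newer encodings (e.g. RLE for values, DELTA_BINARY_PACKED), which the older Hive and Spark readers exercised in the tests below cannot decode.

```java
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;

class WriterVersionBackground
{
    // Minimal sketch: the same builder call this diff makes configurable.
    // V1 emits DataPageHeader pages; V2 emits DataPageHeaderV2 pages and
    // may use newer encodings that pre-V2 readers reject.
    static ParquetProperties propertiesFor(WriterVersion version)
    {
        return ParquetProperties.builder()
                .withWriterVersion(version)
                .build();
    }
}
```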
ParquetWriterOptions.java

@@ -14,6 +14,7 @@
 package io.trino.parquet.writer;

 import io.airlift.units.DataSize;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.ParquetWriter;

 import static java.lang.Math.toIntExact;
@@ -24,6 +25,7 @@ public class ParquetWriterOptions
     private static final DataSize DEFAULT_MAX_ROW_GROUP_SIZE = DataSize.ofBytes(ParquetWriter.DEFAULT_BLOCK_SIZE);
     private static final DataSize DEFAULT_MAX_PAGE_SIZE = DataSize.ofBytes(ParquetWriter.DEFAULT_PAGE_SIZE);
     public static final int DEFAULT_BATCH_SIZE = 10_000;
+    private static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_2_0;

     public static ParquetWriterOptions.Builder builder()
     {
@@ -33,12 +35,14 @@ public static ParquetWriterOptions.Builder builder()
     private final int maxRowGroupSize;
     private final int maxPageSize;
     private final int batchSize;
+    private final WriterVersion writerVersion;

-    private ParquetWriterOptions(DataSize maxBlockSize, DataSize maxPageSize, int batchSize)
+    private ParquetWriterOptions(DataSize maxBlockSize, DataSize maxPageSize, int batchSize, WriterVersion writerVersion)
     {
         this.maxRowGroupSize = toIntExact(requireNonNull(maxBlockSize, "maxBlockSize is null").toBytes());
         this.maxPageSize = toIntExact(requireNonNull(maxPageSize, "maxPageSize is null").toBytes());
         this.batchSize = batchSize;
+        this.writerVersion = requireNonNull(writerVersion, "writerVersion is null");
     }

     public long getMaxRowGroupSize()
@@ -56,11 +60,17 @@ public int getBatchSize()
         return batchSize;
     }

+    public WriterVersion getWriterVersion()
+    {
+        return writerVersion;
+    }
+
     public static class Builder
     {
         private DataSize maxBlockSize = DEFAULT_MAX_ROW_GROUP_SIZE;
         private DataSize maxPageSize = DEFAULT_MAX_PAGE_SIZE;
         private int batchSize = DEFAULT_BATCH_SIZE;
+        private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;

         public Builder setMaxBlockSize(DataSize maxBlockSize)
         {
@@ -80,9 +90,15 @@ public Builder setBatchSize(int batchSize)
             return this;
         }

+        public Builder setWriterVersion(WriterVersion writerVersion)
+        {
+            this.writerVersion = writerVersion;
+            return this;
+        }
+
         public ParquetWriterOptions build()
         {
-            return new ParquetWriterOptions(maxBlockSize, maxPageSize, batchSize);
+            return new ParquetWriterOptions(maxBlockSize, maxPageSize, batchSize, writerVersion);
        }
     }
 }
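A minimal usage sketch of the extended builder (illustration only, not part of the diff; the caller and the page size are hypothetical). Unset fields keep the defaults declared above, so existing call sites compile and behave unchanged:

```java
import io.airlift.units.DataSize;
import org.apache.parquet.column.ParquetProperties.WriterVersion;

class WriterOptionsUsage
{
    // Hypothetical caller forcing V1 data pages for older readers;
    // batchSize is left unset and keeps DEFAULT_BATCH_SIZE (10_000).
    static ParquetWriterOptions v1Options()
    {
        return ParquetWriterOptions.builder()
                .setMaxPageSize(DataSize.of(1, DataSize.Unit.MEGABYTE))
                .setWriterVersion(WriterVersion.PARQUET_1_0)
                .build();
    }
}
```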
HiveSessionProperties.java

@@ -26,6 +26,7 @@
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.session.PropertyMetadata;
 import io.trino.spi.type.ArrayType;
+import org.apache.parquet.column.ParquetProperties;

 import javax.inject.Inject;

@@ -87,6 +88,7 @@ public final class HiveSessionProperties
     private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
     private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
     private static final String PARQUET_WRITER_BATCH_SIZE = "parquet_writer_batch_size";
+    private static final String PARQUET_WRITER_VERSION = "parquet_writer_version";
     private static final String MAX_SPLIT_SIZE = "max_split_size";
     private static final String MAX_INITIAL_SPLIT_SIZE = "max_initial_split_size";
     private static final String RCFILE_OPTIMIZED_WRITER_VALIDATE = "rcfile_optimized_writer_validate";
@@ -331,6 +333,12 @@ public HiveSessionProperties(
                         "Parquet: Maximum number of rows passed to the writer in each batch",
                         parquetWriterConfig.getBatchSize(),
                         false),
+                enumProperty(
+                        PARQUET_WRITER_VERSION,
+                        "Parquet: Writer version",
+                        ParquetProperties.WriterVersion.class,
+                        ParquetProperties.WriterVersion.PARQUET_2_0,
+                        false),
                 dataSizeProperty(
                         MAX_SPLIT_SIZE,
                         "Max split size",
@@ -644,6 +652,11 @@ public static int getParquetBatchSize(ConnectorSession session)
         return session.getProperty(PARQUET_WRITER_BATCH_SIZE, Integer.class);
     }

+    public static ParquetProperties.WriterVersion getParquetWriterVersion(ConnectorSession session)
+    {
+        return session.getProperty(PARQUET_WRITER_VERSION, ParquetProperties.WriterVersion.class);
+    }
+
     public static DataSize getMaxSplitSize(ConnectorSession session)
     {
         return session.getProperty(MAX_SPLIT_SIZE, DataSize.class);
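How the new property is used in practice, as a hedged sketch modeled on the product tests at the end of this diff (assumes a tempto product-test context and a catalog named "hive"; the Iceberg connector below registers the identical property):

```java
import static io.trino.tests.product.utils.QueryExecutors.onTrino;

class WriterVersionSessionUsage
{
    static void forceV1Pages()
    {
        // enumProperty binds the value to ParquetProperties.WriterVersion,
        // so only the constant names PARQUET_1_0 and PARQUET_2_0 are accepted.
        onTrino().executeQuery("SET SESSION hive.parquet_writer_version = 'PARQUET_1_0'");
    }
}
```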
ParquetFileWriterFactory.java

@@ -93,6 +93,7 @@ public Optional<FileWriter> createFileWriter(
                 .setMaxPageSize(HiveSessionProperties.getParquetWriterPageSize(session))
                 .setMaxBlockSize(HiveSessionProperties.getParquetWriterBlockSize(session))
                 .setBatchSize(HiveSessionProperties.getParquetBatchSize(session))
+                .setWriterVersion(HiveSessionProperties.getParquetWriterVersion(session))
                 .build();

         CompressionCodecName compressionCodecName = getCompression(conf);
IcebergFileWriterFactory.java

@@ -65,6 +65,7 @@
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterBatchSize;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterBlockSize;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageSize;
+import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterVersion;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcWriterValidate;
 import static io.trino.plugin.iceberg.TypeConverter.toOrcType;
 import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
@@ -148,6 +149,7 @@ private IcebergFileWriter createParquetWriter(
                 .setMaxPageSize(getParquetWriterPageSize(session))
                 .setMaxBlockSize(getParquetWriterBlockSize(session))
                 .setBatchSize(getParquetWriterBatchSize(session))
+                .setWriterVersion(getParquetWriterVersion(session))
                 .build();

         return new IcebergParquetFileWriter(
IcebergSessionProperties.java

@@ -25,6 +25,7 @@
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.session.PropertyMetadata;
+import org.apache.parquet.column.ParquetProperties;

 import javax.inject.Inject;

@@ -64,6 +65,7 @@ public final class IcebergSessionProperties
     private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
     private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
     private static final String PARQUET_WRITER_BATCH_SIZE = "parquet_writer_batch_size";
+    private static final String PARQUET_WRITER_VERSION = "parquet_writer_version";
     private final List<PropertyMetadata<?>> sessionProperties;

     @Inject
@@ -190,6 +192,12 @@ public IcebergSessionProperties(
                         "Parquet: Maximum number of rows passed to the writer in each batch",
                         parquetWriterConfig.getBatchSize(),
                         false))
+                .add(enumProperty(
+                        PARQUET_WRITER_VERSION,
+                        "Parquet: Writer version",
+                        ParquetProperties.WriterVersion.class,
+                        ParquetProperties.WriterVersion.PARQUET_2_0,
+                        false))
                 .build();
     }

@@ -310,4 +318,9 @@ public static int getParquetWriterBatchSize(ConnectorSession session)
     {
         return session.getProperty(PARQUET_WRITER_BATCH_SIZE, Integer.class);
     }
+
+    public static ParquetProperties.WriterVersion getParquetWriterVersion(ConnectorSession session)
+    {
+        return session.getProperty(PARQUET_WRITER_VERSION, ParquetProperties.WriterVersion.class);
+    }
 }
TestHiveCompatibility.java

@@ -96,11 +96,7 @@ public void testSnappyCompressedParquetTableCreatedInTrinoWithNativeWriter()
         assertThatThrownBy(() -> testSnappyCompressedParquetTableCreatedInTrino(true))
                 .hasStackTraceContaining("at org.apache.hive.jdbc.HiveQueryResultSet.next") // comes via Hive JDBC
                 .extracting(Throwable::toString, InstanceOfAssertFactories.STRING)
-                // There are a few cases here each of which are downstream:
-                // - HDP 2 and CDH 5 cannot read Parquet V2 files and throw "org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file"
-                // - CDH 5 Parquet uses parquet.* packages, while HDP 2 uses org.apache.parquet.* packages
-                // - HDP 3 throws java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveVarcharWritable
-                .matches("\\Qio.trino.tempto.query.QueryExecutionException: java.sql.SQLException: java.io.IOException:\\E ((org.apache.)?parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file .*|org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveVarcharWritable)");
+                .matches("\\Qio.trino.tempto.query.QueryExecutionException: java.sql.SQLException: java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveVarcharWritable\\E");
     }

     private void testSnappyCompressedParquetTableCreatedInTrino(boolean optimizedParquetWriter)
@@ -117,6 +113,8 @@ private void testSnappyCompressedParquetTableCreatedInTrino(boolean optimizedParquetWriter)
         String catalog = (String) getOnlyElement(getOnlyElement(onTrino().executeQuery("SELECT CURRENT_CATALOG").rows()));
         onTrino().executeQuery("SET SESSION " + catalog + ".compression_codec = 'SNAPPY'");
         onTrino().executeQuery("SET SESSION " + catalog + ".experimental_parquet_optimized_writer_enabled = " + optimizedParquetWriter);
+        // TODO (https://github.com/trinodb/trino/issues/6377) Native Parquet Writer writes Parquet V2 files by default that are not compatible with Hive, see https://github.com/trinodb/trino/issues/7953 for more details
+        onTrino().executeQuery("SET SESSION " + catalog + ".parquet_writer_version = 'PARQUET_1_0'");
         onTrino().executeQuery(format("INSERT INTO %s VALUES(1, 'test data')", tableName));

         assertThat(onTrino().executeQuery("SELECT * FROM " + tableName)).containsExactlyInOrder(row(1, "test data"));
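Not part of this change, but handy when debugging these compatibility failures: parquet-mr records per-column EncodingStats in the file footer, so a hedged sketch like the following can confirm whether a written file actually contains V2 data pages (the file path argument is hypothetical; the APIs are from parquet-mr and Hadoop):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class CheckPageVersion
{
    public static void main(String[] args)
            throws Exception
    {
        try (ParquetFileReader reader = ParquetFileReader.open(
                HadoopInputFile.fromPath(new Path(args[0]), new Configuration()))) {
            ParquetMetadata footer = reader.getFooter();
            // EncodingStats can be absent in files from very old writers.
            boolean usesV2 = footer.getBlocks().stream()
                    .flatMap(block -> block.getColumns().stream())
                    .anyMatch(column -> column.getEncodingStats() != null
                            && column.getEncodingStats().usesV2Pages());
            System.out.println(usesV2 ? "V2 data pages present" : "V1 data pages only");
        }
    }
}
```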
TestHiveSparkCompatibility.java

@@ -14,7 +14,6 @@
 package io.trino.tests.product.hive;

 import io.trino.tempto.ProductTest;
-import org.assertj.core.api.InstanceOfAssertFactories;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;

@@ -36,7 +35,6 @@
 import static java.lang.String.join;
 import static java.util.Collections.nCopies;
 import static java.util.Locale.ENGLISH;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;

 public class TestHiveSparkCompatibility
         extends ProductTest
@@ -215,11 +213,9 @@ public void testReadTrinoCreatedParquetTable()
     public void testReadTrinoCreatedParquetTableWithNativeWriter()
     {
         onTrino().executeQuery("SET SESSION " + TRINO_CATALOG + ".experimental_parquet_optimized_writer_enabled = true");
-        // TODO (https://github.com/trinodb/trino/issues/6377) Native Parquet Writer writes Parquet V2 files that are not compatible with Spark's vectorized reader, see https://github.com/trinodb/trino/issues/7953 for more details
-        assertThatThrownBy(() -> testReadTrinoCreatedTable("using_native_parquet", "PARQUET"))
-                .hasStackTraceContaining("at org.apache.hive.jdbc.HiveStatement.execute")
-                .extracting(Throwable::toString, InstanceOfAssertFactories.STRING)
-                .matches("\\Qio.trino.tempto.query.QueryExecutionException: java.sql.SQLException: Error running query: java.lang.UnsupportedOperationException: Unsupported encoding: RLE\\E");
+        // TODO (https://github.com/trinodb/trino/issues/6377) Native Parquet Writer writes Parquet V2 files by default that are not compatible with Spark's vectorized reader, see https://github.com/trinodb/trino/issues/7953 for more details
+        onTrino().executeQuery("SET SESSION " + TRINO_CATALOG + ".parquet_writer_version = 'PARQUET_1_0'");
+        testReadTrinoCreatedTable("using_native_parquet", "PARQUET");
     }

     private void testReadTrinoCreatedTable(String tableName, String tableFormat)