diff --git a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp index 87a7c6f0421..cde998d8b88 100644 --- a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp +++ b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp @@ -22,7 +22,9 @@ #include "velox/connectors/hive/HiveConnector.h" // @manual #include "velox/core/QueryCtx.h" #include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual +#include "velox/dwio/parquet/reader/PageReader.h" #include "velox/dwio/parquet/tests/ParquetTestBase.h" +#include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/Cursor.h" #include "velox/exec/tests/utils/PlanBuilder.h" @@ -74,6 +76,26 @@ class ParquetWriterTest : public ParquetTestBase { opts); }; + facebook::velox::parquet::thrift::PageType::type getDataPageVersion( + const dwio::common::MemorySink* sinkPtr, + const facebook::velox::parquet::ColumnChunkMetaDataPtr& colChunkPtr) { + std::string_view sinkData(sinkPtr->data(), sinkPtr->size()); + auto readFile = std::make_shared(sinkData); + auto file = std::make_shared(std::move(readFile)); + auto inputStream = std::make_unique( + std::move(file), + colChunkPtr.dataPageOffset(), + 150, + *leafPool_, + LogType::TEST); + auto pageReader = std::make_unique( + std::move(inputStream), + *leafPool_, + colChunkPtr.compression(), + colChunkPtr.totalCompressedSize()); + return pageReader->readPageHeader().type; + }; + inline static const std::string kHiveConnectorId = "test-hive"; }; @@ -143,6 +165,50 @@ TEST_F(ParquetWriterTest, compression) { assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_); }; +TEST_F(ParquetWriterTest, datapageVersion) { + auto schema = ROW({"c0"}, {INTEGER()}); + const int64_t kRows = 1; + const auto data = makeRowVector({ + makeFlatVector(kRows, [](auto row) { return 987; }), + }); + + // Set parquet datapage version and write data - then read to ensure the + // property took effect. + const auto testDataPageVersion = + [&](facebook::velox::parquet::arrow::ParquetDataPageVersion + dataPageVersion) { + // Create an in-memory writer. + auto sink = std::make_unique( + 200 * 1024 * 1024, + dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto sinkPtr = sink.get(); + facebook::velox::parquet::WriterOptions writerOptions; + writerOptions.memoryPool = leafPool_.get(); + writerOptions.parquetDataPageVersion = dataPageVersion; + + auto writer = std::make_unique( + std::move(sink), writerOptions, rootPool_, schema); + writer->write(data); + writer->close(); + + dwio::common::ReaderOptions readerOptions{leafPool_.get()}; + auto reader = createReaderInMemory(*sinkPtr, readerOptions); + auto readDataPageVersion = getDataPageVersion( + sinkPtr, reader->fileMetaData().rowGroup(0).columnChunk(0)); + return readDataPageVersion; + }; + + ASSERT_EQ( + testDataPageVersion( + facebook::velox::parquet::arrow::ParquetDataPageVersion::V1), + thrift::PageType::type::DATA_PAGE); + + ASSERT_EQ( + testDataPageVersion( + facebook::velox::parquet::arrow::ParquetDataPageVersion::V2), + thrift::PageType::type::DATA_PAGE_V2); +}; + DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) { SCOPED_TESTVALUE_SET( "facebook::velox::parquet::Writer::write", diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index 0a92aa916e0..15aa5a88b09 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -146,6 +146,7 @@ std::shared_ptr getArrowParquetWriterOptions( static_cast(flushPolicy->rowsInRowGroup())); properties = properties->codec_options(options.codecOptions); properties = properties->enable_store_decimal_as_integer(); + properties = properties->data_page_version(options.parquetDataPageVersion); return properties->build(); } @@ -433,6 +434,20 @@ std::optional getTimestampTimeZone( return std::nullopt; } +arrow::ParquetDataPageVersion getParquetDataPageVersion( + const config::ConfigBase& config, + const char* configKey) { + const auto version = config.get(configKey); + + if (version == "PARQUET_1_0") { + return arrow::ParquetDataPageVersion::V1; + } else if (version == "PARQUET_2_0") { + return arrow::ParquetDataPageVersion::V2; + } else { + VELOX_FAIL("Unsupported parquet datapage version {}", version.value()); + } +} + } // namespace void WriterOptions::processSessionConfigs(const config::ConfigBase& config) { @@ -445,6 +460,9 @@ void WriterOptions::processSessionConfigs(const config::ConfigBase& config) { parquetWriteTimestampTimeZone = getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone); } + + parquetDataPageVersion = + getParquetDataPageVersion(config, kParquetSessionDataPageVersion); } void WriterOptions::processHiveConnectorConfigs( diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index e3da877be4f..e373645e13f 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -24,6 +24,7 @@ #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Writer.h" #include "velox/dwio/common/WriterFactory.h" +#include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/vector/ComplexVector.h" @@ -99,6 +100,10 @@ struct WriterOptions : public dwio::common::WriterOptions { arrow::Encoding::type encoding = arrow::Encoding::PLAIN; + // Default Parquet datapage version is V2. + arrow::ParquetDataPageVersion parquetDataPageVersion = + arrow::ParquetDataPageVersion::V2; + // The default factory allows the writer to construct the default flush // policy with the configs in its ctor. std::function()> flushPolicyFactory; @@ -122,6 +127,9 @@ struct WriterOptions : public dwio::common::WriterOptions { static constexpr const char* kParquetHiveConnectorWriteTimestampUnit = "hive.parquet.writer.timestamp-unit"; + static constexpr const char* kParquetSessionDataPageVersion = + "parquet_writer_version"; + // Process hive connector and session configs. void processSessionConfigs(const config::ConfigBase& config) override; void processHiveConnectorConfigs(const config::ConfigBase& config) override;