diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp index 275ff690b04..ff53176a8d1 100644 --- a/velox/connectors/hive/HiveConfig.cpp +++ b/velox/connectors/hive/HiveConfig.cpp @@ -303,4 +303,14 @@ bool HiveConfig::cacheNoRetention(const Config* session) const { config_->get(kCacheNoRetention, /*defaultValue=*/false)); } +std::string HiveConfig::parquetDataPageVersion(const Config* session) const { + const auto parquetDataPageVersion = session->get( + kParquetDataPageVersion, + config_->get(kParquetDataPageVersion, "PARQUET_2_0")); + VELOX_CHECK( + parquetDataPageVersion == "PARQUET_1_0" || + parquetDataPageVersion == "PARQUET_2_0", + "Invalid Parquet version."); + return parquetDataPageVersion; +} } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h index 312d810b8e6..c4a852c9c2d 100644 --- a/velox/connectors/hive/HiveConfig.h +++ b/velox/connectors/hive/HiveConfig.h @@ -240,6 +240,10 @@ class HiveConfig { static constexpr const char* kCacheNoRetention = "cache.no_retention"; static constexpr const char* kCacheNoRetentionSession = "cache.no_retention"; + /// File format version to be used by Parquet writer. + static constexpr const char* kParquetDataPageVersion = + "parquet_writer_version"; + InsertExistingPartitionsBehavior insertExistingPartitionsBehavior( const Config* session) const; @@ -344,6 +348,11 @@ class HiveConfig { /// locality. bool cacheNoRetention(const Config* session) const; + /// Returns the datapage version used by the Parquet writer. + /// Default version is PARQUET_2_0; PARQUET_1_0 can be specified + /// via session property or Hive config. + std::string parquetDataPageVersion(const Config* session) const; + HiveConfig(std::shared_ptr config) { VELOX_CHECK_NOT_NULL( config, "Config is null for HiveConfig initialization"); diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 645833e5fb4..643a2729aa6 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -770,7 +770,17 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { if (!options->zstdCompressionLevel) { options->zstdCompressionLevel = compressionLevel.value_or(kDefaultZstdCompressionLevel); - } + + auto parquetDataPageVersion = + hiveConfig_->parquetDataPageVersion(connectorSessionProperties); + + if (parquetDataPageVersion == "PARQUET_1_0") { + options->parquetDataPageVersion = + dwio::common::ParquetDataPageVersion::V1; + } else { + options->parquetDataPageVersion = + dwio::common::ParquetDataPageVersion::V2; + } // Prevents the memory allocation during the writer creation. WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1); diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 1f8c0daee43..e8188b7b698 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -592,6 +592,8 @@ class ReaderOptions : public io::ReaderOptions { const date::time_zone* sessionTimezone_{nullptr}; }; +enum class ParquetDataPageVersion { V1, V2 }; + struct WriterOptions { TypePtr schema{nullptr}; velox::memory::MemoryPool* memoryPool{nullptr}; @@ -615,6 +617,7 @@ struct WriterOptions { std::optional parquetWriteTimestampUnit; std::optional zlibCompressionLevel; std::optional zstdCompressionLevel; + ParquetDataPageVersion parquetDataPageVersion; virtual ~WriterOptions() = default; }; diff --git a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp index f1caaecaf9b..8fab98ba27d 100644 --- a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp +++ b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp @@ -21,7 +21,9 @@ #include "velox/common/testutil/TestValue.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/core/QueryCtx.h" +#include "velox/dwio/parquet/reader/PageReader.h" #include "velox/dwio/parquet/tests/ParquetTestBase.h" +#include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/exec/tests/utils/Cursor.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/QueryAssertions.h" @@ -67,6 +69,26 @@ class ParquetWriterTest : public ParquetTestBase { opts); }; + facebook::velox::parquet::thrift::PageType::type getDataPageVersion( + const dwio::common::MemorySink* sinkPtr, + const facebook::velox::parquet::ColumnChunkMetaDataPtr& colChunkPtr) { + std::string_view sinkData(sinkPtr->data(), sinkPtr->size()); + auto readFile = std::make_shared(sinkData); + auto file = std::make_shared(std::move(readFile)); + auto inputStream = std::make_unique( + std::move(file), + colChunkPtr.dataPageOffset(), + 150, + *leafPool_, + LogType::TEST); + auto pageReader = std::make_unique( + std::move(inputStream), + *leafPool_, + colChunkPtr.compression(), + colChunkPtr.totalCompressedSize()); + return pageReader->readPageHeader().type; + }; + inline static const std::string kHiveConnectorId = "test-hive"; }; @@ -136,6 +158,64 @@ TEST_F(ParquetWriterTest, compression) { assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_); }; +TEST_F(ParquetWriterTest, datapageVersionV1) { + auto schema = ROW({"c0"}, {INTEGER()}); + const int64_t kRows = 1; + const auto data = makeRowVector({ + makeFlatVector(kRows, [](auto row) { return 987; }), + }); + + // Create an in-memory writer + auto sink = std::make_unique( + 200 * 1024 * 1024, + dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto sinkPtr = sink.get(); + facebook::velox::parquet::WriterOptions writerOptions; + writerOptions.memoryPool = leafPool_.get(); + writerOptions.parquetDataPageVersion = + facebook::velox::parquet::arrow::ParquetDataPageVersion::V1; + + auto writer = std::make_unique( + std::move(sink), writerOptions, rootPool_, schema); + writer->write(data); + writer->close(); + + dwio::common::ReaderOptions readerOptions{leafPool_.get()}; + auto reader = createReaderInMemory(*sinkPtr, readerOptions); + auto dataPageVersion = getDataPageVersion( + sinkPtr, reader->fileMetaData().rowGroup(0).columnChunk(0)); + ASSERT_EQ(dataPageVersion, thrift::PageType::type::DATA_PAGE); +}; + +TEST_F(ParquetWriterTest, datapageVersionV2) { + auto schema = ROW({"c0"}, {INTEGER()}); + const int64_t kRows = 1; + const auto data = makeRowVector({ + makeFlatVector(kRows, [](auto row) { return 987; }), + }); + + // Create an in-memory writer + auto sink = std::make_unique( + 200 * 1024 * 1024, + dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto sinkPtr = sink.get(); + facebook::velox::parquet::WriterOptions writerOptions; + writerOptions.memoryPool = leafPool_.get(); + writerOptions.parquetDataPageVersion = + facebook::velox::parquet::arrow::ParquetDataPageVersion::V1; + + auto writer = std::make_unique( + std::move(sink), writerOptions, rootPool_, schema); + writer->write(data); + writer->close(); + + dwio::common::ReaderOptions readerOptions{leafPool_.get()}; + auto reader = createReaderInMemory(*sinkPtr, readerOptions); + auto dataPageVersion = getDataPageVersion( + sinkPtr, reader->fileMetaData().rowGroup(0).columnChunk(0)); + ASSERT_EQ(dataPageVersion, thrift::PageType::type::DATA_PAGE_V2); +}; + DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) { SCOPED_TESTVALUE_SET( "facebook::velox::parquet::Writer::write", diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index 56f686126a1..1b4016fd518 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -144,6 +144,7 @@ std::shared_ptr getArrowParquetWriterOptions( static_cast(flushPolicy->rowsInRowGroup())); properties = properties->codec_options(options.codecOptions); properties = properties->enable_store_decimal_as_integer(); + properties = properties->data_page_version(options.parquetDataPageVersion); return properties->build(); } @@ -394,6 +395,16 @@ parquet::WriterOptions getParquetOptions( parquetOptions.parquetWriteTimestampUnit = options.parquetWriteTimestampUnit.value(); } + + // Default Parquet datapage version is V2; V1 can be set via session property + // or Hive config. + if (options.parquetDataPageVersion == + dwio::common::ParquetDataPageVersion::V1) { + parquetOptions.parquetDataPageVersion = arrow::ParquetDataPageVersion::V1; + } else { + parquetOptions.parquetDataPageVersion = arrow::ParquetDataPageVersion::V2; + } + return parquetOptions; } diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 98e735af9b8..1d988ad23d9 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -23,6 +23,7 @@ #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Writer.h" #include "velox/dwio/common/WriterFactory.h" +#include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/vector/ComplexVector.h" @@ -97,6 +98,10 @@ struct WriterOptions { common::CompressionKind compression = common::CompressionKind_NONE; arrow::Encoding::type encoding = arrow::Encoding::PLAIN; velox::memory::MemoryPool* memoryPool; + // Default Parquet datapage version is V2 - can be set to V1 via session + // property. + arrow::ParquetDataPageVersion parquetDataPageVersion = + arrow::ParquetDataPageVersion::V2; // The default factory allows the writer to construct the default flush // policy with the configs in its ctor. std::function()> flushPolicyFactory;