From a9b4895ba9805ef4df1913a9f34479c922617c37 Mon Sep 17 00:00:00 2001
From: svm1
Date: Tue, 7 May 2024 18:54:35 -0700
Subject: [PATCH] Add support for parquet_writer_version session property

---
 velox/connectors/hive/HiveConfig.cpp   | 9 +++++++++
 velox/connectors/hive/HiveConfig.h     | 8 ++++++++
 velox/connectors/hive/HiveDataSink.cpp | 9 +++++++++
 velox/dwio/common/Options.h            | 4 ++++
 velox/dwio/parquet/writer/Writer.cpp   | 9 +++++++++
 velox/dwio/parquet/writer/Writer.h     | 5 +++++
 6 files changed, 44 insertions(+)

diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp
index 892f774d65d..9301c9dc9a3 100644
--- a/velox/connectors/hive/HiveConfig.cpp
+++ b/velox/connectors/hive/HiveConfig.cpp
@@ -232,4 +232,13 @@ uint8_t HiveConfig::parquetWriteTimestampUnit(const Config* session) const {
   return unit;
 }
 
+std::string HiveConfig::parquetWriterVersion(const Config* session) const {
+  const auto parquetVersion = session->get<std::string>(
+      kParquetWriterVersion,
+      config_->get<std::string>(kParquetWriterVersion, "PARQUET_2_6"));
+  // No validation here: any value other than "PARQUET_1_0" (including
+  // unrecognized strings) is treated as version 2.6 by
+  // HiveDataSink::appendWriter.
+  return parquetVersion;
+}
 } // namespace facebook::velox::connector::hive
diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h
index 6c8bbf47691..b625389548e 100644
--- a/velox/connectors/hive/HiveConfig.h
+++ b/velox/connectors/hive/HiveConfig.h
@@ -193,6 +193,9 @@ class HiveConfig {
   static constexpr const char* kParquetWriteTimestampUnitSession =
       "hive.parquet.writer.timestamp_unit";
 
+  /// File format version to be used by Parquet writer.
+  static constexpr const char* kParquetWriterVersion = "parquet_writer_version";
+
   InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
       const Config* session) const;
 
@@ -272,6 +275,11 @@ class HiveConfig {
   /// through Arrow bridge. 0: second, 3: milli, 6: micro, 9: nano.
   uint8_t parquetWriteTimestampUnit(const Config* session) const;
 
+  /// Returns the file format version used by the Parquet writer.
+  /// Default version is 2.6; 1.0 can be specified via session property or Hive
+  /// config.
+  std::string parquetWriterVersion(const Config* session) const;
+
   HiveConfig(std::shared_ptr config) {
     VELOX_CHECK_NOT_NULL(
         config, "Config is null for HiveConfig initialization");
diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp
index d88617a5b39..9bcfee9f405 100644
--- a/velox/connectors/hive/HiveDataSink.cpp
+++ b/velox/connectors/hive/HiveDataSink.cpp
@@ -668,6 +668,15 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
       connectorQueryCtx_->sessionProperties();
   options.schema = getNonPartitionTypes(dataChannels_, inputType_);
 
+  auto parquetVersion =
+      hiveConfig_->parquetWriterVersion(connectorSessionProperties);
+
+  if (parquetVersion == "PARQUET_1_0") {
+    options.parquetVersion = dwio::common::ParquetVersion::PARQUET_1_0;
+  } else {
+    options.parquetVersion = dwio::common::ParquetVersion::PARQUET_2_6;
+  }
+
   options.memoryPool = writerInfo_.back()->writerPool.get();
   options.compressionKind = insertTableHandle_->compressionKind();
   if (canReclaim()) {
diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h
index 5f806f22b08..55c878d9557 100644
--- a/velox/dwio/common/Options.h
+++ b/velox/dwio/common/Options.h
@@ -603,6 +603,8 @@ class ReaderOptions : public io::ReaderOptions {
   }
 };
 
+enum class ParquetVersion { PARQUET_1_0, PARQUET_2_6 };
+
 struct WriterOptions {
   TypePtr schema;
   velox::memory::MemoryPool* memoryPool;
@@ -613,6 +615,8 @@ struct WriterOptions {
   std::optional maxDictionaryMemory{std::nullopt};
   std::map serdeParameters;
   std::optional parquetWriteTimestampUnit;
+  /// Parquet file format version to write; defaults to 2.6.
+  ParquetVersion parquetVersion{ParquetVersion::PARQUET_2_6};
 };
 
 } // namespace facebook::velox::dwio::common
diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp
index a37dea6c8e8..f44c4a2ea36 100644
--- a/velox/dwio/parquet/writer/Writer.cpp
+++ b/velox/dwio/parquet/writer/Writer.cpp
@@ -144,6 +144,7 @@ std::shared_ptr getArrowParquetWriterOptions(
       static_cast(flushPolicy->rowsInRowGroup()));
   properties = properties->codec_options(options.codecOptions);
   properties = properties->enable_store_decimal_as_integer();
+  properties = properties->version(options.parquetVersion);
   return properties->build();
 }
 
@@ -394,6 +395,14 @@ parquet::WriterOptions getParquetOptions(
     parquetOptions.parquetWriteTimestampUnit =
         options.parquetWriteTimestampUnit.value();
   }
+
+  // Default Parquet file format version is 2.6.
+  if (options.parquetVersion == dwio::common::ParquetVersion::PARQUET_1_0) {
+    parquetOptions.parquetVersion = arrow::ParquetVersion::PARQUET_1_0;
+  } else {
+    parquetOptions.parquetVersion = arrow::ParquetVersion::PARQUET_2_6;
+  }
+
   return parquetOptions;
 }
 
diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h
index 7f1886708a2..a17fff330c5 100644
--- a/velox/dwio/parquet/writer/Writer.h
+++ b/velox/dwio/parquet/writer/Writer.h
@@ -23,6 +23,7 @@
 #include "velox/dwio/common/Options.h"
 #include "velox/dwio/common/Writer.h"
 #include "velox/dwio/common/WriterFactory.h"
+#include "velox/dwio/parquet/writer/arrow/Properties.h"
 #include "velox/dwio/parquet/writer/arrow/Types.h"
 #include "velox/dwio/parquet/writer/arrow/util/Compression.h"
 #include "velox/vector/ComplexVector.h"
@@ -97,6 +98,10 @@ struct WriterOptions {
   common::CompressionKind compression = common::CompressionKind_NONE;
   arrow::Encoding::type encoding = arrow::Encoding::PLAIN;
   velox::memory::MemoryPool* memoryPool;
+  // Default Parquet file format version is 2.6 - can be set to 1.0 via session
+  // property.
+  arrow::ParquetVersion::type parquetVersion =
+      arrow::ParquetVersion::PARQUET_2_6;
   // The default factory allows the writer to construct the default flush
   // policy with the configs in its ctor.
   std::function()> flushPolicyFactory;