Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,13 @@ uint8_t HiveConfig::parquetWriteTimestampUnit(const Config* session) const {
return unit;
}

std::string HiveConfig::parquetWriterVersion(const Config* session) const {
const auto parquetVersion = session->get<std::string>(
kParquetWriterVersion,
config_->get<std::string>(kParquetWriterVersion, "PARQUET_2_6"));
// VELOX_CHECK(
// parquetVersion == "PARQUET_1_0" || parquetVersion == "PARQUET_2_6",
// "Invalid Parquet version.");
return parquetVersion;
}
} // namespace facebook::velox::connector::hive
8 changes: 8 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ class HiveConfig {
static constexpr const char* kParquetWriteTimestampUnitSession =
"hive.parquet.writer.timestamp_unit";

/// Property key selecting the file format version to be used by the Parquet
/// writer. The same key is looked up in both the session properties and the
/// connector config. Recognized values are "PARQUET_1_0" and "PARQUET_2_6".
static constexpr const char* kParquetWriterVersion = "parquet_writer_version";

InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const Config* session) const;

Expand Down Expand Up @@ -272,6 +275,11 @@ class HiveConfig {
/// through Arrow bridge. 0: second, 3: milli, 6: micro, 9: nano.
uint8_t parquetWriteTimestampUnit(const Config* session) const;

/// Returns the file format version used by the Parquet writer, as the string
/// stored under kParquetWriterVersion (e.g. "PARQUET_1_0" or "PARQUET_2_6").
/// The session property is looked up first, with the Hive config value as its
/// fallback; when neither is set the default is "PARQUET_2_6" (version 2.6).
std::string parquetWriterVersion(const Config* session) const;

HiveConfig(std::shared_ptr<const Config> config) {
VELOX_CHECK_NOT_NULL(
config, "Config is null for HiveConfig initialization");
Expand Down
9 changes: 9 additions & 0 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,15 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
connectorQueryCtx_->sessionProperties();
options.schema = getNonPartitionTypes(dataChannels_, inputType_);

// Resolve the configured Parquet format version (a string) and translate it
// into the writer-options enum.
auto parquetVersion =
hiveConfig_->parquetWriterVersion(connectorSessionProperties);

// Only the exact string "PARQUET_1_0" selects version 1.0; every other value
// takes the else branch and is written as version 2.6.
if (parquetVersion == "PARQUET_1_0") {
options.parquetVersion = dwio::common::ParquetVersion::PARQUET_1_0;
} else {
options.parquetVersion = dwio::common::ParquetVersion::PARQUET_2_6;
}

options.memoryPool = writerInfo_.back()->writerPool.get();
options.compressionKind = insertTableHandle_->compressionKind();
if (canReclaim()) {
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ class ReaderOptions : public io::ReaderOptions {
}
};

/// Parquet file format versions that can be requested through WriterOptions.
enum class ParquetVersion {
  /// Parquet format version 1.0.
  PARQUET_1_0,
  /// Parquet format version 2.6.
  PARQUET_2_6,
};

struct WriterOptions {
TypePtr schema;
velox::memory::MemoryPool* memoryPool;
Expand All @@ -613,6 +615,7 @@ struct WriterOptions {
std::optional<uint64_t> maxDictionaryMemory{std::nullopt};
std::map<std::string, std::string> serdeParameters;
std::optional<uint8_t> parquetWriteTimestampUnit;
// Parquet file format version to write. Initialized to 2.6 so that an options
// object whose creator never assigns this field does not hand an
// indeterminate enum value to code that reads it.
ParquetVersion parquetVersion{ParquetVersion::PARQUET_2_6};
};

} // namespace facebook::velox::dwio::common
9 changes: 9 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
static_cast<int64_t>(flushPolicy->rowsInRowGroup()));
properties = properties->codec_options(options.codecOptions);
properties = properties->enable_store_decimal_as_integer();
properties = properties->version(options.parquetVersion);
return properties->build();
}

Expand Down Expand Up @@ -394,6 +395,14 @@ parquet::WriterOptions getParquetOptions(
parquetOptions.parquetWriteTimestampUnit =
options.parquetWriteTimestampUnit.value();
}

// Translate the dwio-level version enum into the Arrow writer's enum. Only
// PARQUET_1_0 selects format version 1.0; any other value maps to 2.6.
if (options.parquetVersion == dwio::common::ParquetVersion::PARQUET_1_0) {
parquetOptions.parquetVersion = arrow::ParquetVersion::PARQUET_1_0;
} else {
parquetOptions.parquetVersion = arrow::ParquetVersion::PARQUET_2_6;
}

return parquetOptions;
}

Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/Writer.h"
#include "velox/dwio/common/WriterFactory.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/vector/ComplexVector.h"
Expand Down Expand Up @@ -97,6 +98,10 @@ struct WriterOptions {
common::CompressionKind compression = common::CompressionKind_NONE;
arrow::Encoding::type encoding = arrow::Encoding::PLAIN;
velox::memory::MemoryPool* memoryPool;
// Default Parquet file format version is 2.6 - can be set to 1.0 via session
// property.
arrow::ParquetVersion::type parquetVersion =
arrow::ParquetVersion::PARQUET_2_6;
// The default factory allows the writer to construct the default flush
// policy with the configs in its ctor.
std::function<std::unique_ptr<DefaultFlushPolicy>()> flushPolicyFactory;
Expand Down