Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,4 +303,14 @@ bool HiveConfig::cacheNoRetention(const Config* session) const {
config_->get<bool>(kCacheNoRetention, /*defaultValue=*/false));
}

/// Resolves the Parquet data page version the writer should use.
///
/// The value stored under kParquetDataPageVersion in 'session' is used when
/// present; otherwise the value from the connector config is used, and
/// "PARQUET_2_0" is the final fallback.
///
/// @param session Session properties consulted first. Dereferenced without a
/// null check, like the other session-aware getters in this file.
/// @return Either "PARQUET_1_0" or "PARQUET_2_0".
/// @throws Fails a VELOX_CHECK when the resolved value is anything else. The
/// message reports the rejected value so misconfigurations are easy to spot.
std::string HiveConfig::parquetDataPageVersion(const Config* session) const {
  // Intentionally non-const so the value can be moved out on return.
  auto parquetDataPageVersion = session->get<std::string>(
      kParquetDataPageVersion,
      config_->get<std::string>(kParquetDataPageVersion, "PARQUET_2_0"));
  VELOX_CHECK(
      parquetDataPageVersion == "PARQUET_1_0" ||
          parquetDataPageVersion == "PARQUET_2_0",
      "Invalid Parquet version. Expected PARQUET_1_0 or PARQUET_2_0, got '{}'.",
      parquetDataPageVersion);
  return parquetDataPageVersion;
}
} // namespace facebook::velox::connector::hive
9 changes: 9 additions & 0 deletions velox/connectors/hive/HiveConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ class HiveConfig {
static constexpr const char* kCacheNoRetention = "cache.no_retention";
static constexpr const char* kCacheNoRetentionSession = "cache.no_retention";

/// Config key selecting the data page version used by the Parquet writer.
/// Accepted values are "PARQUET_1_0" and "PARQUET_2_0"; the getter
/// parquetDataPageVersion() falls back to "PARQUET_2_0" when the key is unset.
/// The same key is looked up in both the session properties and the Hive
/// config.
static constexpr const char* kParquetDataPageVersion =
"parquet_writer_version";

InsertExistingPartitionsBehavior insertExistingPartitionsBehavior(
const Config* session) const;

Expand Down Expand Up @@ -344,6 +348,11 @@ class HiveConfig {
/// locality.
bool cacheNoRetention(const Config* session) const;

/// Returns the data page version used by the Parquet writer, as one of the
/// strings "PARQUET_1_0" or "PARQUET_2_0".
/// Default version is PARQUET_2_0; PARQUET_1_0 can be specified
/// via session property or Hive config. The session property is consulted
/// first. Any other configured value fails a VELOX_CHECK in the definition.
std::string parquetDataPageVersion(const Config* session) const;

HiveConfig(std::shared_ptr<const Config> config) {
VELOX_CHECK_NOT_NULL(
config, "Config is null for HiveConfig initialization");
Expand Down
12 changes: 11 additions & 1 deletion velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,17 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
if (!options->zstdCompressionLevel) {
options->zstdCompressionLevel =
compressionLevel.value_or(kDefaultZstdCompressionLevel);
}

auto parquetDataPageVersion =
hiveConfig_->parquetDataPageVersion(connectorSessionProperties);

if (parquetDataPageVersion == "PARQUET_1_0") {
options->parquetDataPageVersion =
dwio::common::ParquetDataPageVersion::V1;
} else {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if it's "PARQUET_2_0", otherwise throw an error

Copy link
Copy Markdown
Contributor Author

@svm1 svm1 Jul 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking was that parquetDataPageVersion value must be either "PARQUET_1_0" or "PARQUET_2_0" if we've reached this point, as I added validation in the HiveConfig::parquetDataPageVersion() function from which this value is fetched:

VELOX_CHECK(
      parquetDataPageVersion == "PARQUET_1_0" ||
          parquetDataPageVersion == "PARQUET_2_0",
      "Invalid Parquet version.");

options->parquetDataPageVersion =
dwio::common::ParquetDataPageVersion::V2;
}

// Prevents the memory allocation during the writer creation.
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
Expand Down
3 changes: 3 additions & 0 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ class ReaderOptions : public io::ReaderOptions {
const date::time_zone* sessionTimezone_{nullptr};
};

/// Data page version requested from the Parquet writer. Format-agnostic
/// mirror of the Arrow-side enum of the same name that the Parquet writer
/// translates to.
enum class ParquetDataPageVersion {
  /// Version 1 data pages.
  V1,
  /// Version 2 data pages.
  V2,
};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Parquet specific options are better to be in Parquet writer options. It'll be better to rebase this PR after #10470 is merged.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yingsu00 I rebased and opened a new updated PR for this here - #10573. Would appreciate if you could take a look!


struct WriterOptions {
TypePtr schema{nullptr};
velox::memory::MemoryPool* memoryPool{nullptr};
Expand All @@ -615,6 +617,7 @@ struct WriterOptions {
std::optional<uint8_t> parquetWriteTimestampUnit;
std::optional<uint8_t> zlibCompressionLevel;
std::optional<uint8_t> zstdCompressionLevel;
/// Data page version for the Parquet writer. Explicitly defaulted to V2 so
/// that options objects built without setting this field are not read as an
/// indeterminate value when converted to Parquet writer options.
ParquetDataPageVersion parquetDataPageVersion{ParquetDataPageVersion::V2};

virtual ~WriterOptions() = default;
};
Expand Down
80 changes: 80 additions & 0 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/core/QueryCtx.h"
#include "velox/dwio/parquet/reader/PageReader.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/exec/tests/utils/Cursor.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
Expand Down Expand Up @@ -67,6 +69,26 @@ class ParquetWriterTest : public ParquetTestBase {
opts);
};

/// Reads the page header found at the data page offset of the given column
/// chunk and returns its Thrift page type.
///
/// @param sinkPtr In-memory sink holding the bytes of a written Parquet file.
/// @param colChunkPtr Metadata of the column chunk to inspect; supplies the
/// data page offset, compression kind and total compressed size.
/// @return The 'type' field of the page header read from that offset.
facebook::velox::parquet::thrift::PageType::type getDataPageVersion(
    const dwio::common::MemorySink* sinkPtr,
    const facebook::velox::parquet::ColumnChunkMetaDataPtr& colChunkPtr) {
  // Expose the sink's bytes as a readable file.
  auto inMemoryFile = std::make_shared<InMemoryReadFile>(
      std::string_view(sinkPtr->data(), sinkPtr->size()));
  auto fileStream =
      std::make_shared<ReadFileInputStream>(std::move(inMemoryFile));

  // Position a stream at the data page offset of the column chunk.
  // NOTE(review): 150 is presumably the number of bytes made available to the
  // reader, chosen to be enough for one page header - confirm.
  auto pageStream = std::make_unique<SeekableFileInputStream>(
      std::move(fileStream),
      colChunkPtr.dataPageOffset(),
      150,
      *leafPool_,
      LogType::TEST);

  auto headerReader = std::make_unique<PageReader>(
      std::move(pageStream),
      *leafPool_,
      colChunkPtr.compression(),
      colChunkPtr.totalCompressedSize());
  const auto pageHeader = headerReader->readPageHeader();
  return pageHeader.type;
};

inline static const std::string kHiveConnectorId = "test-hive";
};

Expand Down Expand Up @@ -136,6 +158,64 @@ TEST_F(ParquetWriterTest, compression) {
assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_);
};

// Writes a single-row file with data page version V1 requested and verifies
// that the page header at the column chunk's data page offset is DATA_PAGE.
TEST_F(ParquetWriterTest, datapageVersionV1) {
  const auto rowType = ROW({"c0"}, {INTEGER()});
  constexpr int64_t kNumRows = 1;
  const auto input = makeRowVector({
      makeFlatVector<int32_t>(kNumRows, [](auto /*row*/) { return 987; }),
  });

  // The writer takes ownership of the in-memory sink; keep a raw pointer so
  // the written bytes can be inspected afterwards.
  auto memorySink = std::make_unique<MemorySink>(
      200 * 1024 * 1024,
      dwio::common::FileSink::Options{.pool = leafPool_.get()});
  auto sinkPtr = memorySink.get();

  facebook::velox::parquet::WriterOptions options;
  options.memoryPool = leafPool_.get();
  options.parquetDataPageVersion =
      facebook::velox::parquet::arrow::ParquetDataPageVersion::V1;

  auto parquetWriter = std::make_unique<facebook::velox::parquet::Writer>(
      std::move(memorySink), options, rootPool_, rowType);
  parquetWriter->write(input);
  parquetWriter->close();

  // Read the file back and check the page type of the first column chunk.
  dwio::common::ReaderOptions readOptions{leafPool_.get()};
  auto fileReader = createReaderInMemory(*sinkPtr, readOptions);
  const auto pageType = getDataPageVersion(
      sinkPtr, fileReader->fileMetaData().rowGroup(0).columnChunk(0));
  ASSERT_EQ(pageType, thrift::PageType::type::DATA_PAGE);
};

// Writes a single-row file with data page version V2 requested and verifies
// that the page header at the column chunk's data page offset is
// DATA_PAGE_V2.
TEST_F(ParquetWriterTest, datapageVersionV2) {
  auto schema = ROW({"c0"}, {INTEGER()});
  const int64_t kRows = 1;
  const auto data = makeRowVector({
      makeFlatVector<int32_t>(kRows, [](auto /*row*/) { return 987; }),
  });

  // Create an in-memory writer. The writer takes ownership of the sink; keep
  // a raw pointer so the written bytes can be inspected afterwards.
  auto sink = std::make_unique<MemorySink>(
      200 * 1024 * 1024,
      dwio::common::FileSink::Options{.pool = leafPool_.get()});
  auto sinkPtr = sink.get();
  facebook::velox::parquet::WriterOptions writerOptions;
  writerOptions.memoryPool = leafPool_.get();
  // Request V2 explicitly: this test must exercise the V2 path rather than
  // repeat the V1 setup, otherwise the DATA_PAGE_V2 expectation below cannot
  // hold.
  writerOptions.parquetDataPageVersion =
      facebook::velox::parquet::arrow::ParquetDataPageVersion::V2;

  auto writer = std::make_unique<facebook::velox::parquet::Writer>(
      std::move(sink), writerOptions, rootPool_, schema);
  writer->write(data);
  writer->close();

  // Read the file back and check the page type of the first column chunk.
  dwio::common::ReaderOptions readerOptions{leafPool_.get()};
  auto reader = createReaderInMemory(*sinkPtr, readerOptions);
  auto dataPageVersion = getDataPageVersion(
      sinkPtr, reader->fileMetaData().rowGroup(0).columnChunk(0));
  ASSERT_EQ(dataPageVersion, thrift::PageType::type::DATA_PAGE_V2);
};

DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
SCOPED_TESTVALUE_SET(
"facebook::velox::parquet::Writer::write",
Expand Down
11 changes: 11 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
static_cast<int64_t>(flushPolicy->rowsInRowGroup()));
properties = properties->codec_options(options.codecOptions);
properties = properties->enable_store_decimal_as_integer();
properties = properties->data_page_version(options.parquetDataPageVersion);
return properties->build();
}

Expand Down Expand Up @@ -394,6 +395,16 @@ parquet::WriterOptions getParquetOptions(
parquetOptions.parquetWriteTimestampUnit =
options.parquetWriteTimestampUnit.value();
}

// Default Parquet datapage version is V2; V1 can be set via session property
// or Hive config.
if (options.parquetDataPageVersion ==
dwio::common::ParquetDataPageVersion::V1) {
parquetOptions.parquetDataPageVersion = arrow::ParquetDataPageVersion::V1;
} else {
parquetOptions.parquetDataPageVersion = arrow::ParquetDataPageVersion::V2;
}

return parquetOptions;
}

Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/Writer.h"
#include "velox/dwio/common/WriterFactory.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/vector/ComplexVector.h"
Expand Down Expand Up @@ -97,6 +98,10 @@ struct WriterOptions {
common::CompressionKind compression = common::CompressionKind_NONE;
arrow::Encoding::type encoding = arrow::Encoding::PLAIN;
velox::memory::MemoryPool* memoryPool;
// Default Parquet datapage version is V2 - can be set to V1 via session
// property.
arrow::ParquetDataPageVersion parquetDataPageVersion =
arrow::ParquetDataPageVersion::V2;
// The default factory allows the writer to construct the default flush
// policy with the configs in its ctor.
std::function<std::unique_ptr<DefaultFlushPolicy>()> flushPolicyFactory;
Expand Down