diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index 2ae6838a1fe..2f21551eaf8 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -717,13 +717,12 @@ class HiveDataSink : public DataSink { // Sets up compression, schema, and other writer configuration based on the // insert table handle and connector settings. // The no-argument overload uses the last writer's info (for appendWriter). - virtual std::shared_ptr createWriterOptions() - const; + std::shared_ptr createWriterOptions() const; // Creates WriterOptions for a specific writer index. Use this overload // during writer rotation to ensure the correct writer's memory pool and // nonReclaimableSection are used. - std::shared_ptr createWriterOptions( + virtual std::shared_ptr createWriterOptions( size_t writerIndex) const; // Returns the Hive partition directory name for the given partition ID. @@ -771,14 +770,14 @@ class HiveDataSink : public DataSink { /// Rotates the writer at the given index to a new file. This is called when /// the current file exceeds maxTargetFileBytes_. The old writer is closed /// and a new writer is created for the same partition/bucket. - void rotateWriter(size_t index); + virtual void rotateWriter(size_t index); /// Finalizes the current file for the writer at the given index. /// Captures file stats and adds the file info to writtenFiles. /// Called by rotateWriter() and closeInternal(). void finalizeWriterFile(size_t index); - void closeInternal(); + virtual void closeInternal(); // IMPORTANT NOTE: these are passed to writers as raw pointers. HiveDataSink // owns the lifetime of these objects, and therefore must destroy them last. 
diff --git a/velox/connectors/hive/iceberg/CMakeLists.txt b/velox/connectors/hive/iceberg/CMakeLists.txt index a4449c50d17..6bb02cb3847 100644 --- a/velox/connectors/hive/iceberg/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/CMakeLists.txt @@ -12,11 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -velox_add_library( - velox_hive_iceberg_splitreader +set( + ICEBERG_SOURCES IcebergConfig.cpp IcebergColumnHandle.cpp IcebergConnector.cpp + IcebergDataFileStatistics.cpp IcebergDataSink.cpp IcebergDataSource.cpp IcebergPartitionName.cpp @@ -28,17 +29,20 @@ velox_add_library( TransformExprBuilder.cpp ) +if(VELOX_ENABLE_PARQUET) + list(APPEND ICEBERG_SOURCES IcebergParquetStatsCollector.cpp) +endif() + +velox_add_library(velox_hive_iceberg_splitreader ${ICEBERG_SOURCES}) + velox_link_libraries( velox_hive_iceberg_splitreader velox_connector + velox_dwio_parquet_field_id velox_functions_iceberg Folly::folly ) -if(VELOX_ENABLE_PARQUET) - velox_link_libraries(velox_hive_iceberg_splitreader velox_dwio_parquet_field_id) -endif() - if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() diff --git a/velox/connectors/hive/iceberg/IcebergDataFileStatistics.cpp b/velox/connectors/hive/iceberg/IcebergDataFileStatistics.cpp new file mode 100644 index 00000000000..018f3f8f68c --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergDataFileStatistics.cpp @@ -0,0 +1,58 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/IcebergDataFileStatistics.h" + +namespace facebook::velox::connector::hive::iceberg { + +folly::dynamic IcebergDataFileStatistics::toJson() const { + folly::dynamic json = folly::dynamic::object; + json["recordCount"] = numRecords; + + folly::dynamic columnSizes = folly::dynamic::object; + folly::dynamic valueCounts = folly::dynamic::object; + folly::dynamic nullValueCounts = folly::dynamic::object; + folly::dynamic nanValueCounts = folly::dynamic::object; + folly::dynamic lowerBounds = folly::dynamic::object; + folly::dynamic upperBounds = folly::dynamic::object; + + for (const auto& [fieldId, stats] : columnStats) { + auto fieldIdStr = folly::to(fieldId); + columnSizes[fieldIdStr] = stats.columnSize; + valueCounts[fieldIdStr] = stats.valueCount; + nullValueCounts[fieldIdStr] = stats.nullValueCount; + if (stats.nanValueCount.has_value()) { + nanValueCounts[fieldIdStr] = stats.nanValueCount.value(); + } + if (stats.lowerBound.has_value()) { + lowerBounds[fieldIdStr] = stats.lowerBound.value(); + } + if (stats.upperBound.has_value()) { + upperBounds[fieldIdStr] = stats.upperBound.value(); + } + } + + json["columnSizes"] = std::move(columnSizes); + json["valueCounts"] = std::move(valueCounts); + json["nullValueCounts"] = std::move(nullValueCounts); + json["nanValueCounts"] = std::move(nanValueCounts); + json["lowerBounds"] = std::move(lowerBounds); + json["upperBounds"] = std::move(upperBounds); + + return json; +} + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergDataFileStatistics.h b/velox/connectors/hive/iceberg/IcebergDataFileStatistics.h new file mode 100644 index 00000000000..5bcfb84b83f --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergDataFileStatistics.h @@ -0,0 +1,71 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include + +namespace facebook::velox::connector::hive::iceberg { + +/// Statistics for an Iceberg data file, corresponding to the `data_file` +/// structure defined in the Iceberg specification: +/// https://iceberg.apache.org/spec/#data-file-fields. +/// +/// All column-level statistics maps are keyed by Iceberg field IDs (`int32_t`), +/// which uniquely identify columns in the Iceberg schema independent of column +/// names or physical column positions. +struct IcebergDataFileStatistics { + struct ColumnStats { + int64_t columnSize{0}; + + /// Total number of values for this field ID in the file, including null and + /// NaN values. + /// + /// For primitive (flat) columns, this is equal to the number of rows in the + /// file: numRows = valueCount = (nonNullValues + numNulls + numNaNs). + /// + /// For nested columns (e.g. elements inside an array), this represents the + /// total occurrences of the field across all rows, which is not necessarily + /// related to the top-level record count. + int64_t valueCount{0}; + int64_t nullValueCount{0}; + std::optional nanValueCount; + /// Base64 encoded lower bound. + std::optional lowerBound; + /// Base64 encoded upper bound. + std::optional upperBound; + }; + + int64_t numRecords{0}; + folly::F14FastMap columnStats; + + /// Returns an IcebergDataFileStatistics with all values set to zero/empty.
+ /// Useful for empty data files that have no actual data. + static IcebergDataFileStatistics empty() { + return IcebergDataFileStatistics{.numRecords = 0, .columnStats = {}}; + } + + folly::dynamic toJson() const; +}; + +using IcebergDataFileStatisticsPtr = std::shared_ptr; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergDataSink.cpp b/velox/connectors/hive/iceberg/IcebergDataSink.cpp index 7ad55345fac..615070b4de2 100644 --- a/velox/connectors/hive/iceberg/IcebergDataSink.cpp +++ b/velox/connectors/hive/iceberg/IcebergDataSink.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -26,10 +27,18 @@ #include "velox/common/base/Fs.h" #include "velox/common/encode/Base64.h" +#include "velox/common/memory/MemoryArbitrator.h" #include "velox/connectors/hive/PartitionIdGenerator.h" #include "velox/connectors/hive/iceberg/IcebergColumnHandle.h" + +#ifdef VELOX_ENABLE_PARQUET +#include "velox/connectors/hive/iceberg/IcebergParquetStatsCollector.h" +#include "velox/dwio/parquet/writer/Writer.h" +#endif + #include "velox/connectors/hive/iceberg/TransformExprBuilder.h" #include "velox/exec/OperatorUtils.h" +#include "velox/type/Type.h" namespace facebook::velox::connector::hive::iceberg { @@ -302,37 +311,47 @@ IcebergDataSink::IcebergDataSink( partitionSpec_ != nullptr ? 
std::make_unique(partitionSpec_) : nullptr), - partitionRowType_(std::move(partitionRowType)) { + partitionRowType_(std::move(partitionRowType)), + icebergInsertTableHandle_(insertTableHandle) { commitPartitionValue_.resize(maxOpenWriters_); + +#ifdef VELOX_ENABLE_PARQUET + std::vector columnHandles; + columnHandles.reserve(insertTableHandle->inputColumns().size()); + for (auto& column : insertTableHandle->inputColumns()) { + columnHandles.emplace_back( + checkedPointerCast(column)); + } + parquetStatsCollector_ = + std::make_shared(std::move(columnHandles)); +#endif } std::vector IcebergDataSink::commitMessage() const { std::vector commitTasks; commitTasks.reserve(writerInfo_.size()); - auto icebergInsertTableHandle = - std::dynamic_pointer_cast( - insertTableHandle_); - for (auto i = 0; i < writerInfo_.size(); ++i) { const auto& writerInfo = writerInfo_.at(i); VELOX_CHECK_NOT_NULL(writerInfo); // Following metadata (json format) is consumed by Presto CommitTaskData. // It contains the minimal subset of metadata. - // TODO: Complete metrics is missing now and this could lead to suboptimal - // query plan, will collect full iceberg metrics in following PR. - for (const auto& fileInfo : writerInfo->writtenFiles) { + VELOX_CHECK_EQ(writerInfo->writtenFiles.size(), dataFileStats_[i].size()); + for (auto fileIdx = 0; fileIdx < writerInfo->writtenFiles.size(); + ++fileIdx) { + const auto& fileInfo = writerInfo->writtenFiles[fileIdx]; // clang-format off folly::dynamic commitData = folly::dynamic::object( "path", (fs::path(writerInfo->writerParameters.targetDirectory()) / fileInfo.targetFileName).string()) ("fileSizeInBytes", fileInfo.fileSize) - ("metrics", - folly::dynamic::object("recordCount", fileInfo.numRows)) + ("metrics", dataFileStats_[i][fileIdx]->toJson()) ("partitionSpecJson", - icebergInsertTableHandle->partitionSpec() ? - icebergInsertTableHandle->partitionSpec()->specId : 0) + icebergInsertTableHandle_->partitionSpec() ? 
+ icebergInsertTableHandle_->partitionSpec()->specId : 0) + // Sort order evolution is not supported. Set default id to 0 (unsorted order). + ("sortOrderId", 0) ("fileFormat", "PARQUET") ("content", "DATA"); // clang-format on @@ -384,8 +403,8 @@ uint32_t IcebergDataSink::ensureWriter(const HiveWriterId& id) { } std::shared_ptr -IcebergDataSink::createWriterOptions() const { - auto options = HiveDataSink::createWriterOptions(); +IcebergDataSink::createWriterOptions(size_t writerIndex) const { + auto options = HiveDataSink::createWriterOptions(writerIndex); // Per Iceberg specification (https://iceberg.apache.org/spec/#parquet): // - Timestamps must be stored with microsecond precision. // - Timestamps must NOT be adjusted to UTC timezone; they should be written @@ -398,6 +417,15 @@ IcebergDataSink::createWriterOptions() const { // (TimestampPrecision::kMicroseconds). options->serdeParameters["parquet.writer.timestamp.unit"] = "6"; options->serdeParameters["parquet.writer.timestamp.timezone"] = ""; + +#ifdef VELOX_ENABLE_PARQUET + if (parquetStatsCollector_) { + auto parquetOptions = checkedPointerCast(options); + parquetOptions->parquetFieldIds = + parquetStatsCollector_->parquetFieldIds().children; + } +#endif + // Re-process configs to apply the serde parameters we just set. options->processConfigs( *hiveConfig_->config(), *connectorQueryCtx_->sessionProperties()); @@ -420,4 +448,83 @@ folly::dynamic IcebergDataSink::makeCommitPartitionValue( return partitionValues; } +void IcebergDataSink::rotateWriter(size_t index) { + VELOX_CHECK_LT(index, writers_.size()); + VELOX_CHECK_NOT_NULL(writers_[index]); + + // Ensure dataFileStats_ has an entry for this writer index. + if (dataFileStats_.size() <= index) { + dataFileStats_.resize(index + 1); + } + + // Collect Iceberg parquet stats from the writer BEFORE closing it. + // The base rotateWriter() will call writers_[index]->close() which returns + // file metadata, but the base class discards that return value.
We need to + // close the writer ourselves to capture the metadata, then prevent double + // close by resetting the writer. + { + memory::NonReclaimableSectionGuard nonReclaimableGuard( + writerInfo_[index]->nonReclaimableSectionHolder.get()); + auto metadata = writers_[index]->close(); + bool fileAdded = getCurrentFileBytes(index) > 0; + + // Finalize file info (capture file size, add to writtenFiles). + finalizeWriterFile(index); + +#ifdef VELOX_ENABLE_PARQUET + if (fileAdded) { + dataFileStats_[index].emplace_back( + parquetStatsCollector_->aggregate(std::move(metadata))); + } +#endif + } + + // Release old writer. The new writer will be created lazily on the next + // write call. + writers_[index].reset(); + + ++writerInfo_[index]->fileSequenceNumber; +} + +void IcebergDataSink::closeInternal() { + VELOX_CHECK_NE(state_, State::kRunning); + VELOX_CHECK_NE(state_, State::kFinishing); + + if (state_ == State::kClosed) { + // Ensure dataFileStats_ has entries for all writers. + dataFileStats_.resize(writers_.size()); + + for (auto i = 0; i < writers_.size(); ++i) { + if (writers_[i] == nullptr) { + // Writer was rotated and is null. Stats for rotated files were already + // collected in rotateWriter(). No final file to close. 
+ continue; + } + memory::NonReclaimableSectionGuard nonReclaimableGuard( + writerInfo_[i]->nonReclaimableSectionHolder.get()); + + auto metadata = writers_[i]->close(); + bool fileAdded = getCurrentFileBytes(i) > 0; + + finalizeWriterFile(i); + +#ifdef VELOX_ENABLE_PARQUET + if (fileAdded) { + dataFileStats_[i].emplace_back( + parquetStatsCollector_->aggregate(std::move(metadata))); + } +#endif + } + } else { + for (auto i = 0; i < writers_.size(); ++i) { + if (writers_[i] == nullptr) { + continue; + } + memory::NonReclaimableSectionGuard nonReclaimableGuard( + writerInfo_[i]->nonReclaimableSectionHolder.get()); + writers_[i]->abort(); + } + } +} + } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergDataSink.h b/velox/connectors/hive/iceberg/IcebergDataSink.h index 9c563b45b94..6a926727367 100644 --- a/velox/connectors/hive/iceberg/IcebergDataSink.h +++ b/velox/connectors/hive/iceberg/IcebergDataSink.h @@ -24,6 +24,12 @@ #include "velox/connectors/hive/HiveDataSink.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/connectors/hive/iceberg/IcebergColumnHandle.h" +#include "velox/connectors/hive/iceberg/IcebergDataFileStatistics.h" + +#ifdef VELOX_ENABLE_PARQUET +#include "velox/connectors/hive/iceberg/IcebergParquetStatsCollector.h" +#endif + #include "velox/connectors/hive/iceberg/IcebergConfig.h" #include "velox/connectors/hive/iceberg/IcebergPartitionName.h" #include "velox/connectors/hive/iceberg/PartitionSpec.h" @@ -183,8 +189,8 @@ class IcebergDataSink : public HiveDataSink { // base HiveDataSink writer options with Iceberg-specific settings: // - Sets timestamp timezone to nullopt (UTC) for Iceberg compliance. // - Sets timestamp precision to microseconds. - std::shared_ptr createWriterOptions() - const override; + std::shared_ptr createWriterOptions( + size_t writerIndex) const override; // Extracts partition values for a specific writer to be included in the // commit message. 
Converts the transformed partition values from columnar @@ -194,6 +200,10 @@ class IcebergDataSink : public HiveDataSink { // Returns nullptr for null partition values. folly::dynamic makeCommitPartitionValue(uint32_t writerIndex) const; + void rotateWriter(size_t index) override; + + void closeInternal() override; + // Iceberg partition specification defining how the table is partitioned. // Contains partition fields with source column names, transform types // (e.g., identity, year, month, day, hour, bucket, truncate), transform @@ -240,6 +250,23 @@ class IcebergDataSink : public HiveDataSink { // folly::dynamic array of values across all partition fields), ready for JSON // serialization. std::vector commitPartitionValue_; + + // Statistics for all data files written by this sink, organized by writer + // index and file index within each writer. These statistics are populated + // during rotateWriter() (for rotated files) and during closeInternal() + // (for the final file of each writer). These metrics are subsequently used + // to construct Iceberg commit messages. + // Outer vector: indexed by writer index (same as writerInfo_). + // Inner vector: one entry per file written by that writer (including + // rotated files and the final file). Each entry corresponds to one + // individual data file. + std::vector> dataFileStats_; + + const IcebergInsertTableHandlePtr icebergInsertTableHandle_; + +#ifdef VELOX_ENABLE_PARQUET + std::shared_ptr parquetStatsCollector_; +#endif }; } // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergParquetStatsCollector.cpp b/velox/connectors/hive/iceberg/IcebergParquetStatsCollector.cpp new file mode 100644 index 00000000000..7f9994f5812 --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergParquetStatsCollector.cpp @@ -0,0 +1,173 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/iceberg/IcebergParquetStatsCollector.h" + +#include "velox/common/Casts.h" +#include "velox/common/base/Exceptions.h" +#include "velox/common/encode/Base64.h" +#include "velox/connectors/hive/iceberg/IcebergColumnHandle.h" +#include "velox/connectors/hive/iceberg/IcebergDataFileStatistics.h" +#include "velox/dwio/common/FileMetadata.h" +#include "velox/dwio/parquet/writer/Writer.h" +#include "velox/dwio/parquet/writer/arrow/Metadata.h" +#include "velox/dwio/parquet/writer/arrow/Statistics.h" + +namespace facebook::velox::connector::hive::iceberg { + +namespace { + +void addAllRecursive( + const parquet::ParquetFieldId& field, + const TypePtr& type, + std::unordered_set& fieldIds) { + fieldIds.insert(field.fieldId); + + VELOX_CHECK_EQ(field.children.size(), type->size()); + for (auto i = 0; i < type->size(); ++i) { + addAllRecursive(field.children[i], type->childAt(i), fieldIds); + } +} + +// Recursively collects field IDs that should skip bounds collection. +// Repeated fields (e.g. MAP and ARRAY) are not currently supported by Iceberg. +// These fields, along with all their descendants, should skip bounds +// collection. +// @param field The Parquet field ID structure to process. +// @param type The Velox type corresponding to this field. +// @param fieldIds Output set to populate with field IDs to skip. 
+void collectSkipBoundsFieldIds( + const parquet::ParquetFieldId& field, + const TypePtr& type, + std::unordered_set& fieldIds) { + VELOX_CHECK_NOT_NULL(type, "Input column type cannot be null."); + + if (type->isMap() || type->isArray()) { + addAllRecursive(field, type, fieldIds); + return; + } + + VELOX_CHECK_EQ(field.children.size(), type->size()); + for (auto i = 0; i < type->size(); ++i) { + collectSkipBoundsFieldIds(field.children[i], type->childAt(i), fieldIds); + } +} + +} // namespace + +IcebergParquetStatsCollector::IcebergParquetStatsCollector( + const std::vector& inputColumns) { + parquetFieldIds_.children.reserve(inputColumns.size()); + for (const auto& columnHandle : inputColumns) { + parquetFieldIds_.children.emplace_back(columnHandle->field()); + collectSkipBoundsFieldIds( + columnHandle->field(), columnHandle->dataType(), skipBoundsFieldIds_); + } +} + +IcebergDataFileStatisticsPtr IcebergParquetStatsCollector::aggregate( + std::unique_ptr fileMetadata) { + // Empty data file. + if (!fileMetadata) { + return std::make_shared( + IcebergDataFileStatistics::empty()); + } + + auto parquetMetadata = + checkedPointerCast(std::move(fileMetadata)); + auto metadata = parquetMetadata->arrowMetadata(); + auto dataFileStats = std::make_shared(); + dataFileStats->numRecords = metadata->numRows(); + const auto numRowGroups = metadata->numRowGroups(); + + // Track global min/max statistics for each column across all row groups. + // Key: Iceberg field ID. + // Value: A pair of Statistics objects where: + // - first: The statistics from the row group containing the global minimum + // value. + // - second: The statistics from the row group containing the global maximum + // value. Two separate objects are stored because the global minimum and + // global maximum for a single column may originate from different row groups. 
+ folly::F14FastMap< + int32_t, + std::pair< + std::shared_ptr, + std::shared_ptr>> + globalMinMaxStats; + + std::unordered_set fieldIds; + for (auto i = 0; i < numRowGroups; ++i) { + const auto& rowGroup = metadata->rowGroup(i); + + for (auto j = 0; j < rowGroup->numColumns(); ++j) { + const auto& columnChunk = rowGroup->columnChunk(j); + const auto fieldId = columnChunk->fieldId(); + fieldIds.insert(fieldId); + + auto& stats = dataFileStats->columnStats[fieldId]; + stats.valueCount += columnChunk->numValues(); + stats.columnSize += columnChunk->totalCompressedSize(); + + const auto& columnChunkStats = columnChunk->statistics(); + if (columnChunkStats) { + stats.nullValueCount += columnChunkStats->nullCount(); + + if (columnChunkStats->hasMinMax() && shouldStoreBounds(fieldId)) { + auto [it, inserted] = globalMinMaxStats.emplace( + fieldId, std::pair{columnChunkStats, columnChunkStats}); + + if (!inserted) { + auto& [minStats, maxStats] = it->second; + + if (columnChunkStats->maxGreaterThan(*maxStats)) { + maxStats = columnChunkStats; + } + if (columnChunkStats->minLessThan(*minStats)) { + minStats = columnChunkStats; + } + } + } + } + } + } + + for (const auto fieldId : fieldIds) { + const auto& [nanCount, hasNanCount] = metadata->getNaNCount(fieldId); + if (hasNanCount) { + dataFileStats->columnStats[fieldId].nanValueCount = nanCount; + } + } + + for (const auto& [fieldId, stats] : globalMinMaxStats) { + const auto& [minStats, maxStats] = stats; + + auto& columnStats = dataFileStats->columnStats[fieldId]; + const auto& lowerBound = + minStats->icebergLowerBoundInclusive(kDefaultTruncateLength); + columnStats.lowerBound = + encoding::Base64::encode(lowerBound.data(), lowerBound.size()); + + const auto upperBound = + maxStats->icebergUpperBoundExclusive(kDefaultTruncateLength); + if (upperBound.has_value()) { + columnStats.upperBound = + encoding::Base64::encode(upperBound->data(), upperBound->size()); + } + } + return dataFileStats; +} + +} // namespace 
facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/IcebergParquetStatsCollector.h b/velox/connectors/hive/iceberg/IcebergParquetStatsCollector.h new file mode 100644 index 00000000000..71db087b12a --- /dev/null +++ b/velox/connectors/hive/iceberg/IcebergParquetStatsCollector.h @@ -0,0 +1,70 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include "velox/connectors/hive/TableHandle.h" +#include "velox/connectors/hive/iceberg/IcebergColumnHandle.h" +#include "velox/connectors/hive/iceberg/IcebergDataFileStatistics.h" +#include "velox/dwio/common/FileMetadata.h" +#include "velox/dwio/parquet/ParquetFieldId.h" +#include "velox/type/Type.h" + +namespace facebook::velox::connector::hive::iceberg { + +class IcebergParquetStatsCollector { + public: + explicit IcebergParquetStatsCollector( + const std::vector& inputColumns); + + /// Returns the Parquet field IDs for all input columns. + /// The field IDs are written to the Parquet data file's column metadata. + /// The return object describes a multi-column input. + const parquet::ParquetFieldId& parquetFieldIds() const { + return parquetFieldIds_; + } + + /// Aggregates Parquet file metadata into Iceberg data file statistics. + /// Iterates through all row groups and columns to collect: + /// - Record count, split offsets, value counts, column sizes, null counts. 
+ /// - Min/max bounds (base64-encoded). Currently not collected for MAP and + /// ARRAY types and all their descendants. + /// @param fileMetadata The Parquet file metadata to aggregate. + IcebergDataFileStatisticsPtr aggregate( + std::unique_ptr fileMetadata); + + /// TODO: Need to support this config property. + /// 16 is default value. See DEFAULT_WRITE_METRICS_MODE_DEFAULT in + /// org.apache.iceberg.TableProperties. + constexpr static int32_t kDefaultTruncateLength{16}; + + private: + bool shouldStoreBounds(int32_t fieldId) const { + return !skipBoundsFieldIds_.contains(fieldId); + } + + // Hierarchical Parquet field IDs for all input columns. A single + // ParquetFieldId can describe all the columns including their nested + // children. + parquet::ParquetFieldId parquetFieldIds_; + + // Set of field IDs for which bounds collection should be skipped. + // This includes MAP and ARRAY types and all their descendants. + std::unordered_set skipBoundsFieldIds_; +}; + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/connectors/hive/iceberg/tests/CMakeLists.txt b/velox/connectors/hive/iceberg/tests/CMakeLists.txt index 883503bbef7..a84064bb861 100644 --- a/velox/connectors/hive/iceberg/tests/CMakeLists.txt +++ b/velox/connectors/hive/iceberg/tests/CMakeLists.txt @@ -61,6 +61,7 @@ if(NOT VELOX_DISABLE_GOOGLETEST) velox_hive_iceberg_insert_test IcebergConnectorTest.cpp IcebergInsertTest.cpp + IcebergParquetStatsTest.cpp IcebergTestBase.cpp Main.cpp PartitionNameTest.cpp diff --git a/velox/connectors/hive/iceberg/tests/IcebergParquetStatsTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergParquetStatsTest.cpp new file mode 100644 index 00000000000..ebab93a2510 --- /dev/null +++ b/velox/connectors/hive/iceberg/tests/IcebergParquetStatsTest.cpp @@ -0,0 +1,881 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "velox/common/encode/Base64.h" +#include "velox/connectors/hive/iceberg/IcebergDataFileStatistics.h" +#include "velox/connectors/hive/iceberg/tests/IcebergTestBase.h" + +using namespace facebook::velox::common::testutil; + +namespace facebook::velox::connector::hive::iceberg { + +namespace { + +#ifdef VELOX_ENABLE_PARQUET + +class IcebergParquetStatsTest : public test::IcebergTestBase { + protected: + static IcebergDataFileStatisticsPtr statsFromMetrics( + const folly::dynamic& metrics) { + VELOX_CHECK(metrics.isObject()); + VELOX_CHECK(metrics.count("recordCount") > 0); + auto stats = std::make_shared(); + stats->numRecords = metrics["recordCount"].asInt(); + + auto setIntField = [&](const folly::dynamic& map, auto setter) { + if (!map.isObject()) { + return; + } + for (const auto& item : map.items()) { + const auto fieldId = folly::to(item.first.asString()); + auto& column = stats->columnStats[fieldId]; + setter(column, item.second); + } + }; + + setIntField(metrics["columnSizes"], [](auto& column, const auto& value) { + column.columnSize = value.asInt(); + }); + setIntField(metrics["valueCounts"], [](auto& column, const auto& value) { + column.valueCount = value.asInt(); + }); + setIntField( + metrics["nullValueCounts"], [](auto& column, const auto& value) { + column.nullValueCount = value.asInt(); + }); + setIntField(metrics["nanValueCounts"], [](auto& 
column, const auto& value) { + column.nanValueCount = value.asInt(); + }); + + const auto& lowerBounds = metrics["lowerBounds"]; + if (lowerBounds.isObject()) { + for (const auto& item : lowerBounds.items()) { + const auto fieldId = folly::to(item.first.asString()); + stats->columnStats[fieldId].lowerBound = item.second.asString(); + } + } + const auto& upperBounds = metrics["upperBounds"]; + if (upperBounds.isObject()) { + for (const auto& item : upperBounds.items()) { + const auto fieldId = folly::to(item.first.asString()); + stats->columnStats[fieldId].upperBound = item.second.asString(); + } + } + + return stats; + } + + static std::vector statsFromCommitTasks( + const std::vector& commitTasks) { + std::vector stats; + stats.reserve(commitTasks.size()); + for (const auto& task : commitTasks) { + auto taskJson = folly::parseJson(task); + VELOX_CHECK(taskJson.isObject()); + VELOX_CHECK(taskJson.count("metrics") > 0); + stats.emplace_back(statsFromMetrics(taskJson["metrics"])); + } + return stats; + } + + // Write data and get all stats (for partitioned tables). + std::vector> + writeDataAndGetAllStats( + const RowVectorPtr& data, + const std::vector& partitionFields = {}) { + const auto outputDir = TempDirectoryPath::create(); + auto dataSink = createDataSinkAndAppendData( + {data}, outputDir->getPath(), partitionFields); + auto commitTasks = dataSink->close(); + EXPECT_FALSE(commitTasks.empty()); + return statsFromCommitTasks(commitTasks); + } + + // Decode and extract typed value from base64 encoded bounds. 
+ template + static std::pair decodeBounds( + const std::shared_ptr& stats, + int32_t fieldId) { + auto decode = [](const std::string& base64Encoded) { + const std::string decoded = encoding::Base64::decode(base64Encoded); + T value; + std::memcpy(&value, decoded.data(), sizeof(T)); + return value; + }; + + const auto& columnStats = stats->columnStats.at(fieldId); + VELOX_CHECK(columnStats.lowerBound.has_value()); + VELOX_CHECK(columnStats.upperBound.has_value()); + return { + decode(columnStats.lowerBound.value()), + decode(columnStats.upperBound.value()), + }; + } + + // Verify basic statistics (record count, value counts, null counts). + static void verifyBasicStats( + const std::shared_ptr& stats, + int64_t expectedRecords, + const std::unordered_map& expectedValueCounts, + const std::unordered_map& expectedNullCounts) { + EXPECT_EQ(stats->numRecords, expectedRecords); + + for (const auto& [fieldId, count] : expectedValueCounts) { + ASSERT_TRUE(stats->columnStats.contains(fieldId)); + EXPECT_EQ(stats->columnStats.at(fieldId).valueCount, count); + } + + if (!expectedNullCounts.empty()) { + for (const auto& [fieldId, count] : expectedNullCounts) { + ASSERT_TRUE(stats->columnStats.contains(fieldId)); + EXPECT_EQ(stats->columnStats.at(fieldId).nullValueCount, count); + } + } + } + + // Verify bounds exist for given field IDs. + static void verifyBoundsExist( + const std::shared_ptr& stats, + const std::vector& fieldIds) { + for (const int32_t fieldId : fieldIds) { + ASSERT_TRUE(stats->columnStats.contains(fieldId)); + const auto& columnStats = stats->columnStats.at(fieldId); + ASSERT_TRUE(columnStats.lowerBound.has_value()); + ASSERT_TRUE(columnStats.upperBound.has_value()); + EXPECT_FALSE(columnStats.lowerBound.value().empty()); + EXPECT_FALSE(columnStats.upperBound.value().empty()); + } + } + + // Verify bounds do not exist for given field IDs. 
+ static void verifyBoundsNotExist( + const std::shared_ptr& stats, + const std::vector& fieldIds) { + for (const int32_t fieldId : fieldIds) { + if (stats->columnStats.contains(fieldId)) { + const auto& columnStats = stats->columnStats.at(fieldId); + ASSERT_FALSE(columnStats.lowerBound.has_value()); + ASSERT_FALSE(columnStats.upperBound.has_value()); + } + } + } +}; + +TEST_F(IcebergParquetStatsTest, mixedNull) { + constexpr vector_size_t size = 100; + constexpr int32_t expectedIntNulls = 34; + constexpr int32_t intColId = 1; + + const auto& stats = + writeDataAndGetAllStats(makeRowVector({makeFlatVector( + size, [](vector_size_t row) { return row * 10; }, nullEvery(3))})); + verifyBasicStats( + stats[0], size, {{intColId, size}}, {{intColId, expectedIntNulls}}); + verifyBoundsExist(stats[0], {intColId}); + + const auto& [minVal, maxVal] = decodeBounds(stats[0], intColId); + EXPECT_EQ(minVal, 10); + EXPECT_EQ(maxVal, 980); +} + +TEST_F(IcebergParquetStatsTest, bigint) { + constexpr vector_size_t size = 100; + constexpr int32_t expectedNulls = 25; + constexpr int32_t bigintColId = 1; + + const auto& stats = + writeDataAndGetAllStats(makeRowVector({makeFlatVector( + size, + [](vector_size_t row) { return row * 1'000'000'000LL; }, + nullEvery(4))})); + verifyBasicStats( + stats[0], size, {{bigintColId, size}}, {{bigintColId, expectedNulls}}); + verifyBoundsExist(stats[0], {bigintColId}); + + const auto& [minVal, maxVal] = decodeBounds(stats[0], bigintColId); + EXPECT_EQ(minVal, 1'000'000'000LL); + EXPECT_EQ(maxVal, 99'000'000'000LL); +} + +TEST_F(IcebergParquetStatsTest, decimal) { + constexpr vector_size_t size = 100; + constexpr int32_t expectedNulls = 20; + constexpr int32_t decimalColId = 1; + + const auto& stats = + writeDataAndGetAllStats(makeRowVector({makeFlatVector( + size, + [](vector_size_t row) { return HugeInt::build(row, row * 123); }, + nullEvery(5), + DECIMAL(38, 3))})); + verifyBasicStats( + stats[0], size, {{decimalColId, size}}, {{decimalColId, 
expectedNulls}}); + verifyBoundsExist(stats[0], {decimalColId}); +} + +TEST_F(IcebergParquetStatsTest, varchar) { + constexpr vector_size_t size = 100; + constexpr int32_t varcharColId = 1; + + const auto& stats = + writeDataAndGetAllStats(makeRowVector({makeFlatVector( + size, + [](vector_size_t row) { + return "Customer#00000" + std::to_string(row) + "_" + + std::string(row % 10, 'a'); + }, + nullEvery(6))})); + + constexpr int32_t expectedNulls = 17; + verifyBasicStats( + stats[0], size, {{varcharColId, size}}, {{varcharColId, expectedNulls}}); + verifyBoundsExist(stats[0], {varcharColId}); + + EXPECT_EQ( + encoding::Base64::decode( + stats[0]->columnStats.at(varcharColId).lowerBound.value()), + "Customer#0000010"); + EXPECT_EQ( + encoding::Base64::decode( + stats[0]->columnStats.at(varcharColId).upperBound.value()), + "Customer#000009`"); +} + +TEST_F(IcebergParquetStatsTest, varbinary) { + constexpr vector_size_t size = 100; + constexpr int32_t varbinaryColId = 1; + + auto rowVector = makeRowVector({makeFlatVector( + size, + [](vector_size_t row) { + std::string value(17, 11); + value[0] = static_cast(row % 256); + value[1] = static_cast((row * 3) % 256); + value[2] = static_cast((row * 7) % 256); + value[3] = static_cast((row * 11) % 256); + return value; + }, + nullEvery(5), + VARBINARY())}); + + const auto& stats = writeDataAndGetAllStats(rowVector); + constexpr int32_t expectedNulls = 20; + verifyBasicStats( + stats[0], + size, + {{varbinaryColId, size}}, + {{varbinaryColId, expectedNulls}}); + verifyBoundsExist(stats[0], {varbinaryColId}); +} + +TEST_F(IcebergParquetStatsTest, varbinaryWithTransform) { + const auto& fileStats = writeDataAndGetAllStats( + makeRowVector({makeFlatVector( + {"01020304", + "05060708", + "090A0B0C", + "0D0E0F10", + "11121314", + "15161718", + "191A1B1C", + "1D1E1F20", + "21222324", + "25262728"}, + VARBINARY())}), + {{0, TransformType::kBucket, 4}}); + ASSERT_EQ(fileStats.size(), 3); + const auto& stats = fileStats[0]; + 
EXPECT_EQ(stats->numRecords, 5); + constexpr int32_t varbinaryColId = 1; + EXPECT_EQ(stats->columnStats.at(varbinaryColId).valueCount, 5); +} + +TEST_F(IcebergParquetStatsTest, multipleDataTypes) { + constexpr vector_size_t size = 100; + constexpr int32_t intColId = 1; + constexpr int32_t bigintColId = 2; + constexpr int32_t decimalColId = 3; + constexpr int32_t varcharColId = 4; + constexpr int32_t varbinaryColId = 5; + + constexpr int32_t expectedIntNulls = 34; + constexpr int32_t expectedBigintNulls = 25; + constexpr int32_t expectedDecimalNulls = 20; + constexpr int32_t expectedVarcharNulls = 17; + constexpr int32_t expectedVarbinaryNulls = 15; + + auto rowVector = makeRowVector( + {makeFlatVector( + size, [](vector_size_t row) { return row * 10; }, nullEvery(3)), + makeFlatVector( + size, + [](vector_size_t row) { return row * 1'000'000'000LL; }, + nullEvery(4)), + makeFlatVector( + size, + [](vector_size_t row) { return HugeInt::build(row, row * 12'345); }, + nullEvery(5), + DECIMAL(38, 3)), + makeFlatVector( + size, + [](vector_size_t row) { return "str_" + std::to_string(row); }, + nullEvery(6)), + makeFlatVector( + size, + [](vector_size_t row) { + std::string value(4, 0); + value[0] = static_cast(row % 256); + value[1] = static_cast((row * 3) % 256); + value[2] = static_cast((row * 7) % 256); + value[3] = static_cast((row * 11) % 256); + return value; + }, + nullEvery(7), + VARBINARY())}); + const auto& stats = writeDataAndGetAllStats(rowVector); + + verifyBasicStats( + stats[0], + size, + { + {intColId, size}, + {bigintColId, size}, + {decimalColId, size}, + {varcharColId, size}, + {varbinaryColId, size}, + }, + { + {intColId, expectedIntNulls}, + {bigintColId, expectedBigintNulls}, + {decimalColId, expectedDecimalNulls}, + {varcharColId, expectedVarcharNulls}, + {varbinaryColId, expectedVarbinaryNulls}, + }); + + verifyBoundsExist( + stats[0], + {intColId, bigintColId, decimalColId, varcharColId, varbinaryColId}); +} + +TEST_F(IcebergParquetStatsTest, 
date) { + constexpr vector_size_t size = 100; + constexpr int32_t expectedNulls = 20; + constexpr int32_t dateColId = 1; + + const auto& stats = + writeDataAndGetAllStats(makeRowVector({makeFlatVector( + size, + [](vector_size_t row) { return 18262 + row; }, + nullEvery(5), + DATE())})); + verifyBasicStats( + stats[0], size, {{dateColId, size}}, {{dateColId, expectedNulls}}); + verifyBoundsExist(stats[0], {dateColId}); + + const auto& [minVal, maxVal] = decodeBounds(stats[0], dateColId); + EXPECT_EQ(minVal, 18263); + EXPECT_EQ(maxVal, 18262 + 99); +} + +TEST_F(IcebergParquetStatsTest, boolean) { + constexpr vector_size_t size = 100; + constexpr int32_t expectedNulls = 10; + constexpr int32_t boolColId = 1; + + const auto& stats = + writeDataAndGetAllStats(makeRowVector({makeFlatVector( + size, + [](vector_size_t row) { return row % 2 == 1; }, + nullEvery(10), + BOOLEAN())})); + verifyBasicStats( + stats[0], size, {{boolColId, size}}, {{boolColId, expectedNulls}}); + verifyBoundsExist(stats[0], {boolColId}); + + // For boolean, the lower bound should be false (0) and upper bound should be + // true (1) if both values are present. + const auto& [minVal, maxVal] = decodeBounds(stats[0], boolColId); + EXPECT_FALSE(minVal); + EXPECT_TRUE(maxVal); +} + +TEST_F(IcebergParquetStatsTest, empty) { + const auto outputDir = TempDirectoryPath::create(); + auto dataSink = createDataSinkAndAppendData( + {makeRowVector( + {makeFlatVector(0), makeFlatVector(0)})}, + outputDir->getPath()); + auto commitTasks = dataSink->close(); + EXPECT_TRUE(commitTasks.empty()); +} + +TEST_F(IcebergParquetStatsTest, nullValues) { + constexpr vector_size_t size = 100; + + const auto& stats = writeDataAndGetAllStats(makeRowVector( + {makeNullConstant(TypeKind::INTEGER, size), + makeNullConstant(TypeKind::VARCHAR, size)})); + EXPECT_EQ(stats[0]->numRecords, size); + ASSERT_EQ(stats[0]->columnStats.at(1).nullValueCount, size); + // Do not collect lower and upper bounds for NULLs. 
+ for (const auto& [fieldId, columnStats] : stats[0]->columnStats) { + ASSERT_FALSE(columnStats.lowerBound.has_value()); + ASSERT_FALSE(columnStats.upperBound.has_value()); + } +} + +TEST_F(IcebergParquetStatsTest, real) { + constexpr vector_size_t size = 100; + constexpr int32_t expectedNulls = 20; + constexpr int32_t realColId = 1; + int32_t expectedNaNs = 0; + + const auto& stats = + writeDataAndGetAllStats(makeRowVector({makeFlatVector( + size, + [&expectedNaNs](vector_size_t row) { + if (row % 6 == 0) { + expectedNaNs++; + return std::numeric_limits::quiet_NaN(); + } + return row * 1.5f; + }, + nullEvery(5), + REAL())})); + verifyBasicStats( + stats[0], size, {{realColId, size}}, {{realColId, expectedNulls}}); + + EXPECT_EQ( + stats[0]->columnStats.at(realColId).nanValueCount.value_or(0), + expectedNaNs); + verifyBoundsExist(stats[0], {realColId}); + const auto& [minVal, maxVal] = decodeBounds(stats[0], realColId); + EXPECT_FLOAT_EQ(minVal, 1.5f); + EXPECT_FLOAT_EQ(maxVal, 148.5f); +} + +TEST_F(IcebergParquetStatsTest, double) { + constexpr vector_size_t size = 100; + constexpr int32_t expectedNulls = 15; + constexpr int32_t doubleColId = 1; + int32_t expectedNaNs = 0; + + auto rowVector = makeRowVector({makeFlatVector( + size, + [&expectedNaNs](vector_size_t row) { + if (row % 3 == 0) { + expectedNaNs++; + return std::numeric_limits::quiet_NaN(); + } + if (row % 4 == 0) { + return std::numeric_limits::infinity(); + } + if (row % 5 == 0) { + return -std::numeric_limits::infinity(); + } + return row * 2.5; + }, + nullEvery(7), + DOUBLE())}); + + const auto& stats = writeDataAndGetAllStats(rowVector); + verifyBasicStats( + stats[0], size, {{doubleColId, size}}, {{doubleColId, expectedNulls}}); + + EXPECT_EQ( + stats[0]->columnStats.at(doubleColId).nanValueCount.value_or(0), + expectedNaNs); + + verifyBoundsExist(stats[0], {doubleColId}); + + // Verify bounds are set correctly and NaN/infinity values don't affect + // min/max incorrectly. 
+ const auto& [minVal, maxVal] = decodeBounds(stats[0], doubleColId); + EXPECT_DOUBLE_EQ(minVal, -std::numeric_limits::infinity()) + << "Lower bound should be -infinity"; + EXPECT_DOUBLE_EQ(maxVal, std::numeric_limits::infinity()) + << "Upper bound should be infinity"; +} + +TEST_F(IcebergParquetStatsTest, mixedDoubleFloat) { + constexpr vector_size_t size = 6; + + auto rowVector = makeRowVector( + {makeFlatVector(size, [](vector_size_t row) { return 1; }), + makeFlatVector( + size, + [](vector_size_t row) { + return -std::numeric_limits::infinity(); + }), + makeFlatVector( + size, + [](vector_size_t row) { + return std::numeric_limits::infinity(); + }), + makeFlatVector(size, [](vector_size_t row) { + switch (row) { + case 0: + return 1.23; + case 1: + return -1.23; + case 2: + return std::numeric_limits::infinity(); + case 3: + return 2.23; + case 4: + return -std::numeric_limits::infinity(); + default: + return -2.23; + } + })}); + + const auto& stats = writeDataAndGetAllStats(rowVector); + constexpr int32_t doubleColId = 4; + verifyBasicStats(stats[0], size, {{doubleColId, size}}, {{doubleColId, 0}}); + const auto& [minVal, maxVal] = decodeBounds(stats[0], doubleColId); + EXPECT_DOUBLE_EQ(minVal, -std::numeric_limits::infinity()); + EXPECT_DOUBLE_EQ(maxVal, std::numeric_limits::infinity()); + + constexpr int32_t floatColId = 2; + const auto& [minFloatVal, maxFloatVal] = + decodeBounds(stats[0], floatColId); + EXPECT_FLOAT_EQ(minFloatVal, -std::numeric_limits::infinity()); + EXPECT_FLOAT_EQ(maxFloatVal, -std::numeric_limits::infinity()); +} + +TEST_F(IcebergParquetStatsTest, NaN) { + constexpr vector_size_t size = 1'000; + constexpr int32_t expectedNulls = 500; + constexpr int32_t doubleColId = 1; + int32_t expectedNaNs = 0; + + const auto& stats = + writeDataAndGetAllStats(makeRowVector({makeFlatVector( + size, + [&expectedNaNs](vector_size_t /*row*/) { + expectedNaNs++; + return std::numeric_limits::quiet_NaN(); + }, + nullEvery(2), + DOUBLE())})); + 
verifyBasicStats( + stats[0], size, {{doubleColId, size}}, {{doubleColId, expectedNulls}}); + + EXPECT_EQ( + stats[0]->columnStats.at(doubleColId).nanValueCount.value_or(0), + expectedNaNs); + // Do not collect bounds for NULLs and NaNs. + for (const auto& [fieldId, columnStats] : stats[0]->columnStats) { + ASSERT_FALSE(columnStats.lowerBound.has_value()); + ASSERT_FALSE(columnStats.upperBound.has_value()); + } +} + +TEST_F(IcebergParquetStatsTest, partitionedTable) { + std::vector partitionTransforms = { + {0, TransformType::kBucket, 4}, + {1, TransformType::kDay, std::nullopt}, + {2, TransformType::kTruncate, 2}, + }; + + constexpr vector_size_t size = 100; + + auto rowVector = makeRowVector( + {makeFlatVector(size, [](vector_size_t row) { return row; }), + makeFlatVector( + size, + [](vector_size_t row) { return 18262 + (row % 5); }, + nullptr, + DATE()), + makeFlatVector(size, [](vector_size_t row) { + return fmt::format("str{}", row % 10); + })}); + + const auto& fileStats = + writeDataAndGetAllStats(rowVector, partitionTransforms); + + EXPECT_GT(fileStats.size(), 1) + << "Expected multiple files due to partitioning"; + + for (const auto& stats : fileStats) { + EXPECT_GT(stats->numRecords, 0); + ASSERT_FALSE(stats->columnStats.empty()); + + constexpr int32_t intColId = 1; + constexpr int32_t dateColId = 2; + constexpr int32_t varcharColId = 3; + EXPECT_EQ(stats->columnStats.at(intColId).valueCount, stats->numRecords); + EXPECT_EQ(stats->columnStats.at(dateColId).valueCount, stats->numRecords); + EXPECT_EQ( + stats->columnStats.at(varcharColId).valueCount, stats->numRecords); + + for (const auto fieldId : {intColId, dateColId, varcharColId}) { + const auto& columnStats = stats->columnStats.at(fieldId); + ASSERT_TRUE(columnStats.lowerBound.has_value()); + ASSERT_TRUE(columnStats.upperBound.has_value()); + EXPECT_FALSE(columnStats.lowerBound.value().empty()); + EXPECT_FALSE(columnStats.upperBound.value().empty()); + } + } + + // Verify total record count across 
all partitions. + int64_t totalRecords = 0; + for (const auto& stats : fileStats) { + totalRecords += stats->numRecords; + } + EXPECT_EQ(totalRecords, size); +} + +TEST_F(IcebergParquetStatsTest, multiplePartitionTransforms) { + std::vector partitionTransforms = { + {0, TransformType::kBucket, 2}, + {1, TransformType::kYear, std::nullopt}, + {2, TransformType::kTruncate, 3}, + {3, TransformType::kIdentity, std::nullopt}}; + + constexpr vector_size_t size = 100; + + auto rowVector = makeRowVector( + {makeFlatVector( + size, [](vector_size_t row) { return row * 10; }), + makeFlatVector( + size, + [](vector_size_t row) { return 18262 + (row * 100); }, + nullptr, + DATE()), + makeFlatVector( + size, + [](vector_size_t row) { + return fmt::format("prefix{}_value", row % 5); + }), + makeFlatVector( + size, [](vector_size_t row) { return (row % 3) * 1'000; })}); + + const auto& fileStats = + writeDataAndGetAllStats(rowVector, partitionTransforms); + EXPECT_GT(fileStats.size(), 1) + << "Expected multiple files due to partitioning"; + // Check each file's stats. 
+ for (const auto& stats : fileStats) { + EXPECT_GT(stats->numRecords, 0); + constexpr int32_t intColId = 1; + constexpr int32_t dateColId = 2; + constexpr int32_t bigintColId = 4; + + if (stats->columnStats.contains(intColId)) { + const auto& [minVal, maxVal] = decodeBounds(stats, intColId); + EXPECT_LE(minVal, maxVal) + << "Lower bound should be <= upper bound for int column"; + } + + if (stats->columnStats.contains(dateColId)) { + const auto& [minVal, maxVal] = decodeBounds(stats, dateColId); + EXPECT_LE(minVal, maxVal) + << "Lower bound should be <= upper bound for date column"; + } + + if (stats->columnStats.contains(bigintColId)) { + const auto& [minVal, maxVal] = decodeBounds(stats, bigintColId); + EXPECT_LE(minVal, maxVal) + << "Lower bound should be <= upper bound for bigint column"; + } + } + int64_t totalRecords = 0; + for (const auto& stats : fileStats) { + totalRecords += stats->numRecords; + } + EXPECT_EQ(totalRecords, size); +} + +TEST_F(IcebergParquetStatsTest, partitionedTableWithNulls) { + constexpr vector_size_t size = 100; + constexpr int32_t expectedIntNulls = 20; + constexpr int32_t expectedDateNulls = 15; + constexpr int32_t expectedVarcharNulls = 10; + + std::vector partitionTransforms = { + {0, TransformType::kIdentity, std::nullopt}, + {1, TransformType::kMonth, std::nullopt}, + {2, TransformType::kTruncate, 2}}; + auto rowVector = makeRowVector( + {makeFlatVector( + size, + [](vector_size_t row) { return row % 10; }, + nullEvery(5), + INTEGER()), + makeFlatVector( + size, + [](vector_size_t row) { return 18262 + (row % 3) * 30; }, + nullEvery(7), + DATE()), + makeFlatVector( + size, + [](vector_size_t row) { return fmt::format("val{}", row % 5); }, + nullEvery(11))}); + const auto& fileStats = + writeDataAndGetAllStats(rowVector, partitionTransforms); + int32_t totalIntNulls = 0; + int32_t totalDateNulls = 0; + int32_t totalVarcharNulls = 0; + int32_t totalRecords = 0; + + constexpr int32_t intColId = 1; + constexpr int32_t dateColId = 2; 
+ constexpr int32_t varcharColId = 3; + + for (const auto& stats : fileStats) { + totalRecords += stats->numRecords; + // Add null counts if present. + if (stats->columnStats.contains(intColId)) { + totalIntNulls += stats->columnStats.at(intColId).nullValueCount; + } + + if (stats->columnStats.contains(dateColId)) { + totalDateNulls += stats->columnStats.at(dateColId).nullValueCount; + } + + if (stats->columnStats.contains(varcharColId)) { + totalVarcharNulls += stats->columnStats.at(varcharColId).nullValueCount; + } + } + + // Verify total counts match expected. + EXPECT_EQ(totalRecords, size); + EXPECT_EQ(totalIntNulls, expectedIntNulls); + EXPECT_EQ(totalDateNulls, expectedDateNulls); + EXPECT_EQ(totalVarcharNulls, expectedVarcharNulls); +} + +TEST_F(IcebergParquetStatsTest, mapType) { + constexpr vector_size_t size = 100; + constexpr int32_t intColId = 1; + constexpr int32_t mapValueColId = 3; // Map value field ID. + + std::vector>>>> + mapData; + for (auto i = 0; i < size; ++i) { + std::vector>> mapRow; + for (auto j = 0; j < 5; ++j) { + mapRow.emplace_back(j, fmt::format("value_{}", i * 5 + j)); + } + mapData.push_back(std::move(mapRow)); + } + + const auto& stats = writeDataAndGetAllStats(makeRowVector({ + makeFlatVector(size, [](auto row) { return row * 10; }), + makeNullableMapVector(mapData), + })); + verifyBasicStats(stats[0], size, {{intColId, size}}, {{intColId, 0}}); + + EXPECT_EQ(stats[0]->columnStats.at(mapValueColId).valueCount, size * 5); + // Map values have stats but no bounds (skipBounds=true for maps). + verifyBoundsNotExist(stats[0], {mapValueColId}); +} + +TEST_F(IcebergParquetStatsTest, arrayType) { + constexpr vector_size_t size = 100; + constexpr int32_t intColId = 1; + constexpr int32_t arrayElementColId = 3; // Array element field ID. 
+ + std::vector>> arrayData; + for (auto i = 0; i < size; ++i) { + std::vector> arrayRow; + for (auto j = 0; j < 3; ++j) { + arrayRow.emplace_back(fmt::format("item_{}", i * 3 + j)); + } + arrayData.push_back(std::move(arrayRow)); + } + + const auto& stats = writeDataAndGetAllStats(makeRowVector( + {makeFlatVector(size, [](auto row) { return row * 10; }), + makeNullableArrayVector(arrayData)})); + verifyBasicStats(stats[0], size, {{intColId, size}}, {{intColId, 0}}); + + EXPECT_EQ(stats[0]->columnStats.at(arrayElementColId).valueCount, size * 3); + // Array elements have stats but no bounds (skipBounds=true for arrays). + verifyBoundsNotExist(stats[0], {arrayElementColId}); +} + +// Test statistics collection for nested struct fields. +// Field ID assignment: +// int_col: 1 +// struct_col: 2 (parent, no stats) +// first_level_id: 3 +// first_level_name: 4 +// nested_struct: 5 (parent, no stats) +// second_level_id: 6 +// second_level_name: 7 +// Statistics collected for leaf fields: [1, 3, 4, 6, 7] +TEST_F(IcebergParquetStatsTest, structType) { + constexpr vector_size_t size = 100; + constexpr int32_t intColId = 1; + constexpr int32_t firstLevelIdColId = 3; + constexpr int32_t secondLevelIdColId = 6; + constexpr int32_t secondLevelNameColId = 7; + + auto firstLevelId = makeFlatVector( + size, [](vector_size_t row) { return row % size; }, nullEvery(5)); + + auto firstLevelName = makeFlatVector( + size, + [](vector_size_t row) { return fmt::format("name_{}", row * 10); }, + nullEvery(7)); + + auto secondLevelId = makeFlatVector( + size, [](vector_size_t row) { return row * size; }, nullEvery(6)); + + auto secondLevelName = makeFlatVector( + size, + [](vector_size_t row) { return fmt::format("nested_{}", row * 100); }, + nullEvery(8)); + + auto nestedStruct = makeRowVector({secondLevelId, secondLevelName}); + auto structVector = + makeRowVector({firstLevelId, firstLevelName, nestedStruct}); + auto rowVector = makeRowVector( + {makeFlatVector(size, [](auto row) { 
return row * 10; }), +       structVector}); + +  const auto& stats = writeDataAndGetAllStats(rowVector); +  EXPECT_EQ(stats[0]->numRecords, size); +  EXPECT_EQ(stats[0]->columnStats.size(), 5); + +  verifyBasicStats( +      stats[0], +      size, +      {{intColId, size}, {firstLevelIdColId, size}, {secondLevelIdColId, size}}, +      {{intColId, 0}, {firstLevelIdColId, 20}}); + +  EXPECT_EQ( +      encoding::Base64::decode( +          stats[0]->columnStats.at(secondLevelNameColId).lowerBound.value()), +      "nested_100"); +  EXPECT_EQ( +      encoding::Base64::decode( +          stats[0]->columnStats.at(secondLevelNameColId).upperBound.value()), +      "nested_9900"); +} + +#endif + +} // namespace + +} // namespace facebook::velox::connector::hive::iceberg diff --git a/velox/dwio/common/FileMetadata.h b/velox/dwio/common/FileMetadata.h new file mode 100644 index 00000000000..f66cd56e46a --- /dev/null +++ b/velox/dwio/common/FileMetadata.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace facebook::velox::dwio::common { + +/// File format specific metadata returned when a writer is closed. +/// The caller of Writer::close() can do further processing, such as +/// aggregating row group statistics into file-level statistics, based on the +/// metadata. 
+class FileMetadata { + public: + virtual ~FileMetadata() = default; +}; + +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/SortingWriter.cpp b/velox/dwio/common/SortingWriter.cpp index d67efd6ec22..3629222ef32 100644 --- a/velox/dwio/common/SortingWriter.cpp +++ b/velox/dwio/common/SortingWriter.cpp @@ -86,12 +86,21 @@ bool SortingWriter::finish() { return true; } +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY void SortingWriter::close() { VELOX_CHECK(isFinishing()); setState(State::kClosed); VELOX_CHECK_NULL(sortBuffer_); outputWriter_->close(); } +#else +std::unique_ptr SortingWriter::close() { + VELOX_CHECK(isFinishing()); + setState(State::kClosed); + VELOX_CHECK_NULL(sortBuffer_); + return outputWriter_->close(); +} +#endif void SortingWriter::abort() { setState(State::kAborted); diff --git a/velox/dwio/common/SortingWriter.h b/velox/dwio/common/SortingWriter.h index a136cff1238..7c36371a2e8 100644 --- a/velox/dwio/common/SortingWriter.h +++ b/velox/dwio/common/SortingWriter.h @@ -42,7 +42,13 @@ class SortingWriter : public Writer { /// be flushed. void flush() override; + /// Closes the writer. Returns file metadata, or null if no metadata is + /// available (e.g. for an empty file). +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY void close() override; +#else + std::unique_ptr close() override; +#endif void abort() override; diff --git a/velox/dwio/common/Writer.h b/velox/dwio/common/Writer.h index 2c5c5ab7868..df591ee3c6d 100644 --- a/velox/dwio/common/Writer.h +++ b/velox/dwio/common/Writer.h @@ -20,6 +20,7 @@ #include #include "velox/common/base/Portability.h" +#include "velox/dwio/common/FileMetadata.h" #include "velox/vector/BaseVector.h" namespace facebook::velox::dwio::common { @@ -69,10 +70,16 @@ class Writer { /// NOTE: this must be called before close(). virtual bool finish() = 0; - /// Invokes closes the writer. Data can no longer be written. + /// Closes the writer. Data can no longer be written. 
Returns format-specific + /// file metadata collected during write operations. The returned pointer can + /// be null if no metadata is available, such as for an empty data file. /// /// NOTE: this must be called after the last finish() which returns true. +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY virtual void close() = 0; +#else + virtual std::unique_ptr close() = 0; +#endif /// Aborts the writing by closing the writer and dropping everything. /// Data can no longer be written. diff --git a/velox/dwio/common/tests/SortingWriterTest.cpp b/velox/dwio/common/tests/SortingWriterTest.cpp index 4beadc61e1a..1c92ce0b72e 100644 --- a/velox/dwio/common/tests/SortingWriterTest.cpp +++ b/velox/dwio/common/tests/SortingWriterTest.cpp @@ -43,9 +43,16 @@ class MockWriter : public Writer { void flush() override {} +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY void close() override { setState(State::kClosed); } +#else + std::unique_ptr close() override { + setState(State::kClosed); + return nullptr; + } +#endif void abort() override { setState(State::kAborted); diff --git a/velox/dwio/common/tests/WriterTest.cpp b/velox/dwio/common/tests/WriterTest.cpp index 2aa5d992535..c874da60e0b 100644 --- a/velox/dwio/common/tests/WriterTest.cpp +++ b/velox/dwio/common/tests/WriterTest.cpp @@ -51,7 +51,13 @@ class MockWriter : public Writer { void abort() override {} +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY void close() override {} +#else + std::unique_ptr close() override { + return nullptr; + } +#endif }; TEST(WriterTest, stateString) { diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp index 26edce0d910..522bb1b810a 100644 --- a/velox/dwio/dwrf/writer/Writer.cpp +++ b/velox/dwio/dwrf/writer/Writer.cpp @@ -798,6 +798,7 @@ void Writer::flush() { flushInternal(false); } +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY void Writer::close() { checkRunning(); auto exitGuard = folly::makeGuard([this]() { @@ -807,6 +808,18 @@ void Writer::close() { 
flushInternal(true); writerBase_->close(); } +#else +std::unique_ptr Writer::close() { + checkRunning(); + auto exitGuard = folly::makeGuard([this]() { + flushPolicy_->onClose(); + setState(State::kClosed); + }); + flushInternal(true); + writerBase_->close(); + return std::make_unique(); +} +#endif void Writer::abort() { checkRunning(); diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h index 4a7d1ab540b..3b6ed904e9c 100644 --- a/velox/dwio/dwrf/writer/Writer.h +++ b/velox/dwio/dwrf/writer/Writer.h @@ -19,6 +19,7 @@ #include #include +#include "velox/dwio/common/FileMetadata.h" #include "velox/dwio/common/Writer.h" #include "velox/dwio/common/WriterFactory.h" #include "velox/dwio/dwrf/common/Encryption.h" @@ -30,6 +31,9 @@ namespace facebook::velox::dwrf { +/// DWRF-specific file metadata wrapper. Currently a placeholder. +class DwrfFileMetadata : public dwio::common::FileMetadata {}; + struct WriterOptions : public dwio::common::WriterOptions { std::shared_ptr config = std::make_shared(); /// Changes the interface to stream list and encoding iter. @@ -86,7 +90,11 @@ class Writer : public dwio::common::Writer { return true; } +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY virtual void close() override; +#else + virtual std::unique_ptr close() override; +#endif virtual void abort() override; diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp index d0c6a0fcf7d..114868308a7 100644 --- a/velox/dwio/parquet/reader/PageReader.cpp +++ b/velox/dwio/parquet/reader/PageReader.cpp @@ -362,7 +362,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { ? 
sizeof(float) : sizeof(double); auto numBytes = dictionary_.numValues * typeSize; - if (type_->type()->isShortDecimal() && + if ((type_->type()->isShortDecimal() || type_->type()->isTime()) && parquetType == thrift::Type::INT32) { auto veloxTypeLength = type_->type()->cppSizeInBytes(); auto numVeloxBytes = dictionary_.numValues * veloxTypeLength; @@ -390,7 +390,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) { } stats_.pageLoadTimeNs.increment(readUs * 1'000); } - if (type_->type()->isShortDecimal() && + if ((type_->type()->isShortDecimal() || type_->type()->isTime()) && parquetType == thrift::Type::INT32) { auto values = dictionary_.values->asMutable(); auto parquetValues = dictionary_.values->asMutable(); diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index 7f4da92c831..5a22aab41d3 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -552,17 +552,35 @@ void Writer::newRowGroup(int32_t numRows) { PARQUET_THROW_NOT_OK(arrowContext_->writer->newRowGroup(numRows)); } +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY void Writer::close() { flush(); + if (arrowContext_->writer) { + PARQUET_THROW_NOT_OK(arrowContext_->writer->close()); + arrowContext_->writer.reset(); + } + PARQUET_THROW_NOT_OK(stream_->Close()); + arrowContext_->stagingChunks.clear(); +} +#else +std::unique_ptr Writer::close() { + flush(); + std::unique_ptr parquetFileMetadata; if (arrowContext_->writer) { PARQUET_THROW_NOT_OK(arrowContext_->writer->close()); + parquetFileMetadata = std::make_unique( + arrowContext_->writer->metadata()); arrowContext_->writer.reset(); } + PARQUET_THROW_NOT_OK(stream_->Close()); arrowContext_->stagingChunks.clear(); + + return parquetFileMetadata; } +#endif void Writer::abort() { stream_->abort(); diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index e77819b06f4..41a03b83023 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ 
b/velox/dwio/parquet/writer/Writer.h @@ -20,12 +20,15 @@ #include "velox/common/compression/Compression.h" #include "velox/common/config/Config.h" #include "velox/dwio/common/DataBuffer.h" +#include "velox/dwio/common/FileMetadata.h" #include "velox/dwio/common/FileSink.h" #include "velox/dwio/common/FlushPolicy.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Writer.h" #include "velox/dwio/common/WriterFactory.h" #include "velox/dwio/parquet/ParquetFieldId.h" +#include "velox/dwio/parquet/writer/WriterConfig.h" +#include "velox/dwio/parquet/writer/arrow/Metadata.h" #include "velox/dwio/parquet/writer/arrow/Types.h" #include "velox/dwio/parquet/writer/arrow/util/Compression.h" #include "velox/vector/ComplexVector.h" @@ -39,6 +42,21 @@ class ArrowDataBufferSink; struct ArrowContext; +/// Parquet-specific file metadata wrapper. Provides access to the underlying +/// arrow::FileMetaData. +class ParquetFileMetadata : public dwio::common::FileMetadata { + public: + explicit ParquetFileMetadata(std::shared_ptr metadata) + : metadata_(std::move(metadata)) {} + + std::shared_ptr arrowMetadata() const { + return metadata_; + } + + private: + std::shared_ptr metadata_; +}; + class DefaultFlushPolicy : public dwio::common::FlushPolicy { public: DefaultFlushPolicy() @@ -93,7 +111,7 @@ class LambdaFlushPolicy : public DefaultFlushPolicy { std::function lambda_; }; -struct WriterOptions : public dwio::common::WriterOptions { +struct WriterOptions : public dwio::common::WriterOptions, public WriterConfig { // Growth ratio passed to ArrowDataBufferSink. The default value is a // heuristic borrowed from // folly/FBVector(https://github.com/facebook/folly/blob/main/folly/docs/FBVector.md#memory-handling). @@ -127,51 +145,6 @@ struct WriterOptions : public dwio::common::WriterOptions { /// The structure should match the schema hierarchy with nested children. std::vector parquetFieldIds; - // Parsing session and hive configs. 
- - // This isn't a typo; session and hive connector config names are different - // ('_' vs '-'). - static constexpr const char* kParquetSessionWriteTimestampUnit = - "hive.parquet.writer.timestamp_unit"; - static constexpr const char* kParquetHiveConnectorWriteTimestampUnit = - "hive.parquet.writer.timestamp-unit"; - static constexpr const char* kParquetSessionEnableDictionary = - "hive.parquet.writer.enable_dictionary"; - static constexpr const char* kParquetHiveConnectorEnableDictionary = - "hive.parquet.writer.enable-dictionary"; - static constexpr const char* kParquetSessionDictionaryPageSizeLimit = - "hive.parquet.writer.dictionary_page_size_limit"; - static constexpr const char* kParquetHiveConnectorDictionaryPageSizeLimit = - "hive.parquet.writer.dictionary-page-size-limit"; - static constexpr const char* kParquetSessionDataPageVersion = - "hive.parquet.writer.datapage_version"; - static constexpr const char* kParquetHiveConnectorDataPageVersion = - "hive.parquet.writer.datapage-version"; - static constexpr const char* kParquetSessionWritePageSize = - "hive.parquet.writer.page_size"; - static constexpr const char* kParquetHiveConnectorWritePageSize = - "hive.parquet.writer.page-size"; - static constexpr const char* kParquetSessionWriteBatchSize = - "hive.parquet.writer.batch_size"; - static constexpr const char* kParquetHiveConnectorWriteBatchSize = - "hive.parquet.writer.batch-size"; - static constexpr const char* kParquetHiveConnectorCreatedBy = - "hive.parquet.writer.created-by"; - - // Use the same property name from HiveConfig::kMaxTargetFileSize. - static constexpr const char* kParquetConnectorMaxTargetFileSize = - "max-target-file-size"; - static constexpr const char* kParquetSessionMaxTargetFileSize = - "max_target_file_size"; - // Serde parameter keys for timestamp settings. These can be set via - // serdeParameters map to override the default timestamp behavior. 
- // The timezone key accepts a timezone string or empty string to disable - // timezone conversion. - static constexpr const char* kParquetSerdeTimestampUnit = - "parquet.writer.timestamp.unit"; - static constexpr const char* kParquetSerdeTimestampTimezone = - "parquet.writer.timestamp.timezone"; - // Process hive connector and session configs. void processConfigs( const config::ConfigBase& connectorConfig, @@ -213,10 +186,18 @@ class Writer : public dwio::common::Writer { return true; } - // Closes 'this', After close, data can no longer be added and the completed +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY + // Closes 'this'. After close, data can no longer be added and the completed // Parquet file is flushed into 'sink' provided at construction. 'sink' stays // live until destruction of 'this'. void close() override; +#else + // Closes 'this'. After close, data can no longer be added and the completed + // Parquet file is flushed into 'sink' provided at construction. 'sink' stays + // live until destruction of 'this'. Returns file metadata, or null if no + // metadata is available (e.g. for an empty file). + std::unique_ptr<dwio::common::FileMetadata> close() override; +#endif void abort() override; diff --git a/velox/dwio/parquet/writer/WriterConfig.h b/velox/dwio/parquet/writer/WriterConfig.h new file mode 100644 index 00000000000..53da9b7cbcd --- /dev/null +++ b/velox/dwio/parquet/writer/WriterConfig.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::parquet { + +/// Lightweight config constants for the Parquet writer. +/// Separated from Writer.h to allow access without pulling in Arrow headers. +/// WriterOptions inherits from this struct, so all existing code using +/// WriterOptions::kParquet* constants continues to work. +struct WriterConfig { + // Parsing session and hive configs. + + // This isn't a typo; session and hive connector config names are different + // ('_' vs '-'). + static constexpr const char* kParquetSessionWriteTimestampUnit = + "hive.parquet.writer.timestamp_unit"; + static constexpr const char* kParquetHiveConnectorWriteTimestampUnit = + "hive.parquet.writer.timestamp-unit"; + static constexpr const char* kParquetSessionEnableDictionary = + "hive.parquet.writer.enable_dictionary"; + static constexpr const char* kParquetHiveConnectorEnableDictionary = + "hive.parquet.writer.enable-dictionary"; + static constexpr const char* kParquetSessionDictionaryPageSizeLimit = + "hive.parquet.writer.dictionary_page_size_limit"; + static constexpr const char* kParquetHiveConnectorDictionaryPageSizeLimit = + "hive.parquet.writer.dictionary-page-size-limit"; + static constexpr const char* kParquetSessionDataPageVersion = + "hive.parquet.writer.datapage_version"; + static constexpr const char* kParquetHiveConnectorDataPageVersion = + "hive.parquet.writer.datapage-version"; + static constexpr const char* kParquetSessionWritePageSize = + "hive.parquet.writer.page_size"; + static constexpr const char* kParquetHiveConnectorWritePageSize = + "hive.parquet.writer.page-size"; + static constexpr const char* kParquetSessionWriteBatchSize = + "hive.parquet.writer.batch_size"; + static constexpr const char* kParquetHiveConnectorWriteBatchSize = + "hive.parquet.writer.batch-size"; + static constexpr const char* kParquetHiveConnectorCreatedBy = + 
"hive.parquet.writer.created-by"; + + // Use the same property name from HiveConfig::kMaxTargetFileSize. + static constexpr const char* kParquetConnectorMaxTargetFileSize = + "max-target-file-size"; + static constexpr const char* kParquetSessionMaxTargetFileSize = + "max_target_file_size"; + // Serde parameter keys for timestamp settings. These can be set via + // serdeParameters map to override the default timestamp behavior. + // The timezone key accepts a timezone string or empty string to disable + // timezone conversion. + static constexpr const char* kParquetSerdeTimestampUnit = + "parquet.writer.timestamp.unit"; + static constexpr const char* kParquetSerdeTimestampTimezone = + "parquet.writer.timestamp.timezone"; +}; + +} // namespace facebook::velox::parquet diff --git a/velox/dwio/text/writer/TextWriter.cpp b/velox/dwio/text/writer/TextWriter.cpp index 0293fa4ee83..47731324910 100644 --- a/velox/dwio/text/writer/TextWriter.cpp +++ b/velox/dwio/text/writer/TextWriter.cpp @@ -175,9 +175,16 @@ void TextWriter::flush() { bufferedWriterSink_->flush(); } +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY void TextWriter::close() { bufferedWriterSink_->close(); } +#else +std::unique_ptr<dwio::common::FileMetadata> TextWriter::close() { + bufferedWriterSink_->close(); + return std::make_unique<TextFileMetadata>(); +} +#endif void TextWriter::abort() { bufferedWriterSink_->abort(); diff --git a/velox/dwio/text/writer/TextWriter.h b/velox/dwio/text/writer/TextWriter.h index 2ec11b82685..2d75df603cf 100644 --- a/velox/dwio/text/writer/TextWriter.h +++ b/velox/dwio/text/writer/TextWriter.h @@ -16,6 +16,7 @@ #pragma once +#include "velox/dwio/common/FileMetadata.h" #include "velox/dwio/common/FileSink.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/common/Writer.h" @@ -26,6 +27,9 @@ namespace facebook::velox::text { using dwio::common::SerDeOptions; +/// Text-specific file metadata wrapper. Currently a placeholder.
+class TextFileMetadata : public dwio::common::FileMetadata {}; + struct WriterOptions : public dwio::common::WriterOptions { int64_t defaultFlushCount = 10 << 10; uint8_t headerLineCount = @@ -58,7 +62,11 @@ class TextWriter : public dwio::common::Writer { return true; } +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY void close() override; +#else + std::unique_ptr<dwio::common::FileMetadata> close() override; +#endif void abort() override;