diff --git a/velox/connectors/hive/iceberg/IcebergMetadataColumns.h b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h index 4cbf2a7862b..becd7d5de3c 100644 --- a/velox/connectors/hive/iceberg/IcebergMetadataColumns.h +++ b/velox/connectors/hive/iceberg/IcebergMetadataColumns.h @@ -28,6 +28,11 @@ struct IcebergMetadataColumn { std::shared_ptr type; std::string doc; + // Position delete file's metadata column ID, see + // https://iceberg.apache.org/spec/#position-delete-files. + static constexpr int32_t kPosId = 2'147'483'545; + static constexpr int32_t kFilePathId = 2'147'483'546; + IcebergMetadataColumn( int _id, const std::string& _name, @@ -37,7 +42,7 @@ struct IcebergMetadataColumn { static std::shared_ptr icebergDeleteFilePathColumn() { return std::make_shared( - 2147483546, + kFilePathId, "file_path", VARCHAR(), "Path of a file in which a deleted row is stored"); @@ -45,7 +50,7 @@ struct IcebergMetadataColumn { static std::shared_ptr icebergDeletePosColumn() { return std::make_shared( - 2147483545, + kPosId, "pos", BIGINT(), "Ordinal position of a deleted row in the data file"); diff --git a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp index 73bd0e14672..a8821a75204 100644 --- a/velox/connectors/hive/iceberg/IcebergSplitReader.cpp +++ b/velox/connectors/hive/iceberg/IcebergSplitReader.cpp @@ -16,6 +16,9 @@ #include "velox/connectors/hive/iceberg/IcebergSplitReader.h" +#include + +#include "velox/common/encode/Base64.h" #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" #include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h" #include "velox/connectors/hive/iceberg/IcebergSplit.h" @@ -82,11 +85,19 @@ void IcebergSplitReader::prepareSplit( // Skip the delete file if all delete positions are before this split. // TODO: Skip delete files where all positions are after the split, if // split row count becomes available. - auto posColumn = IcebergMetadataColumn::icebergDeletePosColumn(); - if (auto posUpperBoundIt = deleteFile.upperBounds.find(posColumn->id); - posUpperBoundIt != deleteFile.upperBounds.end()) { - auto deleteUpperBound = folly::to(posUpperBoundIt->second); - if (deleteUpperBound < splitOffset_) { + if (auto iter = + deleteFile.upperBounds.find(IcebergMetadataColumn::kPosId); + iter != deleteFile.upperBounds.end()) { + auto decodedBound = encoding::Base64::decode(iter->second); + VELOX_CHECK_EQ( + decodedBound.size(), + sizeof(uint64_t), + "Unexpected decoded size for positional delete upper bound."); + uint64_t posDeleteUpperBound; + std::memcpy( + &posDeleteUpperBound, decodedBound.data(), sizeof(uint64_t)); + posDeleteUpperBound = folly::Endian::little(posDeleteUpperBound); + if (posDeleteUpperBound < splitOffset_) { continue; } } diff --git a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp index 91c8fed3d5b..29acc03a2e1 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp @@ -15,7 +15,9 @@ */ #include +#include #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/encode/Base64.h" #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/iceberg/IcebergConnector.h" @@ -920,7 +922,13 @@ TEST_F(HiveIcebergTest, skipDeleteFileByPositionUpperBound) { makeFlatVector({0, 1, 2})})}; writeToFile(deleteFilePath->getPath(), deleteVectors); - // upperBound "2" is the max position in the delete file. + // upperBound "2" is the max position in the delete file. Iceberg stores + // long bounds as 8-byte little-endian binary, then Base64 encodes them. + uint64_t upperBound = 2; + auto upperBoundLE = folly::Endian::little(upperBound); + auto encodedUpperBound = encoding::Base64::encode( + std::string_view( + reinterpret_cast(&upperBoundLE), sizeof(upperBoundLE))); IcebergDeleteFile deleteFile( FileContent::kPositionalDeletes, deleteFilePath->getPath(), @@ -930,7 +938,7 @@ TEST_F(HiveIcebergTest, skipDeleteFileByPositionUpperBound) { std::fopen(deleteFilePath->getPath().c_str(), "r")), {}, {}, - {{posColumn->id, "2"}}); + {{posColumn->id, encodedUpperBound}}); auto plan = PlanBuilder() .startTableScan()