Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions velox/connectors/hive/iceberg/IcebergMetadataColumns.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ struct IcebergMetadataColumn {
std::shared_ptr<const Type> type;
std::string doc;

// Position delete file's metadata column ID, see
// https://iceberg.apache.org/spec/#position-delete-files.
static constexpr int32_t kPosId = 2'147'483'545;
static constexpr int32_t kFilePathId = 2'147'483'546;
Comment on lines +33 to +34
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do these numbers come from?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are defined in https://iceberg.apache.org/spec/#position-delete-files, added a comment to explain the value.


IcebergMetadataColumn(
int _id,
const std::string& _name,
Expand All @@ -37,15 +42,15 @@ struct IcebergMetadataColumn {

static std::shared_ptr<IcebergMetadataColumn> icebergDeleteFilePathColumn() {
return std::make_shared<IcebergMetadataColumn>(
2147483546,
kFilePathId,
"file_path",
VARCHAR(),
"Path of a file in which a deleted row is stored");
}

static std::shared_ptr<IcebergMetadataColumn> icebergDeletePosColumn() {
return std::make_shared<IcebergMetadataColumn>(
2147483545,
kPosId,
"pos",
BIGINT(),
"Ordinal position of a deleted row in the data file");
Expand Down
21 changes: 16 additions & 5 deletions velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

#include "velox/connectors/hive/iceberg/IcebergSplitReader.h"

#include <folly/lang/Bits.h>

#include "velox/common/encode/Base64.h"
#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
#include "velox/connectors/hive/iceberg/IcebergMetadataColumns.h"
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
Expand Down Expand Up @@ -82,11 +85,19 @@ void IcebergSplitReader::prepareSplit(
// Skip the delete file if all delete positions are before this split.
// TODO: Skip delete files where all positions are after the split, if
// split row count becomes available.
auto posColumn = IcebergMetadataColumn::icebergDeletePosColumn();
if (auto posUpperBoundIt = deleteFile.upperBounds.find(posColumn->id);
posUpperBoundIt != deleteFile.upperBounds.end()) {
auto deleteUpperBound = folly::to<uint64_t>(posUpperBoundIt->second);
if (deleteUpperBound < splitOffset_) {
if (auto iter =
deleteFile.upperBounds.find(IcebergMetadataColumn::kPosId);
iter != deleteFile.upperBounds.end()) {
auto decodedBound = encoding::Base64::decode(iter->second);
VELOX_CHECK_EQ(
decodedBound.size(),
sizeof(uint64_t),
"Unexpected decoded size for positional delete upper bound.");
uint64_t posDeleteUpperBound;
std::memcpy(
&posDeleteUpperBound, decodedBound.data(), sizeof(uint64_t));
posDeleteUpperBound = folly::Endian::little(posDeleteUpperBound);
if (posDeleteUpperBound < splitOffset_) {
continue;
}
}
Expand Down
12 changes: 10 additions & 2 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/

#include <folly/Singleton.h>
#include <folly/lang/Bits.h>
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/encode/Base64.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/iceberg/IcebergConnector.h"
Expand Down Expand Up @@ -920,7 +922,13 @@ TEST_F(HiveIcebergTest, skipDeleteFileByPositionUpperBound) {
makeFlatVector<int64_t>({0, 1, 2})})};
writeToFile(deleteFilePath->getPath(), deleteVectors);

// upperBound "2" is the max position in the delete file.
// upperBound "2" is the max position in the delete file. Iceberg stores
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@PingLiuPing Would you confirm that the test fails without the fix?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes,

unknown file: Failure
C++ exception with description "Exception: VeloxRuntimeError
Error Source: RUNTIME
Error Code: INVALID_STATE
Reason: Operator::getOutput failed for [operator: TableScan, plan node ID: 0]: Non-digit character found: "AgAAAAAAAAA="
Retriable: False
Function: operator()

"AgAAAAAAAAA=" is base64 encoded value.

// long bounds as 8-byte little-endian binary, then Base64 encodes them.
uint64_t upperBound = 2;
auto upperBoundLE = folly::Endian::little(upperBound);
auto encodedUpperBound = encoding::Base64::encode(
std::string_view(
reinterpret_cast<const char*>(&upperBoundLE), sizeof(upperBoundLE)));
IcebergDeleteFile deleteFile(
FileContent::kPositionalDeletes,
deleteFilePath->getPath(),
Expand All @@ -930,7 +938,7 @@ TEST_F(HiveIcebergTest, skipDeleteFileByPositionUpperBound) {
std::fopen(deleteFilePath->getPath().c_str(), "r")),
{},
{},
{{posColumn->id, "2"}});
{{posColumn->id, encodedUpperBound}});

auto plan = PlanBuilder()
.startTableScan()
Expand Down
Loading