diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/delete/DeleteFile.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/delete/DeleteFile.java index 16b4943000651..691d073f1a90a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/delete/DeleteFile.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/delete/DeleteFile.java @@ -42,6 +42,7 @@ public final class DeleteFile private final List equalityFieldIds; private final Map lowerBounds; private final Map upperBounds; + private final long dataSequenceNumber; public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile) { @@ -50,6 +51,8 @@ public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile) Map upperBounds = firstNonNull(deleteFile.upperBounds(), ImmutableMap.of()) .entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().array().clone())); + long dataSequenceNumber = deleteFile.dataSequenceNumber() != null ? deleteFile.dataSequenceNumber() : 0L; + return new DeleteFile( fromIcebergFileContent(deleteFile.content()), deleteFile.path().toString(), @@ -58,7 +61,8 @@ public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile) deleteFile.fileSizeInBytes(), Optional.ofNullable(deleteFile.equalityFieldIds()).orElseGet(ImmutableList::of), lowerBounds, - upperBounds); + upperBounds, + dataSequenceNumber); } @JsonCreator @@ -70,7 +74,8 @@ public DeleteFile( @JsonProperty("fileSizeInBytes") long fileSizeInBytes, @JsonProperty("equalityFieldIds") List equalityFieldIds, @JsonProperty("lowerBounds") Map lowerBounds, - @JsonProperty("upperBounds") Map upperBounds) + @JsonProperty("upperBounds") Map upperBounds, + @JsonProperty("dataSequenceNumber") long dataSequenceNumber) { this.content = requireNonNull(content, "content is null"); this.path = requireNonNull(path, "path is null"); @@ -80,6 +85,7 @@ public DeleteFile( this.equalityFieldIds = ImmutableList.copyOf(requireNonNull(equalityFieldIds, "equalityFieldIds is null")); this.lowerBounds = ImmutableMap.copyOf(requireNonNull(lowerBounds, "lowerBounds is null")); this.upperBounds = ImmutableMap.copyOf(requireNonNull(upperBounds, "upperBounds is null")); + this.dataSequenceNumber = dataSequenceNumber; } @JsonProperty @@ -130,12 +136,19 @@ public Map getUpperBounds() return upperBounds; } + @JsonProperty + public long getDataSequenceNumber() + { + return dataSequenceNumber; + } + @Override public String toString() { return toStringHelper(this) .addValue(path) .add("records", recordCount) + .add("dataSequenceNumber", dataSequenceNumber) .toString(); } } diff --git a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp index 707739048c721..8eedece1d92fd 100644 --- a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp @@ -30,6 +30,8 @@ velox::connector::hive::iceberg::FileContent toVeloxFileContent( return velox::connector::hive::iceberg::FileContent::kData; } else if (content == protocol::iceberg::FileContent::POSITION_DELETES) { return velox::connector::hive::iceberg::FileContent::kPositionalDeletes; + } else if (content == protocol::iceberg::FileContent::EQUALITY_DELETES) { + return velox::connector::hive::iceberg::FileContent::kEqualityDeletes; } VELOX_UNSUPPORTED("Unsupported file content: {}", fmt::underlying(content)); } @@ -176,6 +178,9 @@ IcebergPrestoToVeloxConnector::toVeloxSplit( VELOX_CHECK_NOT_NULL( icebergSplit, "Unexpected split type {}", connectorSplit->_type); + const int64_t dataSequenceNumber = + icebergSplit->dataSequenceNumber; // NOLINT(facebook-bugprone-unchecked-pointer-access) + std::unordered_map> partitionKeys; for (const auto& entry : icebergSplit->partitionKeys) { partitionKeys.emplace( @@ -205,14 +210,16 @@ IcebergPrestoToVeloxConnector::toVeloxSplit( deleteFile.fileSizeInBytes, std::vector(deleteFile.equalityFieldIds), lowerBounds, - upperBounds); + upperBounds, + deleteFile.dataSequenceNumber); deletes.emplace_back(icebergDeleteFile); } + std::unordered_map infoColumns = { {"$data_sequence_number", - std::to_string(icebergSplit->dataSequenceNumber)}, + std::to_string(dataSequenceNumber)}, {"$path", icebergSplit->path}}; return std::make_unique( @@ -227,7 +234,9 @@ IcebergPrestoToVeloxConnector::toVeloxSplit( nullptr, splitContext->cacheable, deletes, - infoColumns); + infoColumns, + std::nullopt, + dataSequenceNumber); } std::unique_ptr diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp index ec74e80c58192..41d2ded58dfe9 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp @@ -371,6 +371,13 @@ void to_json(json& j, const DeleteFile& p) { "DeleteFile", "Map", "upperBounds"); + to_json_key( + j, + "dataSequenceNumber", + p.dataSequenceNumber, + "DeleteFile", + "int64_t", + "dataSequenceNumber"); } void from_json(const json& j, DeleteFile& p) { @@ -408,6 +415,13 @@ void from_json(const json& j, DeleteFile& p) { "DeleteFile", "Map", "upperBounds"); + from_json_key( + j, + "dataSequenceNumber", + p.dataSequenceNumber, + "DeleteFile", + "int64_t", + "dataSequenceNumber"); } } // namespace facebook::presto::protocol::iceberg namespace facebook::presto::protocol::iceberg { diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h index b09cd4903a5bf..7c5826826b1fa 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h @@ -78,7 +78,11 @@ void to_json(json& j, const ChangelogSplitInfo& p); void from_json(const json& j, ChangelogSplitInfo& p); } // namespace facebook::presto::protocol::iceberg namespace facebook::presto::protocol::iceberg { -enum class FileContent { DATA, POSITION_DELETES, EQUALITY_DELETES }; +enum class FileContent { + DATA, + POSITION_DELETES, + EQUALITY_DELETES, +}; extern void to_json(json& j, const FileContent& e); extern void from_json(const json& j, FileContent& e); } // namespace facebook::presto::protocol::iceberg @@ -97,6 +101,7 @@ struct DeleteFile { List equalityFieldIds = {}; Map lowerBounds = {}; Map upperBounds = {}; + int64_t dataSequenceNumber = {}; }; void to_json(json& j, const DeleteFile& p); void from_json(const json& j, DeleteFile& p);