diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/FileFormat.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/FileFormat.java index df4700bc8db80..e2d94f71b6b33 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/FileFormat.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/FileFormat.java @@ -26,7 +26,8 @@ public enum FileFormat ORC("orc", true), PARQUET("parquet", true), AVRO("avro", true), - METADATA("metadata.json", false); + METADATA("metadata.json", false), + PUFFIN("puffin", false); private final String ext; private final boolean splittable; @@ -61,6 +62,9 @@ public static FileFormat fromIcebergFileFormat(org.apache.iceberg.FileFormat for case METADATA: prestoFileFormat = METADATA; break; + case PUFFIN: + prestoFileFormat = PUFFIN; + break; default: throw new PrestoException(NOT_SUPPORTED, "Unsupported file format: " + format); } @@ -81,6 +85,12 @@ public org.apache.iceberg.FileFormat toIceberg() case AVRO: fileFormat = org.apache.iceberg.FileFormat.AVRO; break; + case METADATA: + fileFormat = org.apache.iceberg.FileFormat.METADATA; + break; + case PUFFIN: + fileFormat = org.apache.iceberg.FileFormat.PUFFIN; + break; default: throw new PrestoException(NOT_SUPPORTED, "Unsupported file format: " + this); } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java index 9d4cd1a615636..98ee9f2693450 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java @@ -18,7 +18,6 @@ import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorSplitSource; -import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SplitWeight; import com.facebook.presto.spi.connector.ConnectorPartitionHandle; import com.facebook.presto.spi.schedule.NodeSelectionStrategy; @@ -47,7 +46,6 @@ import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize; import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates; import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike; -import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterators.limit; import static java.util.Objects.requireNonNull; @@ -126,13 +124,6 @@ private ConnectorSplit toIcebergSplit(FileScanTask task) PartitionSpec spec = task.spec(); Optional partitionData = partitionDataFromStructLike(spec, task.file().partition()); - // Validate no PUFFIN deletion vectors (Iceberg v3 feature not yet supported) - for (org.apache.iceberg.DeleteFile deleteFile : task.deletes()) { - if (deleteFile.format() == org.apache.iceberg.FileFormat.PUFFIN) { - throw new PrestoException(NOT_SUPPORTED, "Iceberg deletion vectors (PUFFIN format) are not supported"); - } - } - // TODO: We should leverage residual expression and convert that to TupleDomain. // The predicate here is used by readers for predicate push down at reader level, // so when we do not use residual expression, we are just wasting CPU cycles diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/delete/DeleteFile.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/delete/DeleteFile.java index 16b4943000651..691d073f1a90a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/delete/DeleteFile.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/delete/DeleteFile.java @@ -42,6 +42,7 @@ public final class DeleteFile private final List equalityFieldIds; private final Map lowerBounds; private final Map upperBounds; + private final long dataSequenceNumber; public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile) { @@ -50,6 +51,8 @@ public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile) Map upperBounds = firstNonNull(deleteFile.upperBounds(), ImmutableMap.of()) .entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().array().clone())); + long dataSequenceNumber = deleteFile.dataSequenceNumber() != null ? deleteFile.dataSequenceNumber() : 0L; + return new DeleteFile( fromIcebergFileContent(deleteFile.content()), deleteFile.path().toString(), @@ -58,7 +61,8 @@ public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile) deleteFile.fileSizeInBytes(), Optional.ofNullable(deleteFile.equalityFieldIds()).orElseGet(ImmutableList::of), lowerBounds, - upperBounds); + upperBounds, + dataSequenceNumber); } @JsonCreator @@ -70,7 +74,8 @@ public DeleteFile( @JsonProperty("fileSizeInBytes") long fileSizeInBytes, @JsonProperty("equalityFieldIds") List equalityFieldIds, @JsonProperty("lowerBounds") Map lowerBounds, - @JsonProperty("upperBounds") Map upperBounds) + @JsonProperty("upperBounds") Map upperBounds, + @JsonProperty("dataSequenceNumber") long dataSequenceNumber) { this.content = requireNonNull(content, "content is null"); this.path = requireNonNull(path, "path is null"); @@ -80,6 +85,7 @@ public DeleteFile( this.equalityFieldIds = ImmutableList.copyOf(requireNonNull(equalityFieldIds, "equalityFieldIds is null")); this.lowerBounds = ImmutableMap.copyOf(requireNonNull(lowerBounds, "lowerBounds is null")); this.upperBounds = ImmutableMap.copyOf(requireNonNull(upperBounds, "upperBounds is null")); + this.dataSequenceNumber = dataSequenceNumber; } @JsonProperty @@ -130,12 +136,19 @@ public Map getUpperBounds() return upperBounds; } + @JsonProperty + public long getDataSequenceNumber() + { + return dataSequenceNumber; + } + @Override public String toString() { return toStringHelper(this) .addValue(path) .add("records", recordCount) + .add("dataSequenceNumber", dataSequenceNumber) .toString(); } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergV3.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergV3.java index fb28aee470d42..2b9a745bb0183 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergV3.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergV3.java @@ -44,6 +44,7 @@ import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; public class TestIcebergV3 extends AbstractTestQueryFramework @@ -279,10 +280,10 @@ public void testOptimizeOnV3Table() } @Test - public void testPuffinDeletionVectorsNotSupported() + public void testPuffinDeletionVectorsAccepted() throws Exception { - String tableName = "test_puffin_deletion_vectors_not_supported"; + String tableName = "test_puffin_deletion_vectors_accepted"; try { assertUpdate("CREATE TABLE " + tableName + " (id integer, value varchar) WITH (\"format-version\" = '3')"); assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'one'), (2, 'two')", 2); @@ -309,7 +310,20 @@ public void testPuffinDeletionVectorsNotSupported() .commit(); } - assertQueryFails("SELECT * FROM " + tableName, "Iceberg deletion vectors.*PUFFIN.*not supported"); + // The PUFFIN delete file is now accepted by the split source (no longer + // throws NOT_SUPPORTED). The query will fail downstream because the fake + // .puffin file doesn't exist on disk, but the important thing is that the + // coordinator no longer rejects it at split enumeration time. + try { + computeActual("SELECT * FROM " + tableName); + } + catch (RuntimeException e) { + // Verify the error is NOT the old "PUFFIN not supported" rejection. + // Other failures (e.g., fake .puffin file not on disk) are acceptable. + assertFalse( + e.getMessage().contains("Iceberg deletion vectors") && e.getMessage().contains("not supported"), + "PUFFIN deletion vectors should be accepted, not rejected: " + e.getMessage()); + } } finally { dropTable(tableName); diff --git a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp index 707739048c721..2f0317901bc52 100644 --- a/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.cpp @@ -30,6 +30,8 @@ velox::connector::hive::iceberg::FileContent toVeloxFileContent( return velox::connector::hive::iceberg::FileContent::kData; } else if (content == protocol::iceberg::FileContent::POSITION_DELETES) { return velox::connector::hive::iceberg::FileContent::kPositionalDeletes; + } else if (content == protocol::iceberg::FileContent::EQUALITY_DELETES) { + return velox::connector::hive::iceberg::FileContent::kEqualityDeletes; } VELOX_UNSUPPORTED("Unsupported file content: {}", fmt::underlying(content)); } @@ -40,6 +42,14 @@ velox::dwio::common::FileFormat toVeloxFileFormat( return velox::dwio::common::FileFormat::ORC; } else if (format == protocol::iceberg::FileFormat::PARQUET) { return velox::dwio::common::FileFormat::PARQUET; + } else if (format == protocol::iceberg::FileFormat::PUFFIN) { + // PUFFIN is used for Iceberg V3 deletion vectors. The DeletionVectorReader + // reads raw binary from the file and does not use the DWRF/Parquet reader, + // so we map PUFFIN to DWRF as a placeholder — the format value is not + // actually used by the reader. This mapping is only safe for deletion + // vector files; if PUFFIN is encountered for other file content types, + // the DV routing logic in toHiveIcebergSplit() must reclassify it first. + return velox::dwio::common::FileFormat::DWRF; } VELOX_UNSUPPORTED("Unsupported file format: {}", fmt::underlying(format)); } @@ -171,11 +181,14 @@ IcebergPrestoToVeloxConnector::toVeloxSplit( const protocol::ConnectorId& catalogId, const protocol::ConnectorSplit* connectorSplit, const protocol::SplitContext* splitContext) const { - auto icebergSplit = + const auto* icebergSplit = dynamic_cast(connectorSplit); VELOX_CHECK_NOT_NULL( icebergSplit, "Unexpected split type {}", connectorSplit->_type); + const int64_t dataSequenceNumber = + icebergSplit->dataSequenceNumber; // NOLINT(facebook-bugprone-unchecked-pointer-access) + std::unordered_map> partitionKeys; for (const auto& entry : icebergSplit->partitionKeys) { partitionKeys.emplace( @@ -191,28 +204,42 @@ IcebergPrestoToVeloxConnector::toVeloxSplit( std::vector deletes; deletes.reserve(icebergSplit->deletes.size()); for (const auto& deleteFile : icebergSplit->deletes) { - std::unordered_map lowerBounds( + const std::unordered_map lowerBounds( deleteFile.lowerBounds.begin(), deleteFile.lowerBounds.end()); - std::unordered_map upperBounds( + const std::unordered_map upperBounds( deleteFile.upperBounds.begin(), deleteFile.upperBounds.end()); - velox::connector::hive::iceberg::IcebergDeleteFile icebergDeleteFile( - toVeloxFileContent(deleteFile.content), + // Iceberg V3 deletion vectors arrive from the coordinator as + // POSITION_DELETES with PUFFIN format. Reclassify them as + // kDeletionVector so that IcebergSplitReader routes them to + // DeletionVectorReader instead of PositionalDeleteFileReader. + velox::connector::hive::iceberg::FileContent veloxContent = + toVeloxFileContent(deleteFile.content); + if (veloxContent == + velox::connector::hive::iceberg::FileContent::kPositionalDeletes && + deleteFile.format == protocol::iceberg::FileFormat::PUFFIN) { + veloxContent = + velox::connector::hive::iceberg::FileContent::kDeletionVector; + } + + const velox::connector::hive::iceberg::IcebergDeleteFile icebergDeleteFile( + veloxContent, deleteFile.path, toVeloxFileFormat(deleteFile.format), deleteFile.recordCount, deleteFile.fileSizeInBytes, std::vector(deleteFile.equalityFieldIds), lowerBounds, - upperBounds); + upperBounds, + deleteFile.dataSequenceNumber); deletes.emplace_back(icebergDeleteFile); } + std::unordered_map infoColumns = { - {"$data_sequence_number", - std::to_string(icebergSplit->dataSequenceNumber)}, + {"$data_sequence_number", std::to_string(dataSequenceNumber)}, {"$path", icebergSplit->path}}; return std::make_unique( @@ -227,7 +254,9 @@ IcebergPrestoToVeloxConnector::toVeloxSplit( nullptr, splitContext->cacheable, deletes, - infoColumns); + infoColumns, + std::nullopt, + dataSequenceNumber); } std::unique_ptr diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp index ec74e80c58192..0a5a82eaea408 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.cpp @@ -306,7 +306,8 @@ static const std::pair FileFormat_enum_table[] = {FileFormat::ORC, "ORC"}, {FileFormat::PARQUET, "PARQUET"}, {FileFormat::AVRO, "AVRO"}, - {FileFormat::METADATA, "METADATA"}}; + {FileFormat::METADATA, "METADATA"}, + {FileFormat::PUFFIN, "PUFFIN"}}; void to_json(json& j, const FileFormat& e) { static_assert(std::is_enum::value, "FileFormat must be an enum!"); const auto* it = std::find_if( @@ -371,6 +372,13 @@ void to_json(json& j, const DeleteFile& p) { "DeleteFile", "Map", "upperBounds"); + to_json_key( + j, + "dataSequenceNumber", + p.dataSequenceNumber, + "DeleteFile", + "int64_t", + "dataSequenceNumber"); } void from_json(const json& j, DeleteFile& p) { @@ -408,6 +416,13 @@ void from_json(const json& j, DeleteFile& p) { "DeleteFile", "Map", "upperBounds"); + from_json_key( + j, + "dataSequenceNumber", + p.dataSequenceNumber, + "DeleteFile", + "int64_t", + "dataSequenceNumber"); } } // namespace facebook::presto::protocol::iceberg namespace facebook::presto::protocol::iceberg { diff --git a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h index b09cd4903a5bf..6d1cfd204992c 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h +++ b/presto-native-execution/presto_cpp/presto_protocol/connector/iceberg/presto_protocol_iceberg.h @@ -78,12 +78,16 @@ void to_json(json& j, const ChangelogSplitInfo& p); void from_json(const json& j, ChangelogSplitInfo& p); } // namespace facebook::presto::protocol::iceberg namespace facebook::presto::protocol::iceberg { -enum class FileContent { DATA, POSITION_DELETES, EQUALITY_DELETES }; +enum class FileContent { + DATA, + POSITION_DELETES, + EQUALITY_DELETES, +}; extern void to_json(json& j, const FileContent& e); extern void from_json(const json& j, FileContent& e); } // namespace facebook::presto::protocol::iceberg namespace facebook::presto::protocol::iceberg { -enum class FileFormat { ORC, PARQUET, AVRO, METADATA }; +enum class FileFormat { ORC, PARQUET, AVRO, METADATA, PUFFIN }; extern void to_json(json& j, const FileFormat& e); extern void from_json(const json& j, FileFormat& e); } // namespace facebook::presto::protocol::iceberg @@ -97,6 +101,7 @@ struct DeleteFile { List equalityFieldIds = {}; Map lowerBounds = {}; Map upperBounds = {}; + int64_t dataSequenceNumber = {}; }; void to_json(json& j, const DeleteFile& p); void from_json(const json& j, DeleteFile& p);