Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public enum FileFormat
ORC("orc", true),
PARQUET("parquet", true),
AVRO("avro", true),
METADATA("metadata.json", false);
METADATA("metadata.json", false),
PUFFIN("puffin", false);

private final String ext;
private final boolean splittable;
Expand Down Expand Up @@ -61,6 +62,9 @@ public static FileFormat fromIcebergFileFormat(org.apache.iceberg.FileFormat for
case METADATA:
prestoFileFormat = METADATA;
break;
case PUFFIN:
prestoFileFormat = PUFFIN;
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Unsupported file format: " + format);
}
Expand All @@ -81,6 +85,12 @@ public org.apache.iceberg.FileFormat toIceberg()
case AVRO:
fileFormat = org.apache.iceberg.FileFormat.AVRO;
break;
case METADATA:
fileFormat = org.apache.iceberg.FileFormat.METADATA;
break;
case PUFFIN:
fileFormat = org.apache.iceberg.FileFormat.PUFFIN;
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Unsupported file format: " + this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SplitWeight;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
Expand Down Expand Up @@ -47,7 +46,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates;
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterators.limit;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -126,13 +124,6 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
PartitionSpec spec = task.spec();
Optional<PartitionData> partitionData = partitionDataFromStructLike(spec, task.file().partition());

// Validate no PUFFIN deletion vectors (Iceberg v3 feature not yet supported)
for (org.apache.iceberg.DeleteFile deleteFile : task.deletes()) {
if (deleteFile.format() == org.apache.iceberg.FileFormat.PUFFIN) {
throw new PrestoException(NOT_SUPPORTED, "Iceberg deletion vectors (PUFFIN format) are not supported");
}
}

// TODO: We should leverage residual expression and convert that to TupleDomain.
// The predicate here is used by readers for predicate push down at reader level,
// so when we do not use residual expression, we are just wasting CPU cycles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class DeleteFile
private final List<Integer> equalityFieldIds;
private final Map<Integer, byte[]> lowerBounds;
private final Map<Integer, byte[]> upperBounds;
private final long dataSequenceNumber;

public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile)
{
Expand All @@ -50,6 +51,8 @@ public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile)
Map<Integer, byte[]> upperBounds = firstNonNull(deleteFile.upperBounds(), ImmutableMap.<Integer, ByteBuffer>of())
.entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().array().clone()));

long dataSequenceNumber = deleteFile.dataSequenceNumber() != null ? deleteFile.dataSequenceNumber() : 0L;

return new DeleteFile(
fromIcebergFileContent(deleteFile.content()),
deleteFile.path().toString(),
Expand All @@ -58,7 +61,8 @@ public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile)
deleteFile.fileSizeInBytes(),
Optional.ofNullable(deleteFile.equalityFieldIds()).orElseGet(ImmutableList::of),
lowerBounds,
upperBounds);
upperBounds,
dataSequenceNumber);
}

@JsonCreator
Expand All @@ -70,7 +74,8 @@ public DeleteFile(
@JsonProperty("fileSizeInBytes") long fileSizeInBytes,
@JsonProperty("equalityFieldIds") List<Integer> equalityFieldIds,
@JsonProperty("lowerBounds") Map<Integer, byte[]> lowerBounds,
@JsonProperty("upperBounds") Map<Integer, byte[]> upperBounds)
@JsonProperty("upperBounds") Map<Integer, byte[]> upperBounds,
@JsonProperty("dataSequenceNumber") long dataSequenceNumber)
{
this.content = requireNonNull(content, "content is null");
this.path = requireNonNull(path, "path is null");
Expand All @@ -80,6 +85,7 @@ public DeleteFile(
this.equalityFieldIds = ImmutableList.copyOf(requireNonNull(equalityFieldIds, "equalityFieldIds is null"));
this.lowerBounds = ImmutableMap.copyOf(requireNonNull(lowerBounds, "lowerBounds is null"));
this.upperBounds = ImmutableMap.copyOf(requireNonNull(upperBounds, "upperBounds is null"));
this.dataSequenceNumber = dataSequenceNumber;
}

@JsonProperty
Expand Down Expand Up @@ -130,12 +136,19 @@ public Map<Integer, byte[]> getUpperBounds()
return upperBounds;
}

@JsonProperty
// Sequence number of the snapshot this delete file applies from; defaulted to 0
// in fromIceberg() when Iceberg reports no data sequence number.
public long getDataSequenceNumber()
{
    return dataSequenceNumber;
}

@Override
// Human-readable summary for logs/debugging: path, record count, and the
// newly added dataSequenceNumber. Not part of any serialized wire format.
public String toString()
{
    return toStringHelper(this)
        .addValue(path)
        .add("records", recordCount)
        .add("dataSequenceNumber", dataSequenceNumber)
        .toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;

public class TestIcebergV3
extends AbstractTestQueryFramework
Expand Down Expand Up @@ -279,10 +280,10 @@ public void testOptimizeOnV3Table()
}

@Test
public void testPuffinDeletionVectorsNotSupported()
public void testPuffinDeletionVectorsAccepted()
throws Exception
{
String tableName = "test_puffin_deletion_vectors_not_supported";
String tableName = "test_puffin_deletion_vectors_accepted";
try {
assertUpdate("CREATE TABLE " + tableName + " (id integer, value varchar) WITH (\"format-version\" = '3')");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'one'), (2, 'two')", 2);
Expand All @@ -309,7 +310,20 @@ public void testPuffinDeletionVectorsNotSupported()
.commit();
}

assertQueryFails("SELECT * FROM " + tableName, "Iceberg deletion vectors.*PUFFIN.*not supported");
// The PUFFIN delete file is now accepted by the split source (no longer
// throws NOT_SUPPORTED). The query will fail downstream because the fake
// .puffin file doesn't exist on disk, but the important thing is that the
// coordinator no longer rejects it at split enumeration time.
try {
computeActual("SELECT * FROM " + tableName);
}
catch (RuntimeException e) {
// Verify the error is NOT the old "PUFFIN not supported" rejection.
// Other failures (e.g., fake .puffin file not on disk) are acceptable.
assertFalse(
e.getMessage().contains("Iceberg deletion vectors") && e.getMessage().contains("not supported"),
"PUFFIN deletion vectors should be accepted, not rejected: " + e.getMessage());
}
Comment on lines +317 to +326
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Defensive handling of a potential null exception message to avoid NPEs in the test itself.

This assertion calls e.getMessage().contains(...) twice; if the exception message is null, the test will throw NullPointerException instead of cleanly asserting on PUFFIN support. You can defensively handle this by normalizing the message first, e.g.

String message = String.valueOf(e.getMessage());
assertFalse(
        message.contains("Iceberg deletion vectors") && message.contains("not supported"),
        "PUFFIN deletion vectors should be accepted, not rejected: " + message);

so the test remains stable even when the exception message is null.

Suggested change
try {
computeActual("SELECT * FROM " + tableName);
}
catch (RuntimeException e) {
// Verify the error is NOT the old "PUFFIN not supported" rejection.
// Other failures (e.g., fake .puffin file not on disk) are acceptable.
assertFalse(
e.getMessage().contains("Iceberg deletion vectors") && e.getMessage().contains("not supported"),
"PUFFIN deletion vectors should be accepted, not rejected: " + e.getMessage());
}
try {
computeActual("SELECT * FROM " + tableName);
}
catch (RuntimeException e) {
// Verify the error is NOT the old "PUFFIN not supported" rejection.
// Other failures (e.g., fake .puffin file not on disk) are acceptable.
String message = String.valueOf(e.getMessage());
assertFalse(
message.contains("Iceberg deletion vectors") && message.contains("not supported"),
"PUFFIN deletion vectors should be accepted, not rejected: " + message);
}

}
finally {
dropTable(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ velox::connector::hive::iceberg::FileContent toVeloxFileContent(
return velox::connector::hive::iceberg::FileContent::kData;
} else if (content == protocol::iceberg::FileContent::POSITION_DELETES) {
return velox::connector::hive::iceberg::FileContent::kPositionalDeletes;
} else if (content == protocol::iceberg::FileContent::EQUALITY_DELETES) {
return velox::connector::hive::iceberg::FileContent::kEqualityDeletes;
}
VELOX_UNSUPPORTED("Unsupported file content: {}", fmt::underlying(content));
}
Expand All @@ -40,6 +42,14 @@ velox::dwio::common::FileFormat toVeloxFileFormat(
return velox::dwio::common::FileFormat::ORC;
} else if (format == protocol::iceberg::FileFormat::PARQUET) {
return velox::dwio::common::FileFormat::PARQUET;
} else if (format == protocol::iceberg::FileFormat::PUFFIN) {
// PUFFIN is used for Iceberg V3 deletion vectors. The DeletionVectorReader
// reads raw binary from the file and does not use the DWRF/Parquet reader,
// so we map PUFFIN to DWRF as a placeholder — the format value is not
// actually used by the reader. This mapping is only safe for deletion
// vector files; if PUFFIN is encountered for other file content types,
// the DV routing logic in toHiveIcebergSplit() must reclassify it first.
return velox::dwio::common::FileFormat::DWRF;
}
VELOX_UNSUPPORTED("Unsupported file format: {}", fmt::underlying(format));
}
Expand Down Expand Up @@ -171,11 +181,14 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* connectorSplit,
const protocol::SplitContext* splitContext) const {
auto icebergSplit =
const auto* icebergSplit =
dynamic_cast<const protocol::iceberg::IcebergSplit*>(connectorSplit);
VELOX_CHECK_NOT_NULL(
icebergSplit, "Unexpected split type {}", connectorSplit->_type);

const int64_t dataSequenceNumber =
icebergSplit->dataSequenceNumber; // NOLINT(facebook-bugprone-unchecked-pointer-access)

std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
for (const auto& entry : icebergSplit->partitionKeys) {
partitionKeys.emplace(
Expand All @@ -191,28 +204,42 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
std::vector<velox::connector::hive::iceberg::IcebergDeleteFile> deletes;
deletes.reserve(icebergSplit->deletes.size());
for (const auto& deleteFile : icebergSplit->deletes) {
std::unordered_map<int32_t, std::string> lowerBounds(
const std::unordered_map<int32_t, std::string> lowerBounds(
deleteFile.lowerBounds.begin(), deleteFile.lowerBounds.end());

std::unordered_map<int32_t, std::string> upperBounds(
const std::unordered_map<int32_t, std::string> upperBounds(
deleteFile.upperBounds.begin(), deleteFile.upperBounds.end());

velox::connector::hive::iceberg::IcebergDeleteFile icebergDeleteFile(
toVeloxFileContent(deleteFile.content),
// Iceberg V3 deletion vectors arrive from the coordinator as
// POSITION_DELETES with PUFFIN format. Reclassify them as
// kDeletionVector so that IcebergSplitReader routes them to
// DeletionVectorReader instead of PositionalDeleteFileReader.
velox::connector::hive::iceberg::FileContent veloxContent =
toVeloxFileContent(deleteFile.content);
if (veloxContent ==
velox::connector::hive::iceberg::FileContent::kPositionalDeletes &&
deleteFile.format == protocol::iceberg::FileFormat::PUFFIN) {
veloxContent =
velox::connector::hive::iceberg::FileContent::kDeletionVector;
}

const velox::connector::hive::iceberg::IcebergDeleteFile icebergDeleteFile(
veloxContent,
deleteFile.path,
toVeloxFileFormat(deleteFile.format),
deleteFile.recordCount,
deleteFile.fileSizeInBytes,
std::vector(deleteFile.equalityFieldIds),
lowerBounds,
upperBounds);
upperBounds,
deleteFile.dataSequenceNumber);

deletes.emplace_back(icebergDeleteFile);
}


std::unordered_map<std::string, std::string> infoColumns = {
{"$data_sequence_number",
std::to_string(icebergSplit->dataSequenceNumber)},
{"$data_sequence_number", std::to_string(dataSequenceNumber)},
{"$path", icebergSplit->path}};

return std::make_unique<velox::connector::hive::iceberg::HiveIcebergSplit>(
Expand All @@ -227,7 +254,9 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
nullptr,
splitContext->cacheable,
deletes,
infoColumns);
infoColumns,
std::nullopt,
dataSequenceNumber);
}

std::unique_ptr<velox::connector::ColumnHandle>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ static const std::pair<FileFormat, json> FileFormat_enum_table[] =
{FileFormat::ORC, "ORC"},
{FileFormat::PARQUET, "PARQUET"},
{FileFormat::AVRO, "AVRO"},
{FileFormat::METADATA, "METADATA"}};
{FileFormat::METADATA, "METADATA"},
{FileFormat::PUFFIN, "PUFFIN"}};
void to_json(json& j, const FileFormat& e) {
static_assert(std::is_enum<FileFormat>::value, "FileFormat must be an enum!");
const auto* it = std::find_if(
Expand Down Expand Up @@ -371,6 +372,13 @@ void to_json(json& j, const DeleteFile& p) {
"DeleteFile",
"Map<Integer, String>",
"upperBounds");
to_json_key(
j,
"dataSequenceNumber",
p.dataSequenceNumber,
"DeleteFile",
"int64_t",
"dataSequenceNumber");
}

void from_json(const json& j, DeleteFile& p) {
Expand Down Expand Up @@ -408,6 +416,13 @@ void from_json(const json& j, DeleteFile& p) {
"DeleteFile",
"Map<Integer, String>",
"upperBounds");
from_json_key(
j,
"dataSequenceNumber",
p.dataSequenceNumber,
"DeleteFile",
"int64_t",
"dataSequenceNumber");
}
} // namespace facebook::presto::protocol::iceberg
namespace facebook::presto::protocol::iceberg {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,16 @@ void to_json(json& j, const ChangelogSplitInfo& p);
void from_json(const json& j, ChangelogSplitInfo& p);
} // namespace facebook::presto::protocol::iceberg
namespace facebook::presto::protocol::iceberg {
enum class FileContent { DATA, POSITION_DELETES, EQUALITY_DELETES };
enum class FileContent {
DATA,
POSITION_DELETES,
EQUALITY_DELETES,
};
extern void to_json(json& j, const FileContent& e);
extern void from_json(const json& j, FileContent& e);
} // namespace facebook::presto::protocol::iceberg
namespace facebook::presto::protocol::iceberg {
enum class FileFormat { ORC, PARQUET, AVRO, METADATA };
enum class FileFormat { ORC, PARQUET, AVRO, METADATA, PUFFIN };
extern void to_json(json& j, const FileFormat& e);
extern void from_json(const json& j, FileFormat& e);
} // namespace facebook::presto::protocol::iceberg
Expand All @@ -97,6 +101,7 @@ struct DeleteFile {
List<Integer> equalityFieldIds = {};
Map<Integer, String> lowerBounds = {};
Map<Integer, String> upperBounds = {};
int64_t dataSequenceNumber = {};
};
void to_json(json& j, const DeleteFile& p);
void from_json(const json& j, DeleteFile& p);
Expand Down
Loading