Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdatePartitionSpec;
Expand Down Expand Up @@ -188,6 +189,7 @@
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.IcebergUtil.MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS;
import static com.facebook.presto.iceberg.IcebergUtil.MIN_FORMAT_VERSION_FOR_DELETE;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getColumnsForWrite;
Expand Down Expand Up @@ -357,6 +359,43 @@ public Optional<IcebergProcedureContext> getProcedureContext()
return this.procedureContext;
}

/**
* Validates that an Iceberg table does not use unsupported v3 features.
* TODO: Remove when Iceberg v3 is fully supported
*/
protected static void validateTableForPresto(BaseTable table, Optional<Long> tableSnapshotId)
{
Snapshot snapshot = tableSnapshotId
.map(table::snapshot)
.orElse(table.currentSnapshot());
if (snapshot == null) {
// empty table, nothing to validate
return;
}

TableMetadata metadata = table.operations().current();
if (metadata.formatVersion() < 3) {
return;
}

Schema schema = metadata.schemasById().get(snapshot.schemaId());
if (schema == null) {
schema = metadata.schema();
}

// Reject schema default values (initial-default / write-default)
for (Types.NestedField field : schema.columns()) {
if (field.initialDefault() != null || field.writeDefault() != null) {
throw new PrestoException(NOT_SUPPORTED, "Iceberg v3 column default values are not supported");
}
}

// Reject Iceberg table encryption
if (!metadata.encryptionKeys().isEmpty() || snapshot.keyId() != null || metadata.properties().containsKey("encryption.key-id")) {
throw new PrestoException(NOT_SUPPORTED, "Iceberg table encryption is not supported");
}
}

/**
* This class implements the default implementation for getTableLayoutForConstraint which will be used in the case of a Java Worker
*/
Expand Down Expand Up @@ -836,10 +875,17 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName());
int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();

if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE ||
!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE))
.map(mode -> mode.equals(MERGE_ON_READ.modeName()))
.orElse(false)) {
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
throw new PrestoException(ICEBERG_INVALID_FORMAT_VERSION,
"Iceberg table updates require at least format version 2 and update mode must be merge-on-read");
}
if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) {
throw new PrestoException(NOT_SUPPORTED,
format("Iceberg table updates for format version %s are not supported yet", formatVersion));
}
if (!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE))
.map(mode -> mode.equals(MERGE_ON_READ.modeName()))
.orElse(false)) {
throw new PrestoException(ICEBERG_INVALID_FORMAT_VERSION,
"Iceberg table updates require at least format version 2 and update mode must be merge-on-read");
}
Expand Down Expand Up @@ -1134,6 +1180,8 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
verify(table.getIcebergTableName().getTableType() == DATA, "only the data table can have data inserted");
Table icebergTable = getIcebergTable(session, table.getSchemaTableName());
validateTableMode(session, icebergTable);
BaseTable baseTable = (BaseTable) icebergTable;
validateTableForPresto(baseTable, Optional.ofNullable(baseTable.currentSnapshot()).map(Snapshot::snapshotId));

return beginIcebergTableInsert(session, table, icebergTable);
}
Expand Down Expand Up @@ -1324,6 +1372,10 @@ public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, Connecto
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
throw new PrestoException(NOT_SUPPORTED, format("This connector only supports delete where one or more partitions are deleted entirely for table versions older than %d", MIN_FORMAT_VERSION_FOR_DELETE));
}
if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) {
throw new PrestoException(NOT_SUPPORTED,
format("Iceberg table updates for format version %s are not supported yet", formatVersion));
}
if (getDeleteMode(icebergTable) == RowLevelOperationMode.COPY_ON_WRITE) {
throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely. Configure write.delete.mode table property to allow row level deletions.");
}
Expand Down Expand Up @@ -1580,11 +1632,17 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE ||
!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE))
.map(mode -> mode.equals(MERGE_ON_READ.modeName()))
.orElse(false)) {
throw new RuntimeException("Iceberg table updates require at least format version 2 and update mode must be merge-on-read");
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
throw new PrestoException(NOT_SUPPORTED, "Iceberg table updates require at least format version 2");
}
if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) {
throw new PrestoException(NOT_SUPPORTED,
format("Iceberg table updates for format version %s are not supported yet", formatVersion));
}
if (!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE))
.map(mode -> mode.equals(MERGE_ON_READ.modeName()))
.orElse(false)) {
throw new PrestoException(NOT_SUPPORTED, "Iceberg table updates require update mode to be merge-on-read");
}
validateTableMode(session, icebergTable);
transaction = icebergTable.newTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SplitWeight;
import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
Expand Down Expand Up @@ -46,6 +47,7 @@
import static com.facebook.presto.iceberg.IcebergUtil.getTargetSplitSize;
import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates;
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterators.limit;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -124,6 +126,13 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
PartitionSpec spec = task.spec();
Optional<PartitionData> partitionData = partitionDataFromStructLike(spec, task.file().partition());

// Validate no PUFFIN deletion vectors (Iceberg v3 feature not yet supported)
for (org.apache.iceberg.DeleteFile deleteFile : task.deletes()) {
if (deleteFile.format() == org.apache.iceberg.FileFormat.PUFFIN) {
throw new PrestoException(NOT_SUPPORTED, "Iceberg deletion vectors (PUFFIN format) are not supported");
}
}

// TODO: We should leverage residual expression and convert that to TupleDomain.
// The predicate here is used by readers for predicate push down at reader level,
// so when we do not use residual expression, we are just wasting CPU cycles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ public final class IcebergUtil
{
private static final Logger log = Logger.get(IcebergUtil.class);
public static final int MIN_FORMAT_VERSION_FOR_DELETE = 2;
public static final int MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS = 2;
public static final int MAX_SUPPORTED_FORMAT_VERSION = 3;

public static final long DOUBLE_POSITIVE_ZERO = 0x0000000000000000L;
public static final long DOUBLE_POSITIVE_INFINITE = 0x7ff0000000000000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
Expand Down Expand Up @@ -69,6 +70,7 @@
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergAbstractMetadata.getSupportedSortFields;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergUtil.MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
Expand Down Expand Up @@ -124,6 +126,16 @@ private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(Connec
Table icebergTable = procedureContext.getTable();
IcebergTableHandle tableHandle = layoutHandle.getTable();

// Validate format version for OPTIMIZE operation
int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
if (formatVersion > MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS) {
throw new PrestoException(NOT_SUPPORTED,
format("OPTIMIZE is not supported for Iceberg table format version > %d. Table %s format version is %s.",
MAX_FORMAT_VERSION_FOR_ROW_LEVEL_OPERATIONS,
icebergTable.name(),
formatVersion));
}

SortOrder sortOrder = icebergTable.sortOrder();
List<String> sortFieldStrings = ImmutableList.of();
if (sortOrderIndex.isPresent()) {
Expand Down
Loading
Loading