Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -800,9 +800,9 @@ Details about the table snapshots. For more information see `Snapshots <https://

.. code-block:: text

committed_at | snapshot_id | parent_id | operation | manifest_list | summary
--------------------------------------+---------------------+-----------+-----------+----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2022-11-25 20:56:31.784 Asia/Kolkata | 7606232158543069775 | NULL | append | s3://my-bucket/ctas_nation/metadata/snap-7606232158543069775-1-395a2cad-b244-409b-b030-cc44949e5a4e.avro | {changed-partition-count=1, added-data-files=1, total-equality-deletes=0, added-records=25, total-position-deletes=0, added-files-size=1648, total-delete-files=0, total-files-size=1648, total-records=25, total-data-files=1}
committed_at | snapshot_id | snapshot_sequence_number | parent_id | operation | manifest_list | summary
--------------------------------------+---------------------+--------------------------+-----------+----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2022-11-25 20:56:31.784 Asia/Kolkata | 7606232158543069775 | 1 | NULL | append | s3://my-bucket/ctas_nation/metadata/snap-7606232158543069775-1-395a2cad-b244-409b-b030-cc44949e5a4e.avro | {changed-partition-count=1, added-data-files=1, total-equality-deletes=0, added-records=25, total-position-deletes=0, added-files-size=1648, total-delete-files=0, total-files-size=1648, total-records=25, total-data-files=1}

``$manifests`` Table
^^^^^^^^^^^^^^^^^^^^
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
Expand Down Expand Up @@ -150,13 +151,16 @@
import static com.facebook.presto.iceberg.IcebergColumnHandle.IS_DELETED_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.SNAPSHOT_SEQUENCE_NUMBER_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.SNAPSHOT_SEQUENCE_NUMBER_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_MATERIALIZED_VIEW;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DELETE_FILE_PATH;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.IS_DELETED;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.SNAPSHOT_SEQUENCE_NUMBER;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.UPDATE_ROW_DATA;
import static com.facebook.presto.iceberg.IcebergPartitionType.ALL;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
Expand All @@ -183,7 +187,9 @@
import static com.facebook.presto.iceberg.IcebergUtil.getSortFields;
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
import static com.facebook.presto.iceberg.IcebergUtil.getViewComment;
import static com.facebook.presto.iceberg.IcebergUtil.removeSnapshotSequenceDomain;
import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName;
import static com.facebook.presto.iceberg.IcebergUtil.rewriteSnapshotSequencePredicate;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetLocation;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
Expand Down Expand Up @@ -351,9 +357,14 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(

IcebergTableHandle handle = (IcebergTableHandle) table;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
IcebergTableName name = IcebergTableName.from(handle.getTableName());
Snapshot latestSnapshot = icebergTable.currentSnapshot();

handle = rewriteSnapshotSequencePredicate(handle, icebergTable, name, constraint, latestSnapshot);
TupleDomain<ColumnHandle> newDomainPredicate = removeSnapshotSequenceDomain(constraint);

List<IcebergColumnHandle> partitionColumns = getPartitionKeyColumnHandles(handle, icebergTable, typeManager);
TupleDomain<ColumnHandle> partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().orElse(ImmutableMap.of()), Predicates.in(partitionColumns)));
TupleDomain<ColumnHandle> partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys(newDomainPredicate.getDomains().orElse(ImmutableMap.of()), Predicates.in(partitionColumns)));
Optional<Set<IcebergColumnHandle>> requestedColumns = desiredColumns.map(columns -> columns.stream().map(column -> (IcebergColumnHandle) column).collect(toImmutableSet()));

List<HivePartition> partitions;
Expand All @@ -377,7 +388,7 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
new IcebergTableLayoutHandle.Builder()
.setPartitionColumns(ImmutableList.copyOf(partitionColumns))
.setDataColumns(toHiveColumns(icebergTable.schema().columns()))
.setDomainPredicate(constraint.getSummary().simplify().transform(IcebergAbstractMetadata::toSubfield))
.setDomainPredicate(newDomainPredicate.simplify().transform(IcebergAbstractMetadata::toSubfield))
.setRemainingPredicate(TRUE_CONSTANT)
.setPredicateColumns(predicateColumns)
.setRequestedColumns(requestedColumns)
Expand All @@ -386,7 +397,7 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
.setPartitions(Optional.ofNullable(partitions.size() == 0 ? null : partitions))
.setTable(handle)
.build());
return new ConnectorTableLayoutResult(layout, constraint.getSummary());
return new ConnectorTableLayoutResult(layout, newDomainPredicate);
}

public static Subfield toSubfield(ColumnHandle columnHandle)
Expand Down Expand Up @@ -513,6 +524,7 @@ protected ConnectorTableMetadata getTableOrViewMetadata(ConnectorSession session
columns.add(DATA_SEQUENCE_NUMBER_COLUMN_METADATA);
columns.add(IS_DELETED_COLUMN_METADATA);
columns.add(DELETE_FILE_PATH_COLUMN_METADATA);
columns.add(SNAPSHOT_SEQUENCE_NUMBER_COLUMN_METADATA);
}
return new ConnectorTableMetadata(table, columns.build(), createMetadataProperties(icebergTable, session), getTableComment(icebergTable));
}
Expand Down Expand Up @@ -1057,6 +1069,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
columnHandles.put(DATA_SEQUENCE_NUMBER.getColumnName(), DATA_SEQUENCE_NUMBER_COLUMN_HANDLE);
columnHandles.put(IS_DELETED.getColumnName(), IS_DELETED_COLUMN_HANDLE);
columnHandles.put(DELETE_FILE_PATH.getColumnName(), DELETE_FILE_PATH_COLUMN_HANDLE);
columnHandles.put(SNAPSHOT_SEQUENCE_NUMBER.getColumnName(), SNAPSHOT_SEQUENCE_NUMBER_COLUMN_HANDLE);
}
return columnHandles.build();
}
Expand Down Expand Up @@ -1093,7 +1106,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa

return new IcebergTableHandle(
storageTableName.getSchemaName(),
new IcebergTableName(storageTableName.getTableName(), name.getTableType(), Optional.empty(), Optional.empty()),
new IcebergTableName(storageTableName.getTableName(), name.getTableType(), Optional.empty(), Optional.empty(), false),
name.getSnapshotId().isPresent(),
tryGetLocation(storageTable),
tryGetProperties(storageTable),
Expand Down Expand Up @@ -1126,7 +1139,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa

return new IcebergTableHandle(
tableNameToLoad.getSchemaName(),
new IcebergTableName(tableNameToLoad.getTableName(), name.getTableType(), tableSnapshotId, name.getChangelogEndSnapshot()),
new IcebergTableName(tableNameToLoad.getTableName(), name.getTableType(), tableSnapshotId, name.getChangelogEndSnapshot(), false),
name.getSnapshotId().isPresent(),
tryGetLocation(table),
tryGetProperties(table),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DELETE_FILE_PATH;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.IS_DELETED;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.SNAPSHOT_SEQUENCE_NUMBER;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.UPDATE_ROW_DATA;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -56,6 +57,8 @@ public class IcebergColumnHandle
public static final ColumnMetadata IS_DELETED_COLUMN_METADATA = getColumnMetadata(IS_DELETED);
public static final IcebergColumnHandle DELETE_FILE_PATH_COLUMN_HANDLE = getIcebergColumnHandle(DELETE_FILE_PATH);
public static final ColumnMetadata DELETE_FILE_PATH_COLUMN_METADATA = getColumnMetadata(DELETE_FILE_PATH);
public static final IcebergColumnHandle SNAPSHOT_SEQUENCE_NUMBER_COLUMN_HANDLE = getIcebergColumnHandle(SNAPSHOT_SEQUENCE_NUMBER);
public static final ColumnMetadata SNAPSHOT_SEQUENCE_NUMBER_COLUMN_METADATA = getColumnMetadata(SNAPSHOT_SEQUENCE_NUMBER);

private final ColumnIdentity columnIdentity;
private final Type type;
Expand Down Expand Up @@ -195,6 +198,10 @@ public boolean isDeleteFilePathColumn()
{
return getColumnIdentity().getId() == DELETE_FILE_PATH.getId();
}
/**
 * Reports whether this handle refers to the {@code $snapshot_sequence_number} metadata column.
 *
 * @return {@code true} when this handle's column identity id equals the id of
 *         {@code IcebergMetadataColumn.SNAPSHOT_SEQUENCE_NUMBER}
 */
public boolean isSnapshotSequenceNumberColumn()
{
// Identification is by column id only; the column name is not consulted here.
return getColumnIdentity().getId() == SNAPSHOT_SEQUENCE_NUMBER.getId();
}

public static IcebergColumnHandle primitiveIcebergColumnHandle(int id, String name, Type type, Optional<String> comment)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

return new IcebergOutputTableHandle(
schemaName,
new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty()),
new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty(), false),
toPrestoSchema(metadata.schema(), typeManager),
toPrestoPartitionSpec(metadata.spec(), typeManager),
getColumnsForWrite(metadata.schema(), metadata.spec(), typeManager),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public enum IcebergMetadataColumn
DATA_SEQUENCE_NUMBER(Integer.MAX_VALUE - 1001, "$data_sequence_number", BIGINT, PRIMITIVE),
IS_DELETED(MetadataColumns.IS_DELETED.fieldId(), "$deleted", BOOLEAN, PRIMITIVE),
DELETE_FILE_PATH(MetadataColumns.DELETE_FILE_PATH.fieldId(), "$delete_file_path", VARCHAR, PRIMITIVE),
SNAPSHOT_SEQUENCE_NUMBER(Integer.MAX_VALUE - 1002, "$snapshot_sequence_number", BIGINT, PRIMITIVE),
/**
* Iceberg reserved row ids begin at Integer.MAX_VALUE and count down. Starting with MIN_VALUE here to avoid conflicts.
* Inner type for row is not known until runtime.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

return new IcebergOutputTableHandle(
schemaName,
new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty()),
new IcebergTableName(tableName, DATA, Optional.empty(), Optional.empty(), false),
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
getColumnsForWrite(icebergTable.schema(), icebergTable.spec(), typeManager),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,14 @@ public ConnectorPageSource createPageSource(
.map(IcebergColumnHandle.class::cast)
.collect(toImmutableList());

// Reject if $snapshot_sequence_number is actually projected (not just used in WHERE)
if (icebergColumns.stream().anyMatch(colum -> colum.isSnapshotSequenceNumberColumn()
&& !icebergLayout.getPredicateColumns().containsKey(colum.getName()))) {
throw new PrestoException(
Comment on lines +748 to +750
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: The check for $snapshot_sequence_number projection may not handle case sensitivity.

If column names differ in case, this check may not detect all instances of $snapshot_sequence_number. Normalize column names or enforce case sensitivity to ensure the restriction is applied consistently.

NOT_SUPPORTED,
"The column $snapshot_sequence_number is internal and cannot be selected directly");
}

Optional<String> tableSchemaJson = table.getTableSchemaJson();
verify(tableSchemaJson.isPresent(), "tableSchemaJson is null");
Schema tableSchema = SchemaParser.fromJson(tableSchemaJson.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableList;
import jakarta.inject.Inject;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.IncrementalChangelogScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
Expand All @@ -41,6 +42,7 @@
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.IcebergTableType.INCREMENTAL;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getMetadataColumnConstraints;
import static com.facebook.presto.iceberg.IcebergUtil.getNonMetadataColumnConstraints;
Expand Down Expand Up @@ -75,8 +77,9 @@ public ConnectorSplitSource getSplits(
{
IcebergTableLayoutHandle layoutHandle = (IcebergTableLayoutHandle) layout;
IcebergTableHandle table = layoutHandle.getTable();
IcebergTableName icebergTableName = table.getIcebergTableName();

if (!table.getIcebergTableName().getSnapshotId().isPresent()) {
if (!icebergTableName.getSnapshotId().isPresent()) {
return new FixedSplitSource(ImmutableList.of());
}

Expand All @@ -85,20 +88,44 @@ public ConnectorSplitSource getSplits(

Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName());

if (table.getIcebergTableName().getTableType() == CHANGELOG) {
if (icebergTableName.getTableType() == INCREMENTAL) {
long fromSnapshot = icebergTableName.getSnapshotId().orElseGet(() -> SnapshotUtil.oldestAncestor(icebergTable).snapshotId());
long toSnapshot = icebergTableName.getChangelogEndSnapshot()
.orElseGet(icebergTable.currentSnapshot()::snapshotId);

IncrementalAppendScan scan = icebergTable.newIncrementalAppendScan()
.metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats()))
.filter(toIcebergExpression(predicate))
.planWith(executor);

if (icebergTableName.isFromInclusive()) {
scan = scan.fromSnapshotInclusive(fromSnapshot);
}
else {
scan = scan.fromSnapshotExclusive(fromSnapshot);
}
scan = scan.toSnapshot(toSnapshot);

return new IcebergSplitSource(
session,
scan,
getMetadataColumnConstraints(layoutHandle.getValidPredicate()));
}

else if (icebergTableName.getTableType() == CHANGELOG) {
// if the snapshot isn't specified, grab the oldest available version of the table
long fromSnapshot = table.getIcebergTableName().getSnapshotId().orElseGet(() -> SnapshotUtil.oldestAncestor(icebergTable).snapshotId());
long toSnapshot = table.getIcebergTableName().getChangelogEndSnapshot()
long fromSnapshot = icebergTableName.getSnapshotId().orElseGet(() -> SnapshotUtil.oldestAncestor(icebergTable).snapshotId());
long toSnapshot = icebergTableName.getChangelogEndSnapshot()
.orElseGet(icebergTable.currentSnapshot()::snapshotId);
IncrementalChangelogScan scan = icebergTable.newIncrementalChangelogScan()
.metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats()))
.fromSnapshotExclusive(fromSnapshot)
.toSnapshot(toSnapshot);
return new ChangelogSplitSource(session, typeManager, icebergTable, scan);
}
else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
else if (icebergTableName.getTableType() == EQUALITY_DELETES) {
CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
table.getIcebergTableName().getSnapshotId().get(),
icebergTableName.getSnapshotId().get(),
predicate,
table.getPartitionSpecId(),
table.getEqualityFieldIds(),
Expand All @@ -110,7 +137,7 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
TableScan tableScan = icebergTable.newScan()
.metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats()))
.filter(toIcebergExpression(predicate))
.useSnapshot(table.getIcebergTableName().getSnapshotId().get())
.useSnapshot(icebergTableName.getSnapshotId().get())
.planWith(executor);

// TODO Use residual. Right now there is no way to propagate residual to presto but at least we can
Expand Down
Loading
Loading