Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.SqlTimestampWithTimeZone;
Expand Down Expand Up @@ -139,12 +141,15 @@
import static com.facebook.presto.iceberg.IcebergColumnHandle.IS_DELETED_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.SNAPSHOT_ID_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.SNAPSHOT_ID_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DELETE_FILE_PATH;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.IS_DELETED;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.UPDATE_ROW_DATA;
import static com.facebook.presto.iceberg.IcebergPartitionType.ALL;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
Expand Down Expand Up @@ -282,6 +287,42 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(

IcebergTableHandle handle = (IcebergTableHandle) table;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
IcebergTableName name = IcebergTableName.from(handle.getTableName());

Map<ColumnHandle, Domain> domains = constraint.getSummary().getDomains().orElse(Collections.emptyMap());
for (Map.Entry<ColumnHandle, Domain> entry : domains.entrySet()) {
IcebergColumnHandle column = (IcebergColumnHandle) entry.getKey();

if (column.getName().equalsIgnoreCase("$snapshot_id")) {
Domain domain = entry.getValue();

if (domain.isSingleValue()) {
Optional<Long> snapshotId = Optional.of(((Number) domain.getSingleValue()).longValue());
handle = handle.withUpdatedIcebergTableName(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we are querying time travel tables? Here we will always overwrite the snapshot ID.
We should probably add a check that the snapshot in the predicate is consistent with the snapshot specified in the time travel clause.

new IcebergTableName(name.getTableName(), name.getTableType(), snapshotId, name.getChangelogEndSnapshot()));
}
else if (domain.getValues().getRanges().getOrderedRanges().size() == 1) {
Range range = domain.getValues().getRanges().getOrderedRanges().get(0);
if (range.isSingleValue()) {
Optional<Long> snapshotId = Optional.of(((Number) range.getSingleValue()).longValue());
handle = handle.withUpdatedIcebergTableName(
new IcebergTableName(name.getTableName(), name.getTableType(), snapshotId, name.getChangelogEndSnapshot()));
}
else if (!range.isLowUnbounded() && range.isLowInclusive() && range.isHighUnbounded()) {
// Only support >= X
Optional<Long> lower = Optional.of(((Number) range.getLowBoundedValue()).longValue());
handle = handle.withUpdatedIcebergTableName(
new IcebergTableName(name.getTableName(), name.getTableType(), lower, name.getChangelogEndSnapshot()));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tdcmeehan I wanted to confirm a point here -
Currently, I am updating IcebergTableHandle with the lower bound (X) here (for >= X), but shouldn't we use the latest available snapshot whose ID >= X when the query predicate is $snapshot_id >= X instead? So it ensures we always read using the most recent snapshot schema, which avoids issues that can occur if older snapshots have outdated or incompatible schemas.

}
else {
throw new PrestoException(NOT_SUPPORTED, "Unsupported predicate for $snapshot_id; only >= constant is allowed");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we change the message to mention both >= and =, since both of them are supported?

}
}
else {
throw new PrestoException(NOT_SUPPORTED, "Unsupported complex predicate for $snapshot_id; only >= constant is allowed");
}
}
}

List<IcebergColumnHandle> partitionColumns = getPartitionKeyColumnHandles(handle, icebergTable, typeManager);
TupleDomain<ColumnHandle> partitionColumnPredicate = TupleDomain.withColumnDomains(Maps.filterKeys(constraint.getSummary().getDomains().get(), Predicates.in(partitionColumns)));
Expand Down Expand Up @@ -444,6 +485,7 @@ protected ConnectorTableMetadata getTableOrViewMetadata(ConnectorSession session
columns.add(DATA_SEQUENCE_NUMBER_COLUMN_METADATA);
columns.add(IS_DELETED_COLUMN_METADATA);
columns.add(DELETE_FILE_PATH_COLUMN_METADATA);
columns.add(SNAPSHOT_ID_COLUMN_METADATA);
}
return new ConnectorTableMetadata(table, columns.build(), createMetadataProperties(icebergTable, session), getTableComment(icebergTable));
}
Expand Down Expand Up @@ -955,6 +997,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
columnHandles.put(DATA_SEQUENCE_NUMBER.getColumnName(), DATA_SEQUENCE_NUMBER_COLUMN_HANDLE);
columnHandles.put(IS_DELETED.getColumnName(), IS_DELETED_COLUMN_HANDLE);
columnHandles.put(DELETE_FILE_PATH.getColumnName(), DELETE_FILE_PATH_COLUMN_HANDLE);
columnHandles.put(SNAPSHOT_ID.getColumnName(), SNAPSHOT_ID_COLUMN_HANDLE);
}
return columnHandles.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DELETE_FILE_PATH;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.IS_DELETED;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.UPDATE_ROW_DATA;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -56,6 +57,8 @@ public class IcebergColumnHandle
public static final ColumnMetadata IS_DELETED_COLUMN_METADATA = getColumnMetadata(IS_DELETED);
public static final IcebergColumnHandle DELETE_FILE_PATH_COLUMN_HANDLE = getIcebergColumnHandle(DELETE_FILE_PATH);
public static final ColumnMetadata DELETE_FILE_PATH_COLUMN_METADATA = getColumnMetadata(DELETE_FILE_PATH);
public static final IcebergColumnHandle SNAPSHOT_ID_COLUMN_HANDLE = getIcebergColumnHandle(SNAPSHOT_ID);
public static final ColumnMetadata SNAPSHOT_ID_COLUMN_METADATA = getColumnMetadata(SNAPSHOT_ID);

private final ColumnIdentity columnIdentity;
private final Type type;
Expand Down Expand Up @@ -186,6 +189,11 @@ public boolean isDataSequenceNumberColumn()
return getColumnIdentity().getId() == DATA_SEQUENCE_NUMBER.getId();
}

public boolean isSnapshotId()
{
    // True when this handle refers to the synthetic $snapshot_id metadata column,
    // identified by its reserved field ID rather than by name.
    int fieldId = getColumnIdentity().getId();
    return SNAPSHOT_ID.getId() == fieldId;
}

public boolean isDeletedColumn()
{
return getColumnIdentity().getId() == IS_DELETED.getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public enum IcebergMetadataColumn
{
FILE_PATH(MetadataColumns.FILE_PATH.fieldId(), "$path", VARCHAR, PRIMITIVE),
DATA_SEQUENCE_NUMBER(Integer.MAX_VALUE - 1001, "$data_sequence_number", BIGINT, PRIMITIVE),
SNAPSHOT_ID(Integer.MAX_VALUE - 1002, "$snapshot_id", BIGINT, PRIMITIVE),
IS_DELETED(MetadataColumns.IS_DELETED.fieldId(), "$deleted", BOOLEAN, PRIMITIVE),
DELETE_FILE_PATH(MetadataColumns.DELETE_FILE_PATH.fieldId(), "$delete_file_path", VARCHAR, PRIMITIVE),
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,9 @@ public ConnectorPageSource createPageSource(
else if (icebergColumn.isDataSequenceNumberColumn()) {
metadataValues.put(icebergColumn.getColumnIdentity().getId(), split.getDataSequenceNumber());
}
else if (icebergColumn.isSnapshotId()) {
metadataValues.put(icebergColumn.getColumnIdentity().getId(), split.getSnapshotId());
}
}

List<IcebergColumnHandle> delegateColumns = columnsToReadFromStorage.stream().collect(toImmutableList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class IcebergSplit
private final long dataSequenceNumber;
private final long affinitySchedulingFileSectionSize;
private final long affinitySchedulingFileSectionIndex;
private final long snapshotId;

@JsonCreator
public IcebergSplit(
Expand All @@ -69,7 +70,8 @@ public IcebergSplit(
@JsonProperty("deletes") List<DeleteFile> deletes,
@JsonProperty("changelogSplitInfo") Optional<ChangelogSplitInfo> changelogSplitInfo,
@JsonProperty("dataSequenceNumber") long dataSequenceNumber,
@JsonProperty("affinitySchedulingSectionSize") long affinitySchedulingFileSectionSize)
@JsonProperty("affinitySchedulingSectionSize") long affinitySchedulingFileSectionSize,
@JsonProperty("snapshotId") long snapshotId)
{
requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null");
this.path = requireNonNull(path, "path is null");
Expand All @@ -87,6 +89,7 @@ public IcebergSplit(
this.dataSequenceNumber = dataSequenceNumber;
this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize;
this.affinitySchedulingFileSectionIndex = start / affinitySchedulingFileSectionSize;
this.snapshotId = snapshotId;
}

@JsonProperty
Expand Down Expand Up @@ -184,6 +187,12 @@ public long getAffinitySchedulingFileSectionSize()
return affinitySchedulingFileSectionSize;
}

@JsonProperty
public long getSnapshotId()
{
    // Snapshot ID carried by this split (set at planning time from the table
    // scan's snapshot — see IcebergSplitSource); serialized so workers can
    // populate the $snapshot_id metadata column.
    return snapshotId;
}

@Override
public Object getInfo()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public ConnectorSplitSource getSplits(
.metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats()))
.fromSnapshotExclusive(fromSnapshot)
.toSnapshot(toSnapshot);
return new ChangelogSplitSource(session, typeManager, icebergTable, scan);
return new ChangelogSplitSource(session, typeManager, icebergTable, scan, toSnapshot);
}
else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
Expand All @@ -103,7 +103,7 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
table.getEqualityFieldIds(),
session.getRuntimeStats());

return new EqualityDeletesSplitSource(session, icebergTable, deleteFiles);
return new EqualityDeletesSplitSource(session, icebergTable, deleteFiles, table.getIcebergTableName().getSnapshotId().get());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getSnapshotId() returns an Optional; we need to check isPresent() before calling get(),
otherwise this can throw NoSuchElementException when no snapshot ID is set.

}
else {
TableScan tableScan = icebergTable.newScan()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public class IcebergSplitSource
private final long targetSplitSize;
private final NodeSelectionStrategy nodeSelectionStrategy;
private final long affinitySchedulingFileSectionSize;

private final TupleDomain<IcebergColumnHandle> metadataColumnConstraints;
private final long snapshotId;

public IcebergSplitSource(
ConnectorSession session,
Expand All @@ -81,6 +81,7 @@ public IcebergSplitSource(
closer.register(tableScan.planFiles()),
targetSplitSize)
.iterator());
this.snapshotId = tableScan.snapshot().snapshotId();
}

@Override
Expand Down Expand Up @@ -143,6 +144,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
task.deletes().stream().map(DeleteFile::fromIceberg).collect(toImmutableList()),
Optional.empty(),
getDataSequenceNumber(task.file()),
affinitySchedulingFileSectionSize);
affinitySchedulingFileSectionSize,
snapshotId);
Comment on lines -146 to +148
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned about the snapshotId selection here. It seems like we are using the table-level snapshotId taken when the entire table was scanned, but my understanding is that it should be the snapshotId calculated based on the corresponding data file and delete files, right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hantangwangd this is an important observation. Selecting this column would be more useful if it returned the snapshot ID of the data file, i.e. which snapshot ID created the file. However, this column is primarily intended for filtering, as a way of altering the table handle to force a time travel on the table without introducing a new SPI or connector optimizer. Given this column will be hidden and not intended for direct use, I am comfortable with this being the snapshot ID of the scan, as that fulfills the intended purpose.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tdcmeehan thanks for the detailed explanation. Based on my understanding of PR #26164 and the comments here, the primary purpose of this $snapshot_id column is to enable predicate pushdown for the filter WHERE $snapshot_id > xxx, which is used to query incremental data since a specified snapshot. Therefore the column $snapshot_id should be disallowed from being referenced directly in a query, and shouldn't appear in any filter node that couldn't be completely pushed down to the Iceberg connector — is this correct?

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,21 @@ public IcebergTableHandle withUpdatedColumns(List<IcebergColumnHandle> updatedCo
updatedColumns);
}

/**
 * Returns a copy of this handle with the given {@link IcebergTableName}
 * substituted; all other properties (output path, storage properties,
 * schema JSON, partition/equality field IDs, sort order, updated columns)
 * are carried over unchanged.
 *
 * Used to force a time-travel read by rewriting the snapshot ID embedded
 * in the table name without constructing a handle from scratch.
 */
public IcebergTableHandle withUpdatedIcebergTableName(IcebergTableName icebergTableName)
{
    return new IcebergTableHandle(
            getSchemaName(),
            icebergTableName,
            snapshotSpecified,
            outputPath,
            storageProperties,
            tableSchemaJson,
            partitionFieldIds,
            equalityFieldIds,
            sortOrder,
            updatedColumns);
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,14 @@ public class ChangelogSplitSource
private final List<IcebergColumnHandle> columnHandles;
private final NodeSelectionStrategy nodeSelectionStrategy;
private final long affinitySchedulingSectionSize;
private final long snapshotId;

public ChangelogSplitSource(
ConnectorSession session,
TypeManager typeManager,
Table table,
IncrementalChangelogScan tableScan)
IncrementalChangelogScan tableScan,
long snapshotId)
{
requireNonNull(session, "session is null");
requireNonNull(typeManager, "typeManager is null");
Expand All @@ -89,6 +91,7 @@ public ChangelogSplitSource(
this.fileScanTaskIterable = closer.register(tableScan.planFiles());
this.fileScanTaskIterator = closer.register(fileScanTaskIterable.iterator());
this.affinitySchedulingSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes();
this.snapshotId = snapshotId;
}

@Override
Expand Down Expand Up @@ -157,6 +160,7 @@ private IcebergSplit splitFromContentScanTask(ContentScanTask<DataFile> task, Ch
changeTask.commitSnapshotId(),
columnHandles)),
getDataSequenceNumber(task.file()),
affinitySchedulingSectionSize);
affinitySchedulingSectionSize,
snapshotId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,21 @@ public class EqualityDeletesSplitSource
private final Map<Integer, PartitionSpec> specById;
private final long affinitySchedulingSectionSize;
private CloseableIterator<DeleteFile> deleteFiles;
private final long snapshotId;

public EqualityDeletesSplitSource(
ConnectorSession session,
Table table,
CloseableIterable<DeleteFile> deleteFiles)
CloseableIterable<DeleteFile> deleteFiles,
long snapshotId)
{
this.session = requireNonNull(session, "session is null");
requireNonNull(table, "table is null");
requireNonNull(deleteFiles, "deleteFiles is null");
this.specById = table.specs();
this.deleteFiles = CloseableIterable.filter(deleteFiles, deleteFile -> fromIcebergFileContent(deleteFile.content()) == EQUALITY_DELETES).iterator();
this.affinitySchedulingSectionSize = getAffinitySchedulingFileSectionSize(session).toBytes();
this.snapshotId = snapshotId;
}

@Override
Expand Down Expand Up @@ -125,6 +128,7 @@ private IcebergSplit splitFromDeleteFile(DeleteFile deleteFile)
ImmutableList.of(),
Optional.empty(),
IcebergUtil.getDataSequenceNumber(deleteFile),
affinitySchedulingSectionSize);
affinitySchedulingSectionSize,
snapshotId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2254,6 +2254,53 @@ public void testDeleteFilePathHiddenColumn()
});
}

@Test
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add a case where there are multiple snapshots?

public void testSnapshotIdHiddenColumnSimple()
{
    String tableName = "test_snapshot_id_hidden_" + randomTableSuffix();

    assertUpdate("DROP TABLE IF EXISTS " + tableName);

    // Two commits -> two snapshots in the table history, but a single scan is
    // pinned to one snapshot, so $snapshot_id is constant across all rows.
    assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.region WHERE regionkey=0", 1);
    assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.region WHERE regionkey=1", 1);
    Table icebergTable = loadTable(tableName);

    assertEquals(
            computeActual("SELECT COUNT(DISTINCT \"$snapshot_id\") FROM " + tableName).getOnlyValue(),
            1L,
            "Scan should return a single $snapshot_id");

    // The reported value must be the snapshot the scan was planned from,
    // i.e. the table's current snapshot.
    Long snapshotIdFromQuery = (Long) computeActual("SELECT \"$snapshot_id\" FROM " + tableName + " LIMIT 1").getOnlyValue();
    assertEquals(snapshotIdFromQuery, icebergTable.currentSnapshot().snapshotId());

    // Clean up so repeated runs do not leak tables into the catalog.
    assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testSnapshotIdPredicatePushdown()
{
    String tableName = "test_snapshot_id_pred_pushdown_" + randomTableSuffix();

    assertUpdate("DROP TABLE IF EXISTS " + tableName);

    assertUpdate("CREATE TABLE " + tableName + "(id int, data varchar)");
    assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);

    Long snapshotId = (Long) computeActual("SELECT \"$snapshot_id\" FROM " + tableName + " LIMIT 1").getOnlyValue();
    loadTable(tableName).refresh();

    // Single-value predicate (=) is pushed down as a time-travel read
    assertQuery("SELECT COUNT(*) FROM " + tableName + " WHERE \"$snapshot_id\" = " + snapshotId, "VALUES 1");

    // Lower-bounded range predicate (>=) is the other supported shape
    assertQuery("SELECT COUNT(*) FROM " + tableName + " WHERE \"$snapshot_id\" >= " + snapshotId, "VALUES 1");

    // Any other predicate shape on $snapshot_id must fail with a clear error
    assertQueryFails("SELECT * FROM " + tableName + " WHERE \"$snapshot_id\" < " + snapshotId,
            "Unsupported predicate for \\$snapshot_id; only >= constant is allowed");

    // BETWEEN x AND x collapses to a single-value range and is supported
    assertQuery("SELECT COUNT(*) FROM " + tableName + " WHERE \"$snapshot_id\" BETWEEN " + snapshotId + " AND " + snapshotId, "VALUES 1");

    // Clean up so repeated runs do not leak tables into the catalog.
    assertUpdate("DROP TABLE " + tableName);
}

@Test(dataProvider = "equalityDeleteOptions")
public void testEqualityDeletesWithDeletedHiddenColumn(String fileFormat, boolean joinRewriteEnabled)
throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,8 @@ void to_json(json& j, const IcebergSplit& p) {
"IcebergSplit",
"int64_t",
"affinitySchedulingSectionSize");
to_json_key(
j, "snapshotId", p.snapshotId, "IcebergSplit", "int64_t", "snapshotId");
}

void from_json(const json& j, IcebergSplit& p) {
Expand Down Expand Up @@ -1235,6 +1237,8 @@ void from_json(const json& j, IcebergSplit& p) {
"IcebergSplit",
"int64_t",
"affinitySchedulingSectionSize");
from_json_key(
j, "snapshotId", p.snapshotId, "IcebergSplit", "int64_t", "snapshotId");
}
} // namespace facebook::presto::protocol::iceberg
namespace facebook::presto::protocol::iceberg {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ struct IcebergSplit : public ConnectorSplit {
std::shared_ptr<ChangelogSplitInfo> changelogSplitInfo = {};
int64_t dataSequenceNumber = {};
int64_t affinitySchedulingSectionSize = {};
int64_t snapshotId = {};

IcebergSplit() noexcept;
};
Expand Down
Loading