Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,18 @@ even if the data has changed or been deleted since then.
10 | united states | 1 | comment
(1 row)

.. code-block:: sql

// snapshot ID for second record using BEFORE clause to retrieve previous state
SELECT * FROM ctas_nation FOR SYSTEM_VERSION BEFORE 6891257133877048303;

.. code-block:: text

nationkey | name | regionkey | comment
-----------+---------------+-----------+---------
10 | united states | 1 | comment
(1 row)

In above example, SYSTEM_VERSION can be used as an alias for VERSION.

You can access the historical data of a table using FOR TIMESTAMP AS OF TIMESTAMP.
Expand Down Expand Up @@ -1279,6 +1291,18 @@ In the following query, the expression CURRENT_TIMESTAMP returns the current tim
30 | mexico | 3 | comment
(3 rows)

.. code-block:: sql

// In following query, timestamp string is matching with second inserted record.
// BEFORE clause returns first record which is less than timestamp of the second record.
SELECT * FROM ctas_nation FOR TIMESTAMP BEFORE TIMESTAMP '2023-10-17 13:29:46.822 America/Los_Angeles';

.. code-block:: text

nationkey | name | regionkey | comment
-----------+---------------+-----------+---------
10 | united states | 1 | comment
(1 row)

Type mapping
------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
import com.facebook.presto.spi.connector.ConnectorTableVersion;
import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionOperator;
import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionType;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
Expand Down Expand Up @@ -128,7 +130,7 @@
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitions;
import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdAsOfTime;
import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdTimeOperator;
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
Expand Down Expand Up @@ -946,20 +948,25 @@ private OptionalLong removeScanFiles(Table icebergTable, TupleDomain<IcebergColu

private static long getSnapshotIdForTableVersion(Table table, ConnectorTableVersion tableVersion)
{
if (tableVersion.getVersionType() == ConnectorTableVersion.VersionType.TIMESTAMP) {
if (tableVersion.getVersionType() == VersionType.TIMESTAMP) {
if (tableVersion.getVersionExpressionType() instanceof TimestampWithTimeZoneType) {
long millisUtc = new SqlTimestampWithTimeZone((long) tableVersion.getTableVersion()).getMillisUtc();
return getSnapshotIdAsOfTime(table, millisUtc);
return getSnapshotIdTimeOperator(table, millisUtc, tableVersion.getVersionOperator());
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported table version expression type: " + tableVersion.getVersionExpressionType());
}
if (tableVersion.getVersionType() == ConnectorTableVersion.VersionType.VERSION) {
if (tableVersion.getVersionType() == VersionType.VERSION) {
if (tableVersion.getVersionExpressionType() instanceof BigintType) {
long snapshotId = (long) tableVersion.getTableVersion();
if (table.snapshot(snapshotId) == null) {
throw new PrestoException(ICEBERG_INVALID_SNAPSHOT_ID, "Iceberg snapshot ID does not exists: " + snapshotId);
}
return snapshotId;
if (tableVersion.getVersionOperator() == VersionOperator.EQUAL) { // AS OF Case
return snapshotId;
}
else { // BEFORE Case
return getSnapshotIdTimeOperator(table, table.snapshot(snapshotId).timestampMillis(), VersionOperator.LESS_THAN);
}
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported table version expression type: " + tableVersion.getVersionExpressionType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionOperator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -264,10 +265,10 @@ public static Optional<Long> resolveSnapshotIdByName(Table table, IcebergTableNa
return tryGetCurrentSnapshot(table).map(Snapshot::snapshotId);
}

public static long getSnapshotIdAsOfTime(Table table, long millisUtc)
public static long getSnapshotIdTimeOperator(Table table, long millisUtc, VersionOperator operator)
{
return table.history().stream()
.filter(logEntry -> logEntry.timestampMillis() <= millisUtc)
.filter(logEntry -> operator == VersionOperator.EQUAL ? logEntry.timestampMillis() <= millisUtc : logEntry.timestampMillis() < millisUtc)
.max(comparing(HistoryEntry::timestampMillis))
.orElseThrow(() -> new PrestoException(ICEBERG_INVALID_TABLE_TIMESTAMP, format("No history found based on timestamp for table %s", table.name())))
.snapshotId();
Expand Down
Loading