From 3d9316f3d8980f125b9af17fbabdb9c9c1539741 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Thu, 23 Feb 2023 11:09:26 -0800 Subject: [PATCH] Support Iceberg refs system table --- docs/src/main/sphinx/connector/iceberg.rst | 44 +++++++++ .../trino/plugin/iceberg/IcebergMetadata.java | 1 + .../io/trino/plugin/iceberg/RefsTable.java | 93 +++++++++++++++++++ .../trino/plugin/iceberg/SnapshotsTable.java | 25 +---- .../io/trino/plugin/iceberg/TableType.java | 1 + .../plugin/iceberg/util/PageListBuilder.java | 23 +++++ .../TestIcebergMetastoreAccessOperations.java | 3 +- .../trino/plugin/iceberg/TestIcebergV2.java | 43 +++++++++ 8 files changed, 211 insertions(+), 22 deletions(-) create mode 100644 plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RefsTable.java diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst index 6a095a721509..f4a19ac40c6e 100644 --- a/docs/src/main/sphinx/connector/iceberg.rst +++ b/docs/src/main/sphinx/connector/iceberg.rst @@ -1356,6 +1356,50 @@ The output of the query has the following columns: - ``array(integer)`` - The set of field IDs used for equality comparison in equality delete files +``$refs`` table +^^^^^^^^^^^^^^^ + +The ``$refs`` table provides information about Iceberg references including branches and tags. + +You can retrieve the references of the Iceberg table ``test_table`` by using the following query:: + + SELECT * FROM "test_table$refs" + +.. code-block:: text + + name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms | + ----------------+--------+-------------+-------------------------+-----------------------+------------------------+ + example_tag | TAG | 10000000000 | 10000 | null | null | + example_branch | BRANCH | 20000000000 | 20000 | 2 | 30000 | + +The output of the query has the following columns: + +.. list-table:: Refs columns + :widths: 20, 30, 50 + :header-rows: 1 + + * - Name + - Type + - Description + * - ``name`` + - ``varchar`` + - Name of the reference + * - ``type`` + - ``varchar`` + - Type of the reference, either ``BRANCH`` or ``TAG`` + * - ``snapshot_id`` + - ``bigint`` + - The snapshot ID of the reference + * - ``max_reference_age_in_ms`` + - ``bigint`` + - The maximum age of the reference before it could be expired. + * - ``min_snapshots_to_keep`` + - ``integer`` + - For branch only, the minimum number of snapshots to keep in a branch. + * - ``max_snapshot_age_in_ms`` + - ``bigint`` + - For branch only, the max snapshot age allowed in a branch. Older snapshots in the branch will be expired. + .. _iceberg-materialized-views: Materialized views diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index aba5468f82ea..dabfbb861f9c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -482,6 +482,7 @@ private Optional getRawSystemTable(ConnectorSession session, Schema case MANIFESTS -> Optional.of(new ManifestsTable(systemTableName, table, getCurrentSnapshotId(table))); case FILES -> Optional.of(new FilesTable(systemTableName, typeManager, table, getCurrentSnapshotId(table))); case PROPERTIES -> Optional.of(new PropertiesTable(systemTableName, table)); + case REFS -> Optional.of(new RefsTable(systemTableName, table)); }; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RefsTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RefsTable.java new file mode 100644 index 000000000000..b08b6d3b7f7c --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RefsTable.java @@ -0,0 +1,93 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.iceberg.util.PageListBuilder; +import io.trino.spi.Page; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.FixedPageSource; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.SystemTable; +import io.trino.spi.predicate.TupleDomain; +import org.apache.iceberg.Table; + +import java.util.List; + +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; + +public class RefsTable + implements SystemTable +{ + private final ConnectorTableMetadata tableMetadata; + private final Table icebergTable; + + public RefsTable(SchemaTableName tableName, Table icebergTable) + { + this.icebergTable = requireNonNull(icebergTable, "icebergTable is null"); + + this.tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), + ImmutableList.builder() + .add(new ColumnMetadata("name", VARCHAR)) + .add(new ColumnMetadata("type", VARCHAR)) + .add(new ColumnMetadata("snapshot_id", BIGINT)) + .add(new ColumnMetadata("max_reference_age_in_ms", BIGINT)) + .add(new ColumnMetadata("min_snapshots_to_keep", INTEGER)) + .add(new ColumnMetadata("max_snapshot_age_in_ms", BIGINT)) + .build()); + } + + @Override + public Distribution getDistribution() + { + return Distribution.SINGLE_COORDINATOR; + } + + @Override + public ConnectorTableMetadata getTableMetadata() + { + return tableMetadata; + } + + @Override + public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) + { + return new FixedPageSource(buildPages(tableMetadata, icebergTable)); + } + + private static List buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable) + { + PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata); + + icebergTable.refs().forEach((refName, ref) -> { + pagesBuilder.beginRow(); + pagesBuilder.appendVarchar(refName); + pagesBuilder.appendVarchar(ref.isBranch() ? "BRANCH" : "TAG"); + pagesBuilder.appendBigint(ref.snapshotId()); + pagesBuilder.appendBigint(ref.maxRefAgeMs()); + pagesBuilder.appendInteger(ref.minSnapshotsToKeep()); + pagesBuilder.appendBigint(ref.maxSnapshotAgeMs()); + pagesBuilder.endRow(); + }); + + return pagesBuilder.build(); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java index 94754c63ecfa..bafcc19cfc64 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/SnapshotsTable.java @@ -86,30 +86,13 @@ private static List buildPages(ConnectorTableMetadata tableMetadata, Conne pagesBuilder.beginRow(); pagesBuilder.appendTimestampTzMillis(snapshot.timestampMillis(), timeZoneKey); pagesBuilder.appendBigint(snapshot.snapshotId()); - if (checkNonNull(snapshot.parentId(), pagesBuilder)) { - pagesBuilder.appendBigint(snapshot.parentId()); - } - if (checkNonNull(snapshot.operation(), pagesBuilder)) { - pagesBuilder.appendVarchar(snapshot.operation()); - } - if (checkNonNull(snapshot.manifestListLocation(), pagesBuilder)) { - pagesBuilder.appendVarchar(snapshot.manifestListLocation()); - } - if (checkNonNull(snapshot.summary(), pagesBuilder)) { - pagesBuilder.appendVarcharVarcharMap(snapshot.summary()); - } + pagesBuilder.appendBigint(snapshot.parentId()); + pagesBuilder.appendVarchar(snapshot.operation()); + pagesBuilder.appendVarchar(snapshot.manifestListLocation()); + pagesBuilder.appendVarcharVarcharMap(snapshot.summary()); pagesBuilder.endRow(); }); return pagesBuilder.build(); } - - private static boolean checkNonNull(Object object, PageListBuilder pagesBuilder) - { - if (object == null) { - pagesBuilder.appendNull(); - return false; - } - return true; - } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java index 72a52fabf0c2..7141f488d7da 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java @@ -22,4 +22,5 @@ public enum TableType PARTITIONS, FILES, PROPERTIES, + REFS } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/PageListBuilder.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/PageListBuilder.java index 5d04169980c1..90ec89715c8a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/PageListBuilder.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/PageListBuilder.java @@ -94,11 +94,25 @@ public void appendInteger(int value) INTEGER.writeLong(nextColumn(), value); } + public void appendInteger(Integer value) + { + if (checkNonNull(value)) { + appendInteger(value.intValue()); + } + } + public void appendBigint(long value) { BIGINT.writeLong(nextColumn(), value); } + public void appendBigint(Long value) + { + if (checkNonNull(value)) { + appendBigint(value.longValue()); + } + } + public void appendTimestampTzMillis(long millisUtc, TimeZoneKey timeZoneKey) { TIMESTAMP_TZ_MILLIS.writeLong(nextColumn(), packDateTimeWithZone(millisUtc, timeZoneKey)); @@ -190,4 +204,13 @@ public static PageListBuilder forTable(ConnectorTableMetadata table) .map(ColumnMetadata::getType) .collect(toImmutableList())); } + + private boolean checkNonNull(Object object) + { + if (object == null) { + appendNull(); + return false; + } + return true; + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java index afda2c067913..87ab9cc63a58 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java @@ -43,6 +43,7 @@ import static io.trino.plugin.iceberg.TableType.MANIFESTS; import static io.trino.plugin.iceberg.TableType.PARTITIONS; import static io.trino.plugin.iceberg.TableType.PROPERTIES; +import static io.trino.plugin.iceberg.TableType.REFS; import static io.trino.plugin.iceberg.TableType.SNAPSHOTS; import static io.trino.testing.TestingSession.testSessionBuilder; import static java.lang.String.format; @@ -295,7 +296,7 @@ public void testSelectSystemTable() // This test should get updated if a new system table is added. assertThat(TableType.values()) - .containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES); + .containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS); } @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index 67283ca4ce8c..5a368c9362a6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -541,6 +541,49 @@ public void testStatsFilePruning() } } + @Test + public void testSnapshotReferenceSystemTable() + { + String tableName = "test_snapshot_reference_system_table_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25); + Table icebergTable = this.loadTable(tableName); + long snapshotId1 = icebergTable.currentSnapshot().snapshotId(); + icebergTable.manageSnapshots() + .createTag("test-tag", snapshotId1) + .setMaxRefAgeMs("test-tag", 1) + .commit(); + + assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation LIMIT 5", 5); + icebergTable.refresh(); + long snapshotId2 = icebergTable.currentSnapshot().snapshotId(); + icebergTable.manageSnapshots() + .createBranch("test-branch", snapshotId2) + .setMaxSnapshotAgeMs("test-branch", 1) + .commit(); + + assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation LIMIT 5", 5); + icebergTable.refresh(); + long snapshotId3 = icebergTable.currentSnapshot().snapshotId(); + icebergTable.manageSnapshots() + .createBranch("test-branch2", snapshotId3) + .setMinSnapshotsToKeep("test-branch2", 1) + .commit(); + + assertQuery("SHOW COLUMNS FROM \"" + tableName + "$refs\"", + "VALUES ('name', 'varchar', '', '')," + + "('type', 'varchar', '', '')," + + "('snapshot_id', 'bigint', '', '')," + + "('max_reference_age_in_ms', 'bigint', '', '')," + + "('min_snapshots_to_keep', 'integer', '', '')," + + "('max_snapshot_age_in_ms', 'bigint', '', '')"); + + assertQuery("SELECT * FROM \"" + tableName + "$refs\"", + "VALUES ('test-tag', 'TAG', " + snapshotId1 + ", 1, null, null)," + + "('test-branch', 'BRANCH', " + snapshotId2 + ", null, null, 1)," + + "('test-branch2', 'BRANCH', " + snapshotId3 + ", null, 1, null)," + + "('main', 'BRANCH', " + snapshotId3 + ", null, null, null)"); + } + private void writeEqualityDeleteToNationTable(Table icebergTable) throws Exception {