Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,50 @@ The output of the query has the following columns:
- ``array(integer)``
- The set of field IDs used for equality comparison in equality delete files

``$refs`` table
^^^^^^^^^^^^^^^

The ``$refs`` table provides information about Iceberg references including branches and tags.

You can retrieve the references of the Iceberg table ``test_table`` by using the following query::

SELECT * FROM "test_table$refs"

.. code-block:: text

name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms |
----------------+--------+-------------+-------------------------+-----------------------+------------------------+
example_tag | TAG | 10000000000 | 10000 | null | null |
example_branch | BRANCH | 20000000000 | 20000 | 2 | 30000 |

The output of the query has the following columns:

.. list-table:: Refs columns
:widths: 20, 30, 50
:header-rows: 1

* - Name
- Type
- Description
* - ``name``
- ``varchar``
- Name of the reference
* - ``type``
- ``varchar``
- Type of the reference, either ``BRANCH`` or ``TAG``
* - ``snapshot_id``
- ``bigint``
- The snapshot ID of the reference
* - ``max_reference_age_in_ms``
- ``bigint``
- The maximum age of the reference, in milliseconds, before it can be expired.
* - ``min_snapshots_to_keep``
- ``integer``
- For branches only, the minimum number of snapshots to keep in a branch.
* - ``max_snapshot_age_in_ms``
- ``bigint``
- For branches only, the maximum snapshot age, in milliseconds, allowed in a branch. Older snapshots in the branch are expired.

.. _iceberg-materialized-views:

Materialized views
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
case MANIFESTS -> Optional.of(new ManifestsTable(systemTableName, table, getCurrentSnapshotId(table)));
case FILES -> Optional.of(new FilesTable(systemTableName, typeManager, table, getCurrentSnapshotId(table)));
case PROPERTIES -> Optional.of(new PropertiesTable(systemTableName, table));
case REFS -> Optional.of(new RefsTable(systemTableName, table));
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.iceberg.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;
import org.apache.iceberg.Table;

import java.util.List;

import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class RefsTable
implements SystemTable
{
private final ConnectorTableMetadata tableMetadata;
private final Table icebergTable;

public RefsTable(SchemaTableName tableName, Table icebergTable)
{
this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");

this.tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("name", VARCHAR))
.add(new ColumnMetadata("type", VARCHAR))
.add(new ColumnMetadata("snapshot_id", BIGINT))
.add(new ColumnMetadata("max_reference_age_in_ms", BIGINT))
.add(new ColumnMetadata("min_snapshots_to_keep", INTEGER))
.add(new ColumnMetadata("max_snapshot_age_in_ms", BIGINT))
.build());
}

@Override
public Distribution getDistribution()
{
return Distribution.SINGLE_COORDINATOR;
}

@Override
public ConnectorTableMetadata getTableMetadata()
{
return tableMetadata;
}

@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new FixedPageSource(buildPages(tableMetadata, icebergTable));
}

private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);

icebergTable.refs().forEach((refName, ref) -> {
Comment thread
ebyhr marked this conversation as resolved.
Outdated
pagesBuilder.beginRow();
pagesBuilder.appendVarchar(refName);
pagesBuilder.appendVarchar(ref.isBranch() ? "BRANCH" : "TAG");
pagesBuilder.appendBigint(ref.snapshotId());
pagesBuilder.appendBigint(ref.maxRefAgeMs());
pagesBuilder.appendInteger(ref.minSnapshotsToKeep());
pagesBuilder.appendBigint(ref.maxSnapshotAgeMs());
pagesBuilder.endRow();
});

return pagesBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,13 @@ private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Conne
pagesBuilder.beginRow();
pagesBuilder.appendTimestampTzMillis(snapshot.timestampMillis(), timeZoneKey);
pagesBuilder.appendBigint(snapshot.snapshotId());
if (checkNonNull(snapshot.parentId(), pagesBuilder)) {
pagesBuilder.appendBigint(snapshot.parentId());
}
if (checkNonNull(snapshot.operation(), pagesBuilder)) {
pagesBuilder.appendVarchar(snapshot.operation());
}
if (checkNonNull(snapshot.manifestListLocation(), pagesBuilder)) {
pagesBuilder.appendVarchar(snapshot.manifestListLocation());
}
if (checkNonNull(snapshot.summary(), pagesBuilder)) {
pagesBuilder.appendVarcharVarcharMap(snapshot.summary());
}
pagesBuilder.appendBigint(snapshot.parentId());
pagesBuilder.appendVarchar(snapshot.operation());
pagesBuilder.appendVarchar(snapshot.manifestListLocation());
pagesBuilder.appendVarcharVarcharMap(snapshot.summary());
pagesBuilder.endRow();
});

return pagesBuilder.build();
}

/**
 * Reports whether {@code object} is non-null; when it is null, a null is
 * appended to {@code pagesBuilder} before returning.
 *
 * @return {@code true} if {@code object} is non-null, {@code false} otherwise
 */
private static boolean checkNonNull(Object object, PageListBuilder pagesBuilder)
{
    boolean present = object != null;
    if (!present) {
        pagesBuilder.appendNull();
    }
    return present;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ public enum TableType
PARTITIONS,
FILES,
PROPERTIES,
REFS
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,25 @@ public void appendInteger(int value)
INTEGER.writeLong(nextColumn(), value);
}

public void appendInteger(Integer value)
Comment thread
ebyhr marked this conversation as resolved.
Outdated
{
if (checkNonNull(value)) {
appendInteger(value.intValue());
}
}

/**
 * Writes {@code value} as a BIGINT into the builder returned by {@code nextColumn()}.
 */
public void appendBigint(long value)
{
BIGINT.writeLong(nextColumn(), value);
}

/**
 * Appends a nullable bigint: a null {@code value} is handled by
 * {@code checkNonNull}, any other value is unboxed and delegated to
 * {@code appendBigint(long)}.
 */
public void appendBigint(Long value)
{
    if (!checkNonNull(value)) {
        // Nothing left to write for a null value
        return;
    }
    appendBigint(value.longValue());
}

public void appendTimestampTzMillis(long millisUtc, TimeZoneKey timeZoneKey)
{
TIMESTAMP_TZ_MILLIS.writeLong(nextColumn(), packDateTimeWithZone(millisUtc, timeZoneKey));
Expand Down Expand Up @@ -190,4 +204,13 @@ public static PageListBuilder forTable(ConnectorTableMetadata table)
.map(ColumnMetadata::getType)
.collect(toImmutableList()));
}

/**
 * Reports whether {@code object} is non-null; when it is null,
 * {@code appendNull()} is invoked before returning.
 *
 * @return {@code true} if {@code object} is non-null, {@code false} otherwise
 */
private boolean checkNonNull(Object object)
{
    boolean present = object != null;
    if (!present) {
        appendNull();
    }
    return present;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static io.trino.plugin.iceberg.TableType.MANIFESTS;
import static io.trino.plugin.iceberg.TableType.PARTITIONS;
import static io.trino.plugin.iceberg.TableType.PROPERTIES;
import static io.trino.plugin.iceberg.TableType.REFS;
import static io.trino.plugin.iceberg.TableType.SNAPSHOTS;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
Expand Down Expand Up @@ -295,7 +296,7 @@ public void testSelectSystemTable()

// This test should get updated if a new system table is added.
assertThat(TableType.values())
.containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES);
.containsExactly(DATA, HISTORY, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,49 @@ public void testStatsFilePruning()
}
}

@Test
public void testSnapshotReferenceSystemTable()
{
    String tableName = "test_snapshot_reference_system_table_" + randomNameSuffix();
    String quotedRefsTable = "\"" + tableName + "$refs\"";
    assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation", 25);
    Table icebergTable = this.loadTable(tableName);

    // Tag the snapshot created by CTAS and set only the tag's max reference age
    long tagSnapshotId = icebergTable.currentSnapshot().snapshotId();
    icebergTable.manageSnapshots()
            .createTag("test-tag", tagSnapshotId)
            .setMaxRefAgeMs("test-tag", 1)
            .commit();

    // Branch off the snapshot of the first insert and set only its max snapshot age
    assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation LIMIT 5", 5);
    icebergTable.refresh();
    long firstBranchSnapshotId = icebergTable.currentSnapshot().snapshotId();
    icebergTable.manageSnapshots()
            .createBranch("test-branch", firstBranchSnapshotId)
            .setMaxSnapshotAgeMs("test-branch", 1)
            .commit();

    // Branch off the snapshot of the second insert and set only its min snapshots to keep
    assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation LIMIT 5", 5);
    icebergTable.refresh();
    long secondBranchSnapshotId = icebergTable.currentSnapshot().snapshotId();
    icebergTable.manageSnapshots()
            .createBranch("test-branch2", secondBranchSnapshotId)
            .setMinSnapshotsToKeep("test-branch2", 1)
            .commit();

    String expectedColumns = "VALUES " + String.join(",",
            "('name', 'varchar', '', '')",
            "('type', 'varchar', '', '')",
            "('snapshot_id', 'bigint', '', '')",
            "('max_reference_age_in_ms', 'bigint', '', '')",
            "('min_snapshots_to_keep', 'integer', '', '')",
            "('max_snapshot_age_in_ms', 'bigint', '', '')");
    assertQuery("SHOW COLUMNS FROM " + quotedRefsTable, expectedColumns);

    // The 'main' row is expected at the latest snapshot with no retention settings
    String expectedRefs = "VALUES " + String.join(",",
            "('test-tag', 'TAG', " + tagSnapshotId + ", 1, null, null)",
            "('test-branch', 'BRANCH', " + firstBranchSnapshotId + ", null, null, 1)",
            "('test-branch2', 'BRANCH', " + secondBranchSnapshotId + ", null, 1, null)",
            "('main', 'BRANCH', " + secondBranchSnapshotId + ", null, null, null)");
    assertQuery("SELECT * FROM " + quotedRefsTable, expectedRefs);
}

private void writeEqualityDeleteToNationTable(Table icebergTable)
throws Exception
{
Expand Down