Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,23 @@ example uses the earliest snapshot ID: ``2423571386296047175``
INSERT | 2 | 677209275408372885 | {orderkey=18016, custkey=403, orderstatus=O, totalprice=174070.99, orderdate=1996-03-19, orderpriority=1-URGENT, clerk=Clerk#000000629, shippriority=0, comment=ly. quickly ironic excuses are furiously. carefully ironic pack}
INSERT | 2 | 677209275408372885 | {orderkey=18017, custkey=958, orderstatus=F, totalprice=203091.02, orderdate=1993-03-26, orderpriority=1-URGENT, clerk=Clerk#000000830, shippriority=0, comment=sleep quickly bold requests. slyly pending pinto beans haggle in pla}

``$refs`` Table
^^^^^^^^^^^^^^^^^^^^
* ``$refs`` : Details about Iceberg references including branches and tags. For more information see `Branching and Tagging <https://iceberg.apache.org/docs/nightly/branching/>`_.

.. code-block:: sql

SELECT * FROM "ctas_nation$refs";

.. code-block:: text

name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms
------------+--------+---------------------+-------------------------+-----------------------+------------------------
main | BRANCH | 3074797416068623476 | NULL | NULL | NULL
testBranch | BRANCH | 3374797416068698476 | NULL | NULL | NULL
testTag | TAG | 4686954189838128572 | 10 | NULL | NULL


Procedures
----------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ protected Optional<SystemTable> getIcebergSystemTable(SchemaTableName tableName,
return Optional.of(new FilesTable(systemTableName, table, snapshotId, typeManager));
case PROPERTIES:
return Optional.of(new PropertiesTable(systemTableName, table));
case REFS:
return Optional.of(new RefsTable(systemTableName, table));
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static com.facebook.presto.iceberg.IcebergTableType.MANIFESTS;
import static com.facebook.presto.iceberg.IcebergTableType.PARTITIONS;
import static com.facebook.presto.iceberg.IcebergTableType.PROPERTIES;
import static com.facebook.presto.iceberg.IcebergTableType.REFS;
import static com.facebook.presto.iceberg.IcebergTableType.SNAPSHOTS;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.Long.parseLong;
Expand All @@ -50,7 +51,7 @@ public class IcebergTableName

private final Optional<Long> changelogEndSnapshot;

private static final Set<IcebergTableType> SYSTEM_TABLES = Sets.immutableEnumSet(FILES, MANIFESTS, PARTITIONS, HISTORY, SNAPSHOTS, PROPERTIES);
private static final Set<IcebergTableType> SYSTEM_TABLES = Sets.immutableEnumSet(FILES, MANIFESTS, PARTITIONS, HISTORY, SNAPSHOTS, PROPERTIES, REFS);

@JsonCreator
public IcebergTableName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public enum IcebergTableType
MANIFESTS(true),
PARTITIONS(true),
FILES(true),
REFS(true),
PROPERTIES(true),
CHANGELOG(true),
EQUALITY_DELETES(true),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.iceberg.util.PageListBuilder;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.FixedPageSource;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SystemTable;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.Table;

import java.util.List;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.spi.SystemTable.Distribution.SINGLE_COORDINATOR;
import static java.util.Objects.requireNonNull;

public class RefsTable
implements SystemTable
{
private final ConnectorTableMetadata tableMetadata;
private final Table icebergTable;

private static final List<ColumnMetadata> COLUMNS = ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("name", VARCHAR))
.add(new ColumnMetadata("type", VARCHAR))
.add(new ColumnMetadata("snapshot_id", BIGINT))
.add(new ColumnMetadata("max_reference_age_in_ms", BIGINT))
.add(new ColumnMetadata("min_snapshots_to_keep", BIGINT))
.add(new ColumnMetadata("max_snapshot_age_in_ms", BIGINT))
.build();

public RefsTable(SchemaTableName tableName, Table icebergTable)
{
tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), COLUMNS);
this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");
}

@Override
public Distribution getDistribution()
{
return SINGLE_COORDINATOR;
}

@Override
public ConnectorTableMetadata getTableMetadata()
{
return tableMetadata;
}

@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new FixedPageSource(buildPages(tableMetadata, icebergTable));
}

private static void appendValue(Long value, PageListBuilder pagesBuilder)
{
if (value == null) {
pagesBuilder.appendNull();
}
else {
pagesBuilder.appendBigint(value);
}
}

private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);

icebergTable.refs().forEach((key, value) -> {
pagesBuilder.beginRow();
pagesBuilder.appendVarchar(key);
pagesBuilder.appendVarchar(String.valueOf(value.type()));
pagesBuilder.appendBigint(value.snapshotId());
appendValue(value.maxRefAgeMs(), pagesBuilder);
appendValue(value.minSnapshotsToKeep() != null ? Long.valueOf(value.minSnapshotsToKeep()) : null, pagesBuilder);
appendValue(value.maxSnapshotAgeMs(), pagesBuilder);
pagesBuilder.endRow();
});

return pagesBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1666,6 +1666,42 @@ public void testDecimal(boolean decimalVectorReaderEnabled)
}
}

@Test
public void testRefsTable()
{
assertUpdate("CREATE TABLE test_table_references (id BIGINT)");
assertUpdate("INSERT INTO test_table_references VALUES (0), (1), (2)", 3);

Table icebergTable = loadTable("test_table_references");
icebergTable.manageSnapshots().createBranch("testBranch").commit();

assertUpdate("INSERT INTO test_table_references VALUES (0), (1), (2)", 3);

assertEquals(icebergTable.refs().size(), 2);
icebergTable.manageSnapshots().createTag("testTag", icebergTable.currentSnapshot().snapshotId()).commit();

assertEquals(icebergTable.refs().size(), 3);
assertQuery("SELECT count(*) FROM \"test_table_references$refs\"", "VALUES 3");
Comment on lines +1680 to +1684
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add some more detailed assertions like follows:

assertQuery("SELECT * from \"test_table_references$refs\" where name = 'main' and type = 'BRANCH'",
                format("VALUES('%s', '%s', %s, %s, %s, %s)",
                        "main",
                        "BRANCH",
                        icebergTable.refs().get("main").snapshotId(),
                        icebergTable.refs().get("main").maxRefAgeMs(),
                        icebergTable.refs().get("main").minSnapshotsToKeep(),
                        icebergTable.refs().get("main").maxSnapshotAgeMs()));

        assertQuery("SELECT * from \"test_table_references$refs\" where type = 'TAG'",
                format("VALUES('%s', '%s', %s, %s, %s, %s)",
                        "testTag",
                        "TAG",
                        icebergTable.refs().get("testTag").snapshotId(),
                        icebergTable.refs().get("testTag").maxRefAgeMs(),
                        icebergTable.refs().get("testTag").minSnapshotsToKeep(),
                        icebergTable.refs().get("testTag").maxSnapshotAgeMs()));

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I have added detailed assertions for testTag & testBranch instead of the main branch. Please LMK if that's fine


assertQuery("SELECT * from \"test_table_references$refs\" where name = 'testBranch' and type = 'BRANCH'",
format("VALUES('%s', '%s', %s, %s, %s, %s)",
"testBranch",
"BRANCH",
icebergTable.refs().get("testBranch").snapshotId(),
icebergTable.refs().get("testBranch").maxRefAgeMs(),
icebergTable.refs().get("testBranch").minSnapshotsToKeep(),
icebergTable.refs().get("testBranch").maxSnapshotAgeMs()));

assertQuery("SELECT * from \"test_table_references$refs\" where type = 'TAG'",
format("VALUES('%s', '%s', %s, %s, %s, %s)",
"testTag",
"TAG",
icebergTable.refs().get("testTag").snapshotId(),
icebergTable.refs().get("testTag").maxRefAgeMs(),
icebergTable.refs().get("testTag").minSnapshotsToKeep(),
icebergTable.refs().get("testTag").maxSnapshotAgeMs()));
}

@Test
public void testAllIcebergType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,25 @@ public void testFilesTable()
assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$files\"");
}

@Test
public void testRefsTable()
{
assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$refs\"",
"VALUES ('name', 'varchar', '', '')," +
"('type', 'varchar', '', '')," +
"('snapshot_id', 'bigint', '', '')," +
"('max_reference_age_in_ms', 'bigint', '', '')," +
"('min_snapshots_to_keep', 'bigint', '', '')," +
"('max_snapshot_age_in_ms', 'bigint', '', '')");
assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$refs\"");

// Check main branch entry
assertQuery("SELECT count(*) FROM test_schema.\"test_table$refs\"", "VALUES 1");
assertQuery("SELECT name FROM test_schema.\"test_table$refs\"", "VALUES 'main'");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would also be prudent to add assertions on values in the row of the table matching the SnapshotRef object.

You could add or refactor the loadTable method from IcebergDistributedTestBase to get a reference to the Iceberg table, then re-create the values and run assertQuery

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added tests inIcebergDistributedTestBase which I could reuse in my subsequent PR around adding support for querying tags & branch. Please review


assertQuerySucceeds("SELECT * FROM test_schema.\"test_table_multilevel_partitions$refs\"");
}

@Test
public void testSessionPropertiesInManuallyStartedTransaction()
{
Expand Down