diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/AllManifestsTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/AllManifestsTable.java new file mode 100644 index 000000000000..1f2cc27e4270 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/AllManifestsTable.java @@ -0,0 +1,97 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.iceberg.util.PageListBuilder; +import io.trino.spi.block.ArrayBlockBuilder; +import io.trino.spi.block.RowBlockBuilder; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.RowType; +import io.trino.spi.type.TimeZoneKey; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; + +import java.util.List; + +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.MetadataTableType.ALL_MANIFESTS; + +public class AllManifestsTable + extends BaseSystemTable +{ + public AllManifestsTable(SchemaTableName tableName, Table icebergTable) + { + super(requireNonNull(icebergTable, "icebergTable is null"), + new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), ImmutableList.builder() + .add(new ColumnMetadata("path", VARCHAR)) + .add(new ColumnMetadata("length", BIGINT)) + .add(new ColumnMetadata("partition_spec_id", INTEGER)) + .add(new ColumnMetadata("added_snapshot_id", BIGINT)) + .add(new ColumnMetadata("added_data_files_count", INTEGER)) + .add(new ColumnMetadata("existing_data_files_count", INTEGER)) + .add(new ColumnMetadata("deleted_data_files_count", INTEGER)) + .add(new ColumnMetadata("partition_summaries", new ArrayType(RowType.rowType( + RowType.field("contains_null", BOOLEAN), + RowType.field("contains_nan", BOOLEAN), + RowType.field("lower_bound", VARCHAR), + RowType.field("upper_bound", VARCHAR))))) + .build()), + ALL_MANIFESTS); + } + + @Override + protected void addRow(PageListBuilder pagesBuilder, Row row, TimeZoneKey timeZoneKey) + { + pagesBuilder.beginRow(); + pagesBuilder.appendVarchar(row.get("path", String.class)); + pagesBuilder.appendBigint(row.get("length", Long.class)); + pagesBuilder.appendInteger(row.get("partition_spec_id", Integer.class)); + pagesBuilder.appendBigint(row.get("added_snapshot_id", Long.class)); + pagesBuilder.appendInteger(row.get("added_data_files_count", Integer.class)); + pagesBuilder.appendInteger(row.get("existing_data_files_count", Integer.class)); + pagesBuilder.appendInteger(row.get("deleted_data_files_count", Integer.class)); + //noinspection unchecked + appendPartitionSummaries((ArrayBlockBuilder) pagesBuilder.nextColumn(), row.get("partition_summaries", List.class)); + pagesBuilder.endRow(); + } + + private static void appendPartitionSummaries(ArrayBlockBuilder arrayBuilder, List partitionSummaries) + { + arrayBuilder.buildEntry(elementBuilder -> { + for (StructLike partitionSummary : partitionSummaries) { + ((RowBlockBuilder) elementBuilder).buildEntry(fieldBuilders -> { + BOOLEAN.writeBoolean(fieldBuilders.get(0), partitionSummary.get(0, Boolean.class)); // required contains_null + Boolean containsNan = partitionSummary.get(1, Boolean.class); + if (containsNan == null) { + // This usually occurs when reading from V1 table, where contains_nan is not populated. + fieldBuilders.get(1).appendNull(); + } + else { + BOOLEAN.writeBoolean(fieldBuilders.get(1), containsNan); + } + VARCHAR.writeString(fieldBuilders.get(2), partitionSummary.get(2, String.class)); // optional lower_bound (human-readable) + VARCHAR.writeString(fieldBuilders.get(3), partitionSummary.get(3, String.class)); // optional upper_bound (human-readable) + }); + } + }); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 323e916e6116..09580d36c6eb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -654,6 +654,7 @@ private Optional getRawSystemTable(ConnectorSession session, Schema case METADATA_LOG_ENTRIES -> Optional.of(new MetadataLogEntriesTable(tableName, table)); case SNAPSHOTS -> Optional.of(new SnapshotsTable(tableName, typeManager, table)); case PARTITIONS -> Optional.of(new PartitionsTable(tableName, typeManager, table, getCurrentSnapshotId(table))); + case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table)); case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table))); case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table))); case PROPERTIES -> Optional.of(new PropertiesTable(tableName, table)); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java index a878f9dada65..2ba845acc379 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TableType.java @@ -19,6 +19,7 @@ public enum TableType HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, + ALL_MANIFESTS, MANIFESTS, PARTITIONS, FILES, diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java index 98b030cbdf02..b730049a362c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java @@ -294,6 +294,45 @@ public void testSnapshotsTable() assertQuery("SELECT summary['total-records'] FROM test_schema.\"test_table$snapshots\"", "VALUES '0', '3', '6'"); } + @Test + void testAllManifests() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_all_manifests", "AS SELECT 1 x")) { + assertThat(query("SHOW COLUMNS FROM \"" + table.getName() + "$all_manifests\"")) + .skippingTypesCheck() + .matches("VALUES " + + "('path', 'varchar', '', '')," + + "('length', 'bigint', '', '')," + + "('partition_spec_id', 'integer', '', '')," + + "('added_snapshot_id', 'bigint', '', '')," + + "('added_data_files_count', 'integer', '', '')," + + "('existing_data_files_count', 'integer', '', '')," + + "('deleted_data_files_count', 'integer', '', '')," + + "('partition_summaries', 'array(row(contains_null boolean, contains_nan boolean, lower_bound varchar, upper_bound varchar))', '', '')"); + + assertThat((String) computeScalar("SELECT path FROM \"" + table.getName() + "$all_manifests\"")).endsWith("-m0.avro"); + assertThat((Long) computeScalar("SELECT length FROM \"" + table.getName() + "$all_manifests\"")).isPositive(); + assertThat((Integer) computeScalar("SELECT partition_spec_id FROM \"" + table.getName() + "$all_manifests\"")).isZero(); + assertThat((Long) computeScalar("SELECT added_snapshot_id FROM \"" + table.getName() + "$all_manifests\"")).isPositive(); + assertThat((Integer) computeScalar("SELECT added_data_files_count FROM \"" + table.getName() + "$all_manifests\"")).isEqualTo(1); + assertThat((Integer) computeScalar("SELECT existing_data_files_count FROM \"" + table.getName() + "$all_manifests\"")).isZero(); + assertThat((Integer) computeScalar("SELECT deleted_data_files_count FROM \"" + table.getName() + "$all_manifests\"")).isZero(); + assertThat((List) computeScalar("SELECT partition_summaries FROM \"" + table.getName() + "$all_manifests\"")).isEmpty(); + + assertUpdate("DELETE FROM " + table.getName(), 1); + assertThat((Long) computeScalar("SELECT count(1) FROM \"" + table.getName() + "$all_manifests\"")).isEqualTo(2); + } + } + + @Test + void testAllManifestsWithPartitionTable() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_all_manifests", "WITH (partitioning = ARRAY['dt']) AS SELECT 1 x, DATE '2021-01-01' dt")) { + assertThat(query("SELECT partition_summaries FROM \"" + table.getName() + "$all_manifests\"")) + .matches("VALUES CAST(ARRAY[ROW(false, false, VARCHAR '2021-01-01', VARCHAR '2021-01-01')] AS array(row(contains_null boolean, contains_nan boolean, lower_bound varchar, upper_bound varchar)))"); + } + } + @Test public void testManifestsTable() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java index 0694ba19f090..23a3b809d513 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java @@ -36,6 +36,7 @@ import static io.trino.plugin.hive.metastore.MetastoreMethod.GET_TABLES; import static io.trino.plugin.hive.metastore.MetastoreMethod.REPLACE_TABLE; import static io.trino.plugin.iceberg.IcebergSessionProperties.COLLECT_EXTENDED_STATISTICS_ON_WRITE; +import static io.trino.plugin.iceberg.TableType.ALL_MANIFESTS; import static io.trino.plugin.iceberg.TableType.DATA; import static io.trino.plugin.iceberg.TableType.FILES; import static io.trino.plugin.iceberg.TableType.HISTORY; @@ -313,6 +314,12 @@ public void testSelectSystemTable() .addCopies(GET_TABLE, 1) .build()); + // select from $all_manifests + assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$all_manifests\"", + ImmutableMultiset.builder() + .addCopies(GET_TABLE, 1) + .build()); + // select from $manifests assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$manifests\"", ImmutableMultiset.builder() @@ -342,7 +349,7 @@ public void testSelectSystemTable() // This test should get updated if a new system table is added. assertThat(TableType.values()) - .containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE); + .containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, ALL_MANIFESTS, MANIFESTS, PARTITIONS, FILES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE); } @Test