From f487bb33d2e5eb808e03a7ec44bc95942d836425 Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Mon, 13 Jun 2022 06:11:58 +0200 Subject: [PATCH 1/3] Retrieve table schema depending on the table snapshot In the context of dealing with time travel queries, the partition spec is intentionally not retrieved because it would involve going through all the metadata files of the table and finding out which is the initial metadata file (containing the partition spec) corresponding to the specified table snapshot. --- .../plugin/iceberg/IcebergInputInfo.java | 10 ++--- .../trino/plugin/iceberg/IcebergMetadata.java | 42 +++++++++++++------ .../iceberg/IcebergPageSourceProvider.java | 5 ++- .../plugin/iceberg/IcebergTableHandle.java | 9 ++-- .../iceberg/BaseIcebergConnectorTest.java | 7 +++- .../plugin/iceberg/TestIcebergInputInfo.java | 2 +- ...stIcebergNodeLocalDynamicSplitPruning.java | 2 +- .../iceberg/TestIcebergSplitSource.java | 2 +- ...TestConnectorPushdownRulesWithIceberg.java | 8 ++-- 9 files changed, 55 insertions(+), 32 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergInputInfo.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergInputInfo.java index fd041bbda87d..806e5b1b1ad6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergInputInfo.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergInputInfo.java @@ -24,17 +24,17 @@ public class IcebergInputInfo { private final Optional snapshotId; - private final boolean partitioned; + private final Optional partitioned; private final String tableDefaultFileFormat; @JsonCreator public IcebergInputInfo( @JsonProperty("snapshotId") Optional snapshotId, - @JsonProperty("partitioned") boolean partitioned, + @JsonProperty("partitioned") Optional partitioned, @JsonProperty("fileFormat") String tableDefaultFileFormat) { this.snapshotId = requireNonNull(snapshotId, "snapshotId is null"); - this.partitioned = 
partitioned; + this.partitioned = requireNonNull(partitioned, "partitioned is null"); this.tableDefaultFileFormat = requireNonNull(tableDefaultFileFormat, "tableDefaultFileFormat is null"); } @@ -45,7 +45,7 @@ public Optional getSnapshotId() } @JsonProperty - public boolean isPartitioned() + public Optional getPartitioned() { return partitioned; } @@ -66,7 +66,7 @@ public boolean equals(Object o) return false; } IcebergInputInfo that = (IcebergInputInfo) o; - return partitioned == that.partitioned + return partitioned.equals(that.partitioned) && snapshotId.equals(that.snapshotId) && tableDefaultFileFormat.equals(that.tableDefaultFileFormat); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 4e189aaf7e9d..991bb7e2e9e2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -316,8 +316,21 @@ public IcebergTableHandle getTableHandle( throw new TrinoException(GENERIC_USER_ERROR, "Cannot specify end version both in table name and FOR clause"); } - Optional snapshotId = endVersion.map(version -> getSnapshotIdFromVersion(table, version)) - .or(() -> getSnapshotId(table, name.getSnapshotId(), isAllowLegacySnapshotSyntax(session))); + Optional tableSnapshotId; + Schema tableSchema; + Optional partitionSpec; + if (endVersion.isPresent() || name.getSnapshotId().isPresent()) { + long snapshotId = endVersion.map(connectorTableVersion -> getSnapshotIdFromVersion(table, connectorTableVersion)) + .orElseGet(() -> resolveSnapshotId(table, name.getSnapshotId().get(), isAllowLegacySnapshotSyntax(session))); + tableSnapshotId = Optional.of(snapshotId); + tableSchema = table.schemas().get(table.snapshot(snapshotId).schemaId()); + partitionSpec = Optional.empty(); + } + else { + tableSnapshotId = 
Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId); + tableSchema = table.schema(); + partitionSpec = Optional.of(table.spec()); + } Map tableProperties = table.properties(); String nameMappingJson = tableProperties.get(TableProperties.DEFAULT_NAME_MAPPING); @@ -325,9 +338,9 @@ public IcebergTableHandle getTableHandle( tableName.getSchemaName(), name.getTableName(), name.getTableType(), - snapshotId, - SchemaParser.toJson(table.schema()), - PartitionSpecParser.toJson(table.spec()), + tableSnapshotId, + SchemaParser.toJson(tableSchema), + partitionSpec.map(PartitionSpecParser::toJson), table.operations().current().formatVersion(), TupleDomain.all(), TupleDomain.all(), @@ -1282,13 +1295,12 @@ private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, Sc public Optional getInfo(ConnectorTableHandle tableHandle) { IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle; - PartitionSpec partitionSpec = PartitionSpecParser.fromJson( - SchemaParser.fromJson(icebergTableHandle.getTableSchemaJson()), - icebergTableHandle.getPartitionSpecJson()); + Optional partitioned = icebergTableHandle.getPartitionSpecJson() + .map(partitionSpecJson -> PartitionSpecParser.fromJson(SchemaParser.fromJson(icebergTableHandle.getTableSchemaJson()), partitionSpecJson).isPartitioned()); return Optional.of(new IcebergInputInfo( icebergTableHandle.getSnapshotId(), - partitionSpec.isPartitioned(), + partitioned, getFileFormat(icebergTableHandle.getStorageProperties()).name())); } @@ -1958,13 +1970,17 @@ private Optional getSnapshotId(Table table, Optional snapshotId, boo { // table.name() is an encoded version of SchemaTableName return snapshotId - .map(id -> - snapshotIds.computeIfAbsent( - table.name() + "@" + id, - ignored -> IcebergUtil.resolveSnapshotId(table, id, allowLegacySnapshotSyntax))) + .map(id -> resolveSnapshotId(table, id, allowLegacySnapshotSyntax)) .or(() -> 
Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId)); } + private long resolveSnapshotId(Table table, long id, boolean allowLegacySnapshotSyntax) + { + return snapshotIds.computeIfAbsent( + table.name() + "@" + id, + ignored -> IcebergUtil.resolveSnapshotId(table, id, allowLegacySnapshotSyntax)); + } + Table getIcebergTable(ConnectorSession session, SchemaTableName schemaTableName) { return catalog.loadTable(session, schemaTableName); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 1dcd9a575601..9dd54402ec03 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -365,7 +365,10 @@ public ConnectorPageSource createPageSource( Supplier updatedRowPageSinkSupplier = () -> new IcebergPageSink( tableSchema, - PartitionSpecParser.fromJson(tableSchema, table.getPartitionSpecJson()), + PartitionSpecParser.fromJson( + tableSchema, + table.getPartitionSpecJson() + .orElseThrow(() -> new VerifyException("Partition spec missing in the table handle"))), locationProvider, fileWriterFactory, pageIndexerFactory, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java index a0e822199ce1..9d3b38c2315b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java @@ -42,7 +42,8 @@ public class IcebergTableHandle private final TableType tableType; private final Optional snapshotId; private final String tableSchemaJson; - private final String partitionSpecJson; + // Empty means the partitioning spec is not known (can be the case 
for certain time travel queries). + private final Optional partitionSpecJson; private final int formatVersion; private final String tableLocation; private final Map storageProperties; @@ -71,7 +72,7 @@ public static IcebergTableHandle fromJsonForDeserializationOnly( @JsonProperty("tableType") TableType tableType, @JsonProperty("snapshotId") Optional snapshotId, @JsonProperty("tableSchemaJson") String tableSchemaJson, - @JsonProperty("partitionSpecJson") String partitionSpecJson, + @JsonProperty("partitionSpecJson") Optional partitionSpecJson, @JsonProperty("formatVersion") int formatVersion, @JsonProperty("unenforcedPredicate") TupleDomain unenforcedPredicate, @JsonProperty("enforcedPredicate") TupleDomain enforcedPredicate, @@ -108,7 +109,7 @@ public IcebergTableHandle( TableType tableType, Optional snapshotId, String tableSchemaJson, - String partitionSpecJson, + Optional partitionSpecJson, int formatVersion, TupleDomain unenforcedPredicate, TupleDomain enforcedPredicate, @@ -171,7 +172,7 @@ public String getTableSchemaJson() } @JsonProperty - public String getPartitionSpecJson() + public Optional getPartitionSpecJson() { return partitionSpecJson; } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index a4f0978647bd..30789d9da36e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -4879,11 +4879,14 @@ public void testModifyingOldSnapshotIsNotPossible() .hasMessage("Modifying old snapshot is not supported in Iceberg."); assertThatThrownBy(() -> query(sessionWithLegacySyntaxSupport, format("ALTER TABLE \"%s@%d\" EXECUTE OPTIMIZE", tableName, oldSnapshotId))) .hasMessage("Modifying old snapshot is not supported in Iceberg."); + // TODO Change to assertThatThrownBy because the 
syntax `table@versionid` should not be supported for DML operations assertUpdate(sessionWithLegacySyntaxSupport, format("INSERT INTO \"%s@%d\" VALUES 7,8,9", tableName, getCurrentSnapshotId(tableName)), 3); assertUpdate(sessionWithLegacySyntaxSupport, format("DELETE FROM \"%s@%d\" WHERE col = 9", tableName, getCurrentSnapshotId(tableName)), 1); - assertUpdate(sessionWithLegacySyntaxSupport, format("UPDATE \"%s@%d\" set col = 50 WHERE col = 5", tableName, getCurrentSnapshotId(tableName)), 1); + assertThatThrownBy(() -> assertUpdate(sessionWithLegacySyntaxSupport, format("UPDATE \"%s@%d\" set col = 50 WHERE col = 5", tableName, getCurrentSnapshotId(tableName)))) + .hasMessage("Partition spec missing in the table handle"); + // TODO Change to assertThatThrownBy because the syntax `table@versionid` should not be supported for DML operations assertQuerySucceeds(sessionWithLegacySyntaxSupport, format("ALTER TABLE \"%s@%d\" EXECUTE OPTIMIZE", tableName, getCurrentSnapshotId(tableName))); - assertQuery(format("SELECT * FROM %s", tableName), "VALUES 1,2,3,4,50,6,7,8"); + assertQuery(format("SELECT * FROM %s", tableName), "VALUES 1,2,3,4,5,6,7,8"); assertUpdate("DROP TABLE " + tableName); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergInputInfo.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergInputInfo.java index f878928ea4cd..ddd98e09c027 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergInputInfo.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergInputInfo.java @@ -82,7 +82,7 @@ private void assertInputInfo(String tableName, boolean expectedPartition, String IcebergInputInfo icebergInputInfo = (IcebergInputInfo) tableInfo.get(); assertThat(icebergInputInfo).isEqualTo(new IcebergInputInfo( icebergInputInfo.getSnapshotId(), - expectedPartition, + Optional.of(expectedPartition), expectedFileFormat)); }); } diff --git 
a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java index 784c09188da0..272bb56cc6ac 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java @@ -173,7 +173,7 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle TableType.DATA, Optional.empty(), SchemaParser.toJson(TABLE_SCHEMA), - PartitionSpecParser.toJson(PartitionSpec.unpartitioned()), + Optional.of(PartitionSpecParser.toJson(PartitionSpec.unpartitioned())), 2, TupleDomain.withColumnDomains(ImmutableMap.of(KEY_ICEBERG_COLUMN_HANDLE, Domain.singleValue(INTEGER, (long) KEY_COLUMN_VALUE))), TupleDomain.all(), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index dbbfef8ed2a9..2b8f12343990 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -128,7 +128,7 @@ public void testIncompleteDynamicFilterTimeout() TableType.DATA, Optional.empty(), SchemaParser.toJson(nationTable.schema()), - PartitionSpecParser.toJson(nationTable.spec()), + Optional.of(PartitionSpecParser.toJson(nationTable.spec())), 1, TupleDomain.all(), TupleDomain.all(), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java index b1c25554cfc2..065ede30d5c1 100644 --- 
a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java @@ -171,7 +171,7 @@ public void testProjectionPushdown() DATA, Optional.of(1L), "", - "", + Optional.of(""), 1, TupleDomain.all(), TupleDomain.all(), @@ -252,7 +252,7 @@ public void testPredicatePushdown() DATA, Optional.of(1L), "", - "", + Optional.of(""), 1, TupleDomain.all(), TupleDomain.all(), @@ -301,7 +301,7 @@ public void testColumnPruningProjectionPushdown() DATA, Optional.empty(), "", - "", + Optional.of(""), 1, TupleDomain.all(), TupleDomain.all(), @@ -361,7 +361,7 @@ public void testPushdownWithDuplicateExpressions() DATA, Optional.of(1L), "", - "", + Optional.of(""), 1, TupleDomain.all(), TupleDomain.all(), From 4561ef0797017f5dc60250d30b8f578024c056d7 Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Fri, 10 Jun 2022 11:01:39 +0200 Subject: [PATCH 2/3] Use table schema corresponding to snapshot in snapshot queries In the context of dealing with an Iceberg table with a structure which evolves over time (columns are added / dropped), when performing a snapshot/time travel query, the schema of the output should match the corresponding schema of the table snapshot queried. 
--- .../trino/plugin/iceberg/IcebergMetadata.java | 31 ++++++------ .../iceberg/BaseIcebergConnectorTest.java | 50 +++++++++++++++++++ 2 files changed, 64 insertions(+), 17 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 991bb7e2e9e2..3f6e72b9b949 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -521,7 +521,10 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { - return getTableMetadata(session, ((IcebergTableHandle) table).getSchemaTableName()); + IcebergTableHandle tableHandle = (IcebergTableHandle) table; + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + List columns = getColumnMetadatas(SchemaParser.fromJson(tableHandle.getTableSchemaJson())); + return new ConnectorTableMetadata(tableHandle.getSchemaTableName(), columns, getIcebergTableProperties(icebergTable), getTableComment(icebergTable)); } @Override @@ -579,7 +582,10 @@ public Iterator streamTableColumns(ConnectorSession sessio if (redirectTable(session, tableName).isPresent()) { return Stream.of(TableColumnsMetadata.forRedirectedTable(tableName)); } - return Stream.of(TableColumnsMetadata.forTable(tableName, getTableMetadata(session, tableName).getColumns())); + + Table icebergTable = catalog.loadTable(session, tableName); + List columns = getColumnMetadatas(icebergTable.schema()); + return Stream.of(TableColumnsMetadata.forTable(tableName, columns)); } catch (TableNotFoundException e) { // Table disappeared during listing operation @@ -1421,24 +1427,11 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan 
icebergTable.updateSchema().renameColumn(columnHandle.getName(), target).commit(); } - /** - * @throws TableNotFoundException when table cannot be found - */ - private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table) + private List getColumnMetadatas(Schema schema) { - Table icebergTable = catalog.loadTable(session, table); - ImmutableList.Builder columns = ImmutableList.builder(); - columns.addAll(getColumnMetadatas(icebergTable)); - columns.add(pathColumnMetadata()); - columns.add(fileModifiedTimeColumnMetadata()); - return new ConnectorTableMetadata(table, columns.build(), getIcebergTableProperties(icebergTable), getTableComment(icebergTable)); - } - - private List getColumnMetadatas(Table table) - { - return table.schema().columns().stream() + List schemaColumns = schema.columns().stream() .map(column -> ColumnMetadata.builder() .setName(column.name()) @@ -1447,6 +1440,10 @@ private List getColumnMetadatas(Table table) .setComment(Optional.ofNullable(column.doc())) .build()) .collect(toImmutableList()); + columns.addAll(schemaColumns); + columns.add(pathColumnMetadata()); + columns.add(fileModifiedTimeColumnMetadata()); + return columns.build(); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 30789d9da36e..4a9c0486e5b7 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -102,6 +102,7 @@ import static io.trino.spi.predicate.Domain.singleValue; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.TimeZoneKey.UTC_KEY; import static io.trino.spi.type.TimeZoneKey.getTimeZoneKey; import 
static io.trino.spi.type.VarcharType.VARCHAR; @@ -4999,6 +5000,55 @@ public void testInsertingIntoTablesWithColumnsWithQuotesInName() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testReadFromVersionedTableWithSchemaEvolution() + { + String tableName = "test_versioned_table_schema_evolution_" + randomTableSuffix(); + + assertQuerySucceeds("CREATE TABLE " + tableName + "(col1 varchar)"); + long v1SnapshotId = getLatestSnapshotId(tableName); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR)) + .returnsEmptyResult(); + + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN col2 integer"); + assertThat(query("SELECT * FROM " + tableName)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .returnsEmptyResult(); + + assertUpdate("INSERT INTO " + tableName + " VALUES ('a', 11)", 1); + long v2SnapshotId = getLatestSnapshotId(tableName); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .matches("VALUES (VARCHAR 'a', 11)"); + assertThat(query("SELECT * FROM " + tableName)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .matches("VALUES (VARCHAR 'a', 11)"); + + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN col3 bigint"); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .matches("VALUES (VARCHAR 'a', 11)"); + assertThat(query("SELECT * FROM " + tableName)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BIGINT)) + .matches("VALUES (VARCHAR 'a', 11, CAST(NULL AS bigint))"); + + assertUpdate("INSERT INTO " + tableName + " VALUES ('b', 22, 32)", 1); + long v3SnapshotId = getLatestSnapshotId(tableName); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR)) + .returnsEmptyResult(); + 
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .matches("VALUES (VARCHAR 'a', 11)"); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BIGINT)) + .matches("VALUES (VARCHAR 'a', 11, NULL), (VARCHAR 'b', 22, BIGINT '32')"); + assertThat(query("SELECT * FROM " + tableName)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BIGINT)) + .matches("VALUES (VARCHAR 'a', 11, NULL), (VARCHAR 'b', 22, BIGINT '32')"); + } + @Override protected OptionalInt maxTableNameLength() { From c30d10998c7ed9752559a38ad824d4fabdeca73a Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Mon, 13 Jun 2022 11:57:01 +0200 Subject: [PATCH 3/3] Verify accuracy of reading from versioned table --- .../iceberg/BaseIcebergConnectorTest.java | 78 ++++++++++++++++ .../TestIcebergMetadataFileOperations.java | 92 +++++++++++++++++++ 2 files changed, 170 insertions(+) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 4a9c0486e5b7..33e80421ee6c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -5049,6 +5049,84 @@ public void testReadFromVersionedTableWithSchemaEvolution() .matches("VALUES (VARCHAR 'a', 11, NULL), (VARCHAR 'b', 22, BIGINT '32')"); } + @Test + public void testReadFromVersionedTableWithPartitionSpecEvolution() + throws Exception + { + String tableName = "test_version_table_with_partition_spec_evolution_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (day varchar, views bigint) WITH(partitioning = ARRAY['day'])"); + long v1SnapshotId = getLatestSnapshotId(tableName); + long v1EpochMillis = 
getCommittedAtInEpochMilliseconds(tableName, v1SnapshotId); + Thread.sleep(1); + + assertUpdate("INSERT INTO " + tableName + " (day, views) VALUES ('2022-06-01', 1)", 1); + long v2SnapshotId = getLatestSnapshotId(tableName); + long v2EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v2SnapshotId); + Thread.sleep(1); + + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN hour varchar"); + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['day', 'hour']"); + assertUpdate("INSERT INTO " + tableName + " (day, hour, views) VALUES ('2022-06-02', '10', 2), ('2022-06-02', '10', 3), ('2022-06-02', '11', 10)", 3); + long v3SnapshotId = getLatestSnapshotId(tableName); + long v3EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v3SnapshotId); + + assertThat(query("SELECT sum(views), day FROM " + tableName + " GROUP BY day")) + .matches("VALUES ROW(BIGINT '1', VARCHAR '2022-06-01'), ROW(BIGINT '15', VARCHAR '2022-06-02')"); + assertThat(query("SELECT sum(views), day FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId + " GROUP BY day")) + .returnsEmptyResult(); + assertThat(query("SELECT sum(views), day FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v1EpochMillis, 9) + " GROUP BY day")) + .returnsEmptyResult(); + assertThat(query("SELECT sum(views), day FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId + " GROUP BY day")) + .matches("VALUES ROW(BIGINT '1', VARCHAR '2022-06-01')"); + assertThat(query("SELECT sum(views), day FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v2EpochMillis, 9) + " GROUP BY day")) + .matches("VALUES ROW(BIGINT '1', VARCHAR '2022-06-01')"); + assertThat(query("SELECT sum(views), day FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId + " GROUP BY day")) + .matches("VALUES ROW(BIGINT '1', VARCHAR '2022-06-01'), ROW(BIGINT '15', VARCHAR '2022-06-02')"); + assertThat(query("SELECT sum(views), day FROM " + tableName + " FOR TIMESTAMP AS OF " + 
timestampLiteral(v3EpochMillis, 9) + " GROUP BY day")) + .matches("VALUES ROW(BIGINT '1', VARCHAR '2022-06-01'), ROW(BIGINT '15', VARCHAR '2022-06-02')"); + + assertThat(query("SELECT sum(views), day, hour FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId + " WHERE day = '2022-06-02' GROUP BY day, hour")) + .matches("VALUES ROW(BIGINT '5', VARCHAR '2022-06-02', VARCHAR '10'), ROW(BIGINT '10', VARCHAR '2022-06-02', VARCHAR '11')"); + assertThat(query("SELECT sum(views), day, hour FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v3EpochMillis, 9) + " WHERE day = '2022-06-02' GROUP BY day, hour")) + .matches("VALUES ROW(BIGINT '5', VARCHAR '2022-06-02', VARCHAR '10'), ROW(BIGINT '10', VARCHAR '2022-06-02', VARCHAR '11')"); + } + + @Test + public void testReadFromVersionedTableWithExpiredHistory() + throws Exception + { + String tableName = "test_version_table_with_expired_snapshots_" + randomTableSuffix(); + Session sessionWithShortRetentionUnlocked = prepareCleanUpSession(); + assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)"); + long v1SnapshotId = getLatestSnapshotId(tableName); + long v1EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v1SnapshotId); + Thread.sleep(1); + assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1); + long v2SnapshotId = getLatestSnapshotId(tableName); + long v2EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v2SnapshotId); + Thread.sleep(1); + assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1); + long v3SnapshotId = getLatestSnapshotId(tableName); + long v3EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v3SnapshotId); + assertThat(query("SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName)) + .matches("VALUES (BIGINT '3', VARCHAR 'one two')"); + List initialSnapshots = getSnapshotIds(tableName); + assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE 
EXPIRE_SNAPSHOTS (retention_threshold => '0s')"); + List updatedSnapshots = getSnapshotIds(tableName); + assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size()); + assertThat(updatedSnapshots.size()).isEqualTo(1); + + assertThat(query("SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId)) + .matches("VALUES (BIGINT '3', VARCHAR 'one two')"); + assertThat(query("SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v3EpochMillis, 9))) + .matches("VALUES (BIGINT '3', VARCHAR 'one two')"); + + assertQueryFails("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId, "Iceberg snapshot ID does not exists\\: " + v2SnapshotId); + assertQueryFails("SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v2EpochMillis, 9), "No version history table .* at or before .*"); + assertQueryFails("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId, "Iceberg snapshot ID does not exists\\: " + v1SnapshotId); + assertQueryFails("SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v1EpochMillis, 9), "No version history table .* at or before .*"); + } + @Override protected OptionalInt maxTableNameLength() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java index 913a1eaeaf75..6360ecc05006 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java @@ -47,6 +47,7 @@ import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME; import static io.trino.testing.QueryAssertions.copyTpchTables; import static 
io.trino.testing.TestingSession.testSessionBuilder; +import static java.lang.String.format; import static java.util.Collections.nCopies; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toCollection; @@ -137,6 +138,91 @@ public void testSelect() .build()); } + @Test + public void testSelectFromVersionedTable() + { + String tableName = "test_select_from_versioned_table"; + assertUpdate("CREATE TABLE " + tableName + " (id int, age int)"); + long v1SnapshotId = getLatestSnapshotId(tableName); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 20)", 1); + long v2SnapshotId = getLatestSnapshotId(tableName); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, 30)", 1); + long v3SnapshotId = getLatestSnapshotId(tableName); + assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .build()); + assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 2) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 2) + .build()); + assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 4) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, 
INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 4) + .build()); + assertFileSystemAccesses("SELECT * FROM " + tableName, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 4) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 4) + .build()); + } + + @Test + public void testSelectFromVersionedTableWithSchemaEvolution() + { + String tableName = "test_select_from_versioned_table_with_schema_evolution"; + assertUpdate("CREATE TABLE " + tableName + " (id int, age int)"); + long v1SnapshotId = getLatestSnapshotId(tableName); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 20)", 1); + long v2SnapshotId = getLatestSnapshotId(tableName); + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN address varchar"); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, 30, 'London')", 1); + long v3SnapshotId = getLatestSnapshotId(tableName); + assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .build()); + assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 2) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 2) + .build()); + 
assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 4) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 4) + .build()); + assertFileSystemAccesses("SELECT * FROM " + tableName, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 4) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 4) + .build()); + } + @Test public void testSelectWithFilter() { @@ -267,6 +353,12 @@ private Multiset getOperations() .collect(toCollection(HashMultiset::create)); } + private long getLatestSnapshotId(String tableName) + { + return (long) computeActual(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC LIMIT 1", tableName)) + .getOnlyValue(); + } + static class FileOperation { private final FileType fileType;