diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergInputInfo.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergInputInfo.java index fd041bbda87d..806e5b1b1ad6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergInputInfo.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergInputInfo.java @@ -24,17 +24,17 @@ public class IcebergInputInfo { private final Optional snapshotId; - private final boolean partitioned; + private final Optional partitioned; private final String tableDefaultFileFormat; @JsonCreator public IcebergInputInfo( @JsonProperty("snapshotId") Optional snapshotId, - @JsonProperty("partitioned") boolean partitioned, + @JsonProperty("partitioned") Optional partitioned, @JsonProperty("fileFormat") String tableDefaultFileFormat) { this.snapshotId = requireNonNull(snapshotId, "snapshotId is null"); - this.partitioned = partitioned; + this.partitioned = requireNonNull(partitioned, "partitioned is null"); this.tableDefaultFileFormat = requireNonNull(tableDefaultFileFormat, "tableDefaultFileFormat is null"); } @@ -45,7 +45,7 @@ public Optional getSnapshotId() } @JsonProperty - public boolean isPartitioned() + public Optional getPartitioned() { return partitioned; } @@ -66,7 +66,7 @@ public boolean equals(Object o) return false; } IcebergInputInfo that = (IcebergInputInfo) o; - return partitioned == that.partitioned + return partitioned.equals(that.partitioned) && snapshotId.equals(that.snapshotId) && tableDefaultFileFormat.equals(that.tableDefaultFileFormat); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 4e189aaf7e9d..3f6e72b9b949 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -316,8 +316,21 @@ public IcebergTableHandle getTableHandle( throw new TrinoException(GENERIC_USER_ERROR, "Cannot specify end version both in table name and FOR clause"); } - Optional snapshotId = endVersion.map(version -> getSnapshotIdFromVersion(table, version)) - .or(() -> getSnapshotId(table, name.getSnapshotId(), isAllowLegacySnapshotSyntax(session))); + Optional tableSnapshotId; + Schema tableSchema; + Optional partitionSpec; + if (endVersion.isPresent() || name.getSnapshotId().isPresent()) { + long snapshotId = endVersion.map(connectorTableVersion -> getSnapshotIdFromVersion(table, connectorTableVersion)) + .orElseGet(() -> resolveSnapshotId(table, name.getSnapshotId().get(), isAllowLegacySnapshotSyntax(session))); + tableSnapshotId = Optional.of(snapshotId); + tableSchema = table.schemas().get(table.snapshot(snapshotId).schemaId()); + partitionSpec = Optional.empty(); + } + else { + tableSnapshotId = Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId); + tableSchema = table.schema(); + partitionSpec = Optional.of(table.spec()); + } Map tableProperties = table.properties(); String nameMappingJson = tableProperties.get(TableProperties.DEFAULT_NAME_MAPPING); @@ -325,9 +338,9 @@ public IcebergTableHandle getTableHandle( tableName.getSchemaName(), name.getTableName(), name.getTableType(), - snapshotId, - SchemaParser.toJson(table.schema()), - PartitionSpecParser.toJson(table.spec()), + tableSnapshotId, + SchemaParser.toJson(tableSchema), + partitionSpec.map(PartitionSpecParser::toJson), table.operations().current().formatVersion(), TupleDomain.all(), TupleDomain.all(), @@ -508,7 +521,10 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { - return getTableMetadata(session, ((IcebergTableHandle) table).getSchemaTableName()); + IcebergTableHandle tableHandle = (IcebergTableHandle) table; + Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); + List columns = getColumnMetadatas(SchemaParser.fromJson(tableHandle.getTableSchemaJson())); + return new ConnectorTableMetadata(tableHandle.getSchemaTableName(), columns, getIcebergTableProperties(icebergTable), getTableComment(icebergTable)); } @Override @@ -566,7 +582,10 @@ public Iterator streamTableColumns(ConnectorSession sessio if (redirectTable(session, tableName).isPresent()) { return Stream.of(TableColumnsMetadata.forRedirectedTable(tableName)); } - return Stream.of(TableColumnsMetadata.forTable(tableName, getTableMetadata(session, tableName).getColumns())); + + Table icebergTable = catalog.loadTable(session, tableName); + List columns = getColumnMetadatas(icebergTable.schema()); + return Stream.of(TableColumnsMetadata.forTable(tableName, columns)); } catch (TableNotFoundException e) { // Table disappeared during listing operation @@ -1282,13 +1301,12 @@ private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, Sc public Optional getInfo(ConnectorTableHandle tableHandle) { IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle; - PartitionSpec partitionSpec = PartitionSpecParser.fromJson( - SchemaParser.fromJson(icebergTableHandle.getTableSchemaJson()), - icebergTableHandle.getPartitionSpecJson()); + Optional partitioned = icebergTableHandle.getPartitionSpecJson() + .map(partitionSpecJson -> PartitionSpecParser.fromJson(SchemaParser.fromJson(icebergTableHandle.getTableSchemaJson()), partitionSpecJson).isPartitioned()); return Optional.of(new IcebergInputInfo( icebergTableHandle.getSnapshotId(), - partitionSpec.isPartitioned(), + partitioned, getFileFormat(icebergTableHandle.getStorageProperties()).name())); } @@ -1409,24 +1427,11 @@ public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHan icebergTable.updateSchema().renameColumn(columnHandle.getName(), target).commit(); } - /** - * @throws TableNotFoundException when table cannot be found - */ - private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table) + private List getColumnMetadatas(Schema schema) { - Table icebergTable = catalog.loadTable(session, table); - ImmutableList.Builder columns = ImmutableList.builder(); - columns.addAll(getColumnMetadatas(icebergTable)); - columns.add(pathColumnMetadata()); - columns.add(fileModifiedTimeColumnMetadata()); - return new ConnectorTableMetadata(table, columns.build(), getIcebergTableProperties(icebergTable), getTableComment(icebergTable)); - } - - private List getColumnMetadatas(Table table) - { - return table.schema().columns().stream() + List schemaColumns = schema.columns().stream() .map(column -> ColumnMetadata.builder() .setName(column.name()) @@ -1435,6 +1440,10 @@ private List getColumnMetadatas(Table table) .setComment(Optional.ofNullable(column.doc())) .build()) .collect(toImmutableList()); + columns.addAll(schemaColumns); + columns.add(pathColumnMetadata()); + columns.add(fileModifiedTimeColumnMetadata()); + return columns.build(); } @Override @@ -1958,13 +1967,17 @@ private Optional getSnapshotId(Table table, Optional snapshotId, boo { // table.name() is an encoded version of SchemaTableName return snapshotId - .map(id -> - snapshotIds.computeIfAbsent( - table.name() + "@" + id, - ignored -> IcebergUtil.resolveSnapshotId(table, id, allowLegacySnapshotSyntax))) + .map(id -> resolveSnapshotId(table, id, allowLegacySnapshotSyntax)) .or(() -> Optional.ofNullable(table.currentSnapshot()).map(Snapshot::snapshotId)); } + private long resolveSnapshotId(Table table, long id, boolean allowLegacySnapshotSyntax) + { + return snapshotIds.computeIfAbsent( + table.name() + "@" + id, + ignored -> IcebergUtil.resolveSnapshotId(table, id, allowLegacySnapshotSyntax)); + } + Table getIcebergTable(ConnectorSession session, SchemaTableName schemaTableName) { return catalog.loadTable(session, schemaTableName); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 1dcd9a575601..9dd54402ec03 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -365,7 +365,10 @@ public ConnectorPageSource createPageSource( Supplier updatedRowPageSinkSupplier = () -> new IcebergPageSink( tableSchema, - PartitionSpecParser.fromJson(tableSchema, table.getPartitionSpecJson()), + PartitionSpecParser.fromJson( + tableSchema, + table.getPartitionSpecJson() + .orElseThrow(() -> new VerifyException("Partition spec missing in the table handle"))), locationProvider, fileWriterFactory, pageIndexerFactory, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java index a0e822199ce1..9d3b38c2315b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java @@ -42,7 +42,8 @@ public class IcebergTableHandle private final TableType tableType; private final Optional snapshotId; private final String tableSchemaJson; - private final String partitionSpecJson; + // Empty means the partitioning spec is not known (can be the case for certain time travel queries). + private final Optional partitionSpecJson; private final int formatVersion; private final String tableLocation; private final Map storageProperties; @@ -71,7 +72,7 @@ public static IcebergTableHandle fromJsonForDeserializationOnly( @JsonProperty("tableType") TableType tableType, @JsonProperty("snapshotId") Optional snapshotId, @JsonProperty("tableSchemaJson") String tableSchemaJson, - @JsonProperty("partitionSpecJson") String partitionSpecJson, + @JsonProperty("partitionSpecJson") Optional partitionSpecJson, @JsonProperty("formatVersion") int formatVersion, @JsonProperty("unenforcedPredicate") TupleDomain unenforcedPredicate, @JsonProperty("enforcedPredicate") TupleDomain enforcedPredicate, @@ -108,7 +109,7 @@ public IcebergTableHandle( TableType tableType, Optional snapshotId, String tableSchemaJson, - String partitionSpecJson, + Optional partitionSpecJson, int formatVersion, TupleDomain unenforcedPredicate, TupleDomain enforcedPredicate, @@ -171,7 +172,7 @@ public String getTableSchemaJson() } @JsonProperty - public String getPartitionSpecJson() + public Optional getPartitionSpecJson() { return partitionSpecJson; } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index a4f0978647bd..33e80421ee6c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -102,6 +102,7 @@ import static io.trino.spi.predicate.Domain.singleValue; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.TimeZoneKey.UTC_KEY; import static io.trino.spi.type.TimeZoneKey.getTimeZoneKey; import static io.trino.spi.type.VarcharType.VARCHAR; @@ -4879,11 +4880,14 @@ public void testModifyingOldSnapshotIsNotPossible() .hasMessage("Modifying old snapshot is not supported in Iceberg."); assertThatThrownBy(() -> query(sessionWithLegacySyntaxSupport, format("ALTER TABLE \"%s@%d\" EXECUTE OPTIMIZE", tableName, oldSnapshotId))) .hasMessage("Modifying old snapshot is not supported in Iceberg."); + // TODO Change to assertThatThrownBy because the syntax `table@versionid` should not be supported for DML operations assertUpdate(sessionWithLegacySyntaxSupport, format("INSERT INTO \"%s@%d\" VALUES 7,8,9", tableName, getCurrentSnapshotId(tableName)), 3); assertUpdate(sessionWithLegacySyntaxSupport, format("DELETE FROM \"%s@%d\" WHERE col = 9", tableName, getCurrentSnapshotId(tableName)), 1); - assertUpdate(sessionWithLegacySyntaxSupport, format("UPDATE \"%s@%d\" set col = 50 WHERE col = 5", tableName, getCurrentSnapshotId(tableName)), 1); + assertThatThrownBy(() -> assertUpdate(sessionWithLegacySyntaxSupport, format("UPDATE \"%s@%d\" set col = 50 WHERE col = 5", tableName, getCurrentSnapshotId(tableName)))) + .hasMessage("Partition spec missing in the table handle"); + // TODO Change to assertThatThrownBy because the syntax `table@versionid` should not be supported for DML operations assertQuerySucceeds(sessionWithLegacySyntaxSupport, format("ALTER TABLE \"%s@%d\" EXECUTE OPTIMIZE", tableName, getCurrentSnapshotId(tableName))); - assertQuery(format("SELECT * FROM %s", tableName), "VALUES 1,2,3,4,50,6,7,8"); + assertQuery(format("SELECT * FROM %s", tableName), "VALUES 1,2,3,4,5,6,7,8"); assertUpdate("DROP TABLE " + tableName); } @@ -4996,6 +5000,133 @@ public void testInsertingIntoTablesWithColumnsWithQuotesInName() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testReadFromVersionedTableWithSchemaEvolution() + { + String tableName = "test_versioned_table_schema_evolution_" + randomTableSuffix(); + + assertQuerySucceeds("CREATE TABLE " + tableName + "(col1 varchar)"); + long v1SnapshotId = getLatestSnapshotId(tableName); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR)) + .returnsEmptyResult(); + + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN col2 integer"); + assertThat(query("SELECT * FROM " + tableName)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .returnsEmptyResult(); + + assertUpdate("INSERT INTO " + tableName + " VALUES ('a', 11)", 1); + long v2SnapshotId = getLatestSnapshotId(tableName); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .matches("VALUES (VARCHAR 'a', 11)"); + assertThat(query("SELECT * FROM " + tableName)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .matches("VALUES (VARCHAR 'a', 11)"); + + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN col3 bigint"); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .matches("VALUES (VARCHAR 'a', 11)"); + assertThat(query("SELECT * FROM " + tableName)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BIGINT)) + .matches("VALUES (VARCHAR 'a', 11, CAST(NULL AS bigint))"); + + assertUpdate("INSERT INTO " + tableName + " VALUES ('b', 22, 32)", 1); + long v3SnapshotId = getLatestSnapshotId(tableName); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR)) + .returnsEmptyResult(); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .matches("VALUES (VARCHAR 'a', 11)"); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BIGINT)) + .matches("VALUES (VARCHAR 'a', 11, NULL), (VARCHAR 'b', 22, BIGINT '32')"); + assertThat(query("SELECT * FROM " + tableName)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BIGINT)) + .matches("VALUES (VARCHAR 'a', 11, NULL), (VARCHAR 'b', 22, BIGINT '32')"); + } + + @Test + public void testReadFromVersionedTableWithPartitionSpecEvolution() + throws Exception + { + String tableName = "test_version_table_with_partition_spec_evolution_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (day varchar, views bigint) WITH(partitioning = ARRAY['day'])"); + long v1SnapshotId = getLatestSnapshotId(tableName); + long v1EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v1SnapshotId); + Thread.sleep(1); + + assertUpdate("INSERT INTO " + tableName + " (day, views) VALUES ('2022-06-01', 1)", 1); + long v2SnapshotId = getLatestSnapshotId(tableName); + long v2EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v2SnapshotId); + Thread.sleep(1); + + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN hour varchar"); + assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['day', 'hour']"); + assertUpdate("INSERT INTO " + tableName + " (day, hour, views) VALUES ('2022-06-02', '10', 2), ('2022-06-02', '10', 3), ('2022-06-02', '11', 10)", 3); + long v3SnapshotId = getLatestSnapshotId(tableName); + long v3EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v3SnapshotId); + + assertThat(query("SELECT sum(views), day FROM " + tableName + " GROUP BY day")) + .matches("VALUES ROW(BIGINT '1', VARCHAR '2022-06-01'), ROW(BIGINT '15', VARCHAR '2022-06-02')"); + assertThat(query("SELECT sum(views), day FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId + " GROUP BY day")) + .returnsEmptyResult(); + assertThat(query("SELECT sum(views), day FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v1EpochMillis, 9) + " GROUP BY day")) + .returnsEmptyResult(); + assertThat(query("SELECT sum(views), day FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId + " GROUP BY day")) + .matches("VALUES ROW(BIGINT '1', VARCHAR '2022-06-01')"); + assertThat(query("SELECT sum(views), day FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v2EpochMillis, 9) + " GROUP BY day")) + .matches("VALUES ROW(BIGINT '1', VARCHAR '2022-06-01')"); + assertThat(query("SELECT sum(views), day FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId + " GROUP BY day")) + .matches("VALUES ROW(BIGINT '1', VARCHAR '2022-06-01'), ROW(BIGINT '15', VARCHAR '2022-06-02')"); + assertThat(query("SELECT sum(views), day FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v3EpochMillis, 9) + " GROUP BY day")) + .matches("VALUES ROW(BIGINT '1', VARCHAR '2022-06-01'), ROW(BIGINT '15', VARCHAR '2022-06-02')"); + + assertThat(query("SELECT sum(views), day, hour FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId + " WHERE day = '2022-06-02' GROUP BY day, hour")) + .matches("VALUES ROW(BIGINT '5', VARCHAR '2022-06-02', VARCHAR '10'), ROW(BIGINT '10', VARCHAR '2022-06-02', VARCHAR '11')"); + assertThat(query("SELECT sum(views), day, hour FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v3EpochMillis, 9) + " WHERE day = '2022-06-02' GROUP BY day, hour")) + .matches("VALUES ROW(BIGINT '5', VARCHAR '2022-06-02', VARCHAR '10'), ROW(BIGINT '10', VARCHAR '2022-06-02', VARCHAR '11')"); + } + + @Test + public void testReadFromVersionedTableWithExpiredHistory() + throws Exception + { + String tableName = "test_version_table_with_expired_snapshots_" + randomTableSuffix(); + Session sessionWithShortRetentionUnlocked = prepareCleanUpSession(); + assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)"); + long v1SnapshotId = getLatestSnapshotId(tableName); + long v1EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v1SnapshotId); + Thread.sleep(1); + assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1); + long v2SnapshotId = getLatestSnapshotId(tableName); + long v2EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v2SnapshotId); + Thread.sleep(1); + assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1); + long v3SnapshotId = getLatestSnapshotId(tableName); + long v3EpochMillis = getCommittedAtInEpochMilliseconds(tableName, v3SnapshotId); + assertThat(query("SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName)) + .matches("VALUES (BIGINT '3', VARCHAR 'one two')"); + List initialSnapshots = getSnapshotIds(tableName); + assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')"); + List updatedSnapshots = getSnapshotIds(tableName); + assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size()); + assertThat(updatedSnapshots.size()).isEqualTo(1); + + assertThat(query("SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId)) + .matches("VALUES (BIGINT '3', VARCHAR 'one two')"); + assertThat(query("SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v3EpochMillis, 9))) + .matches("VALUES (BIGINT '3', VARCHAR 'one two')"); + + assertQueryFails("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId, "Iceberg snapshot ID does not exists\\: " + v2SnapshotId); + assertQueryFails("SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v2EpochMillis, 9), "No version history table .* at or before .*"); + assertQueryFails("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId, "Iceberg snapshot ID does not exists\\: " + v1SnapshotId); + assertQueryFails("SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF " + timestampLiteral(v1EpochMillis, 9), "No version history table .* at or before .*"); + } + @Override protected OptionalInt maxTableNameLength() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergInputInfo.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergInputInfo.java index f878928ea4cd..ddd98e09c027 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergInputInfo.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergInputInfo.java @@ -82,7 +82,7 @@ private void assertInputInfo(String tableName, boolean expectedPartition, String IcebergInputInfo icebergInputInfo = (IcebergInputInfo) tableInfo.get(); assertThat(icebergInputInfo).isEqualTo(new IcebergInputInfo( icebergInputInfo.getSnapshotId(), - expectedPartition, + Optional.of(expectedPartition), expectedFileFormat)); }); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java index 913a1eaeaf75..6360ecc05006 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java @@ -47,6 +47,7 @@ import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME; import static io.trino.testing.QueryAssertions.copyTpchTables; import static io.trino.testing.TestingSession.testSessionBuilder; +import static java.lang.String.format; import static java.util.Collections.nCopies; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toCollection; @@ -137,6 +138,91 @@ public void testSelect() .build()); } + @Test + public void testSelectFromVersionedTable() + { + String tableName = "test_select_from_versioned_table"; + assertUpdate("CREATE TABLE " + tableName + " (id int, age int)"); + long v1SnapshotId = getLatestSnapshotId(tableName); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 20)", 1); + long v2SnapshotId = getLatestSnapshotId(tableName); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, 30)", 1); + long v3SnapshotId = getLatestSnapshotId(tableName); + assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .build()); + assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 2) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 2) + .build()); + assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 4) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 4) + .build()); + assertFileSystemAccesses("SELECT * FROM " + tableName, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 4) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 4) + .build()); + } + + @Test + public void testSelectFromVersionedTableWithSchemaEvolution() + { + String tableName = "test_select_from_versioned_table_with_schema_evolution"; + assertUpdate("CREATE TABLE " + tableName + " (id int, age int)"); + long v1SnapshotId = getLatestSnapshotId(tableName); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 20)", 1); + long v2SnapshotId = getLatestSnapshotId(tableName); + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN address varchar"); + assertUpdate("INSERT INTO " + tableName + " VALUES (3, 30, 'London')", 1); + long v3SnapshotId = getLatestSnapshotId(tableName); + assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .build()); + assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 2) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 2) + .build()); + assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 4) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 4) + .build()); + assertFileSystemAccesses("SELECT * FROM " + tableName, + ImmutableMultiset.builder() + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 4) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 4) + .build()); + } + @Test public void testSelectWithFilter() { @@ -267,6 +353,12 @@ private Multiset getOperations() .collect(toCollection(HashMultiset::create)); } + private long getLatestSnapshotId(String tableName) + { + return (long) computeActual(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC LIMIT 1", tableName)) + .getOnlyValue(); + } + static class FileOperation { private final FileType fileType; diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java index 784c09188da0..272bb56cc6ac 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java @@ -173,7 +173,7 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle TableType.DATA, Optional.empty(), SchemaParser.toJson(TABLE_SCHEMA), - PartitionSpecParser.toJson(PartitionSpec.unpartitioned()), + Optional.of(PartitionSpecParser.toJson(PartitionSpec.unpartitioned())), 2, TupleDomain.withColumnDomains(ImmutableMap.of(KEY_ICEBERG_COLUMN_HANDLE, Domain.singleValue(INTEGER, (long) KEY_COLUMN_VALUE))), TupleDomain.all(), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index dbbfef8ed2a9..2b8f12343990 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -128,7 +128,7 @@ public void testIncompleteDynamicFilterTimeout() TableType.DATA, Optional.empty(), SchemaParser.toJson(nationTable.schema()), - PartitionSpecParser.toJson(nationTable.spec()), + Optional.of(PartitionSpecParser.toJson(nationTable.spec())), 1, TupleDomain.all(), TupleDomain.all(), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java index b1c25554cfc2..065ede30d5c1 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/optimizer/TestConnectorPushdownRulesWithIceberg.java @@ -171,7 +171,7 @@ public void testProjectionPushdown() DATA, Optional.of(1L), "", - "", + Optional.of(""), 1, TupleDomain.all(), TupleDomain.all(), @@ -252,7 +252,7 @@ public void testPredicatePushdown() DATA, Optional.of(1L), "", - "", + Optional.of(""), 1, TupleDomain.all(), TupleDomain.all(), @@ -301,7 +301,7 @@ public void testColumnPruningProjectionPushdown() DATA, Optional.empty(), "", - "", + Optional.of(""), 1, TupleDomain.all(), TupleDomain.all(), @@ -361,7 +361,7 @@ public void testPushdownWithDuplicateExpressions() DATA, Optional.of(1L), "", - "", + Optional.of(""), 1, TupleDomain.all(), TupleDomain.all(),