diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 1ae1a18d5339..d2af5d1d04c4 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -557,10 +557,8 @@ public List listTables(ConnectorSession session, Optional getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) { IcebergTableHandle table = (IcebergTableHandle) tableHandle; - Table icebergTable = catalog.loadTable(session, table.getSchemaTableName()); - ImmutableMap.Builder columnHandles = ImmutableMap.builder(); - for (IcebergColumnHandle columnHandle : getColumns(icebergTable.schema(), typeManager)) { + for (IcebergColumnHandle columnHandle : getColumns(SchemaParser.fromJson(table.getTableSchemaJson()), typeManager)) { columnHandles.put(columnHandle.getName(), columnHandle); } columnHandles.put(FILE_PATH.getColumnName(), pathColumnHandle()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 5dcf5d6cbfa6..8d1270500ef3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -312,7 +312,7 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) { partitionSpec.specId(), split.getPartitionDataJson(), split.getFileFormat(), - split.getSchemaAsJson().map(SchemaParser::fromJson), + SchemaParser.fromJson(table.getTableSchemaJson()), requiredColumns, effectivePredicate, table.getNameMappingJson().map(NameMappingParser::fromJson), @@ -495,7 +495,7 @@ private ConnectorPageSource openDeletes( 0, "", IcebergFileFormat.fromIceberg(delete.format()), - Optional.of(schemaFromHandles(columns)), + schemaFromHandles(columns), columns, tupleDomain, Optional.empty(), @@ -513,7 +513,7 @@ public ReaderPageSourceWithRowPositions createDataPageSource( int partitionSpecId, String partitionData, IcebergFileFormat fileFormat, - Optional fileSchema, + Schema fileSchema, List dataColumns, TupleDomain predicate, Optional nameMapping, @@ -564,7 +564,7 @@ public ReaderPageSourceWithRowPositions createDataPageSource( length, partitionSpecId, partitionData, - fileSchema.orElseThrow(), + fileSchema, nameMapping, dataColumns); default: diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java index a34cf77a7b70..ac9dc0612440 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java @@ -25,7 +25,6 @@ import org.openjdk.jol.info.ClassLayout; import java.util.List; -import java.util.Optional; import static com.google.common.base.MoreObjects.toStringHelper; import static io.airlift.slice.SizeOf.estimatedSizeOf; @@ -44,7 +43,6 @@ public class IcebergSplit private final List addresses; private final String partitionSpecJson; private final String partitionDataJson; - private final Optional schemaAsJson; private final List deletes; private final SplitWeight splitWeight; @@ -58,7 +56,6 @@ public IcebergSplit( @JsonProperty("addresses") List addresses, @JsonProperty("partitionSpecJson") String partitionSpecJson, @JsonProperty("partitionDataJson") String partitionDataJson, - @JsonProperty("schemaAsJson") Optional schemaAsJson, @JsonProperty("deletes") List deletes, @JsonProperty("splitWeight") SplitWeight splitWeight) { @@ -70,7 +67,6 @@ public IcebergSplit( this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null")); this.partitionSpecJson = requireNonNull(partitionSpecJson, "partitionSpecJson is null"); this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null"); - this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null"); this.deletes = ImmutableList.copyOf(requireNonNull(deletes, "deletes is null")); this.splitWeight = requireNonNull(splitWeight, "splitWeight is null"); } @@ -124,12 +120,6 @@ public String getPartitionSpecJson() return partitionSpecJson; } - @JsonProperty - public Optional getSchemaAsJson() - { - return schemaAsJson; - } - @JsonProperty public String getPartitionDataJson() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index 6265e877c6bf..7b9ab10efaca 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -43,7 +43,6 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; -import org.apache.iceberg.SchemaParser; import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; @@ -76,7 +75,6 @@ import static io.trino.plugin.iceberg.IcebergColumnHandle.fileModifiedTimeColumnHandle; import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; -import static io.trino.plugin.iceberg.IcebergFileFormat.AVRO; import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId; import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD; import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino; @@ -424,7 +422,6 @@ static boolean partitionMatchesPredicate( private IcebergSplit toIcebergSplit(FileScanTask task) { - IcebergFileFormat fileFormat = IcebergFileFormat.fromIceberg(task.file().format()); return new IcebergSplit( task.file().path().toString(), task.start(), @@ -434,7 +431,6 @@ private IcebergSplit toIcebergSplit(FileScanTask task) ImmutableList.of(), PartitionSpecParser.toJson(task.spec()), PartitionData.toJson(task.file().partition()), - fileFormat != AVRO ? Optional.empty() : Optional.of(SchemaParser.toJson(task.spec().schema())), task.deletes().stream() .map(DeleteFile::fromIceberg) .collect(toImmutableList()), diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index cd687c220560..62ce755c4f90 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -100,6 +100,7 @@ import static io.trino.spi.predicate.Domain.multipleValues; import static io.trino.spi.predicate.Domain.singleValue; import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DoubleType.DOUBLE; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.TimeZoneKey.UTC_KEY; @@ -5427,6 +5428,55 @@ public void testReadFromVersionedTableWithSchemaEvolution() .matches("VALUES (VARCHAR 'a', 11, NULL), (VARCHAR 'b', 22, BIGINT '32')"); } + @Test + public void testReadFromVersionedTableWithSchemaEvolutionDropColumn() + { + String tableName = "test_versioned_table_schema_evolution_drop_column_" + randomTableSuffix(); + + assertQuerySucceeds("CREATE TABLE " + tableName + "(col1 varchar, col2 integer, col3 boolean)"); + long v1SnapshotId = getCurrentSnapshotId(tableName); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v1SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BOOLEAN)) + .returnsEmptyResult(); + + assertUpdate("INSERT INTO " + tableName + " VALUES ('a', 1 , true)", 1); + long v2SnapshotId = getCurrentSnapshotId(tableName); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BOOLEAN)) + .matches("VALUES (VARCHAR 'a', 1, true)"); + + assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN col3"); + assertUpdate("INSERT INTO " + tableName + " VALUES ('b', 2)", 1); + long v3SnapshotId = getCurrentSnapshotId(tableName); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .matches("VALUES (VARCHAR 'a', 1), (VARCHAR 'b', 2)"); + assertThat(query("SELECT * FROM " + tableName)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .matches("VALUES (VARCHAR 'a', 1), (VARCHAR 'b', 2)"); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BOOLEAN)) + .matches("VALUES (VARCHAR 'a', 1, true)"); + + assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN col2"); + assertUpdate("INSERT INTO " + tableName + " VALUES ('c')", 1); + long v4SnapshotId = getCurrentSnapshotId(tableName); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v4SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR)) + .matches("VALUES (VARCHAR 'a'), (VARCHAR 'b'), (VARCHAR 'c')"); + assertThat(query("SELECT * FROM " + tableName)) + .hasOutputTypes(ImmutableList.of(VARCHAR)) + .matches("VALUES (VARCHAR 'a'), (VARCHAR 'b'), (VARCHAR 'c')"); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER)) + .matches("VALUES (VARCHAR 'a', 1), (VARCHAR 'b', 2)"); + assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v2SnapshotId)) + .hasOutputTypes(ImmutableList.of(VARCHAR, INTEGER, BOOLEAN)) + .matches("VALUES (VARCHAR 'a', 1, true)"); + + assertUpdate("DROP TABLE " + tableName); + } + @Test public void testReadFromVersionedTableWithPartitionSpecEvolution() throws Exception diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java index 6cc8bb17ff2d..d16648c262e5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java @@ -161,7 +161,6 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle ImmutableList.of(), PartitionSpecParser.toJson(PartitionSpec.unpartitioned()), PartitionData.toJson(new PartitionData(new Object[] {})), - Optional.empty(), ImmutableList.of(), SplitWeight.standard());