diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index fc2dc699a1bd..d1b084f0206c 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -29,6 +29,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.ParallelIterable;
 import org.apache.iceberg.util.ThreadPools;
 
@@ -56,8 +57,9 @@ public TableScan newScan() {
 
   @Override
   public Schema schema() {
-    Schema schema = new Schema(DataFile.getType(table().spec().partitionType()).fields());
-    if (table().spec().fields().size() < 1) {
+    StructType partitionType = Partitioning.partitionType(table());
+    Schema schema = new Schema(DataFile.getType(partitionType).fields());
+    if (partitionType.fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
       return TypeUtil.selectNot(schema, Sets.newHashSet(102));
     } else {
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index c1b714534def..84c1609fd4e7 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -29,6 +29,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.ParallelIterable;
 import org.apache.iceberg.util.ThreadPools;
 
@@ -55,8 +56,9 @@ public TableScan newScan() {
 
   @Override
   public Schema schema() {
-    Schema schema = ManifestEntry.getSchema(table().spec().partitionType());
-    if (table().spec().fields().size() < 1) {
+    StructType partitionType = Partitioning.partitionType(table());
+    Schema schema = ManifestEntry.getSchema(partitionType);
+    if (partitionType.fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
       return TypeUtil.selectNot(schema, Sets.newHashSet(102));
     } else {
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 145663ce4d9b..b619e1ebc469 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -27,6 +27,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.StructType;
 
 /**
  * A {@link Table} implementation that exposes a table's data files as rows.
@@ -48,8 +49,9 @@ public TableScan newScan() {
 
   @Override
   public Schema schema() {
-    Schema schema = new Schema(DataFile.getType(table().spec().partitionType()).fields());
-    if (table().spec().fields().size() < 1) {
+    StructType partitionType = Partitioning.partitionType(table());
+    Schema schema = new Schema(DataFile.getType(partitionType).fields());
+    if (partitionType.fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. instead, drop the partition field
       return TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
     } else {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 7bae3491a787..a44fc6421428 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -29,6 +29,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.StructProjection;
 
 /**
@@ -54,8 +55,9 @@ public TableScan newScan() {
 
   @Override
   public Schema schema() {
-    Schema schema = ManifestEntry.getSchema(table().spec().partitionType());
-    if (table().spec().fields().size() < 1) {
+    StructType partitionType = Partitioning.partitionType(table());
+    Schema schema = ManifestEntry.getSchema(partitionType);
+    if (partitionType.fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
       return TypeUtil.selectNot(schema, Sets.newHashSet(102));
     } else {
diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index d393fe180507..28598d424dcf 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -19,9 +19,20 @@
 
 package org.apache.iceberg;
 
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.transforms.PartitionSpecVisitor;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
 
 public class Partitioning {
   private Partitioning() {
@@ -177,4 +188,62 @@ public Void alwaysNull(int fieldId, String sourceName, int sourceId) {
       return null;
     }
   }
+
+  /**
+   * Builds a common partition type for all specs in a table.
+   * <p>
+   * Whenever a table has multiple specs, the partition type is a struct containing
+   * all columns that have ever been a part of any spec in the table.
+   *
+   * @param table a table with one or many specs
+   * @return the constructed common partition type
+   */
+  public static StructType partitionType(Table table) {
+    if (table.specs().size() == 1) {
+      return table.spec().partitionType();
+    }
+
+    Map<Integer, PartitionField> fieldMap = Maps.newHashMap();
+    List<NestedField> structFields = Lists.newArrayList();
+
+    // sort the spec IDs in descending order to pick up the most recent field names
+    List<Integer> specIds = table.specs().keySet().stream()
+        .sorted(Collections.reverseOrder())
+        .collect(Collectors.toList());
+
+    for (Integer specId : specIds) {
+      PartitionSpec spec = table.specs().get(specId);
+
+      for (PartitionField field : spec.fields()) {
+        int fieldId = field.fieldId();
+        PartitionField existingField = fieldMap.get(fieldId);
+
+        if (existingField == null) {
+          fieldMap.put(fieldId, field);
+          NestedField structField = spec.partitionType().field(fieldId);
+          structFields.add(structField);
+        } else {
+          // verify the fields are compatible as they may conflict in v1 tables
+          ValidationException.check(equivalentIgnoringNames(field, existingField),
+              "Conflicting partition fields: ['%s', '%s']",
+              field, existingField);
+        }
+      }
+    }
+
+    List<NestedField> sortedStructFields = structFields.stream()
+        .sorted(Comparator.comparingInt(NestedField::fieldId))
+        .collect(Collectors.toList());
+    return StructType.of(sortedStructFields);
+  }
+
+  private static boolean equivalentIgnoringNames(PartitionField field, PartitionField anotherField) {
+    return field.fieldId() == anotherField.fieldId() &&
+        field.sourceId() == anotherField.sourceId() &&
+        compatibleTransforms(field.transform(), anotherField.transform());
+  }
+
+  private static boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?> t2) {
+    return t1.equals(t2) || t1.equals(Transforms.alwaysNull()) || t2.equals(Transforms.alwaysNull());
+  }
 }
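A minimal usage sketch for the new helper, assuming a Table that has already been loaded from a catalog (the wrapper class below is illustrative, not part of the patch):

// Sketch: inspect the unified partition type of a table whose spec has evolved.
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

public class PartitionTypeExample {
  public static void printPartitionType(Table table) {
    // one field per partition field ID that has ever existed in any spec;
    // specs are merged newest-first, so renamed fields keep their latest name
    StructType unified = Partitioning.partitionType(table);
    for (NestedField field : unified.fields()) {
      System.out.printf("%d: %s -> %s%n", field.fieldId(), field.name(), field.type());
    }
  }
}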
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitioning.java b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
new file mode 100644
index 000000000000..2610ad5c01cd
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestPartitioning {
+
+  private static final int V1_FORMAT_VERSION = 1;
+  private static final int V2_FORMAT_VERSION = 2;
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.IntegerType.get()),
+      required(2, "data", Types.StringType.get()),
+      required(3, "category", Types.StringType.get())
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private File tableDir = null;
+
+  @Before
+  public void setupTableDir() throws IOException {
+    this.tableDir = temp.newFolder();
+  }
+
+  @After
+  public void cleanupTables() {
+    TestTables.clearTables();
+  }
+
+  @Test
+  public void testPartitionTypeWithSpecEvolutionInV1Tables() {
+    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+        .identity("data")
+        .build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);
+
+    table.updateSpec()
+        .addField(Expressions.bucket("category", 8))
+        .commit();
+
+    Assert.assertEquals("Should have 2 specs", 2, table.specs().size());
+
+    StructType expectedType = StructType.of(
+        NestedField.optional(1000, "data", Types.StringType.get()),
+        NestedField.optional(1001, "category_bucket_8", Types.IntegerType.get())
+    );
+    StructType actualType = Partitioning.partitionType(table);
+    Assert.assertEquals("Types must match", expectedType, actualType);
+  }
+
+  @Test
+  public void testPartitionTypeWithSpecEvolutionInV2Tables() {
+    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+        .identity("data")
+        .build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V2_FORMAT_VERSION);
+
+    table.updateSpec()
+        .removeField("data")
+        .addField("category")
+        .commit();
+
+    Assert.assertEquals("Should have 2 specs", 2, table.specs().size());
+
+    StructType expectedType = StructType.of(
+        NestedField.optional(1000, "data", Types.StringType.get()),
+        NestedField.optional(1001, "category", Types.StringType.get())
+    );
+    StructType actualType = Partitioning.partitionType(table);
+    Assert.assertEquals("Types must match", expectedType, actualType);
+  }
+
+  @Test
+  public void testPartitionTypeWithRenamesInV1Table() {
+    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+        .identity("data", "p1")
+        .build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);
+
+    table.updateSpec()
+        .addField("category")
+        .commit();
+
+    table.updateSpec()
+        .renameField("p1", "p2")
+        .commit();
+
+    StructType expectedType = StructType.of(
+        NestedField.optional(1000, "p2", Types.StringType.get()),
+        NestedField.optional(1001, "category", Types.StringType.get())
+    );
+    StructType actualType = Partitioning.partitionType(table);
+    Assert.assertEquals("Types must match", expectedType, actualType);
+  }
+
+  @Test
+  public void testPartitionTypeWithAddingBackSamePartitionFieldInV1Table() {
+    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+        .identity("data")
+        .build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);
+
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+
+    // in v1, we use void transforms instead of dropping partition fields
+    StructType expectedType = StructType.of(
+        NestedField.optional(1000, "data_1000", Types.StringType.get()),
+        NestedField.optional(1001, "data", Types.StringType.get())
+    );
+    StructType actualType = Partitioning.partitionType(table);
+    Assert.assertEquals("Types must match", expectedType, actualType);
+  }
+
+  @Test
+  public void testPartitionTypeWithAddingBackSamePartitionFieldInV2Table() {
+    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+        .identity("data")
+        .build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V2_FORMAT_VERSION);
+
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+
+    // in v2, we should be able to reuse the original partition spec
+    StructType expectedType = StructType.of(
+        NestedField.optional(1000, "data", Types.StringType.get())
+    );
+    StructType actualType = Partitioning.partitionType(table);
+    Assert.assertEquals("Types must match", expectedType, actualType);
+  }
+
+  @Test
+  public void testPartitionTypeWithIncompatibleSpecEvolution() {
+    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+        .identity("data")
+        .build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);
+
+    PartitionSpec newSpec = PartitionSpec.builderFor(table.schema())
+        .identity("category")
+        .build();
+
+    TableOperations ops = ((HasTableOperations) table).operations();
+    TableMetadata current = ops.current();
+    ops.commit(current, current.updatePartitionSpec(newSpec));
+
+    Assert.assertEquals("Should have 2 specs", 2, table.specs().size());
+
+    AssertHelpers.assertThrows("Should complain about incompatible specs",
+        ValidationException.class, "Conflicting partition fields",
+        () -> Partitioning.partitionType(table));
+  }
+}
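The v1 expectations above hinge on void transforms: in a v1 table, removing a partition field rewrites it as an always-null transform rather than deleting it, which is why compatibleTransforms accepts Transforms.alwaysNull() on either side. A small sketch of how this could be observed, assuming a v1 table after a removeField() commit (the helper class is illustrative):

// Sketch: list the void (always-null) partition fields a v1 drop leaves behind.
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
import org.apache.iceberg.transforms.Transforms;

public class VoidTransformCheck {
  public static void printVoidFields(Table table) {
    for (PartitionField field : table.spec().fields()) {
      // the patch treats these fields as compatible with any other
      // transform that shares the same field ID
      if (field.transform().equals(Transforms.alwaysNull())) {
        System.out.println(field.name() + " is a void transform");
      }
    }
  }
}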
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
new file mode 100644
index 000000000000..0581ebeb709c
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.apache.iceberg.FileFormat.AVRO;
+import static org.apache.iceberg.FileFormat.ORC;
+import static org.apache.iceberg.FileFormat.PARQUET;
+import static org.apache.iceberg.MetadataTableType.ALL_DATA_FILES;
+import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+import static org.apache.iceberg.MetadataTableType.FILES;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+
+@RunWith(Parameterized.class)
+public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBase {
+
+  @Parameters(name = "catalog = {0}, impl = {1}, conf = {2}, fileFormat = {3}, formatVersion = {4}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        { "testhive", SparkCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hive",
+                "default-namespace", "default"
+            ),
+            ORC,
+            formatVersion()
+        },
+        { "testhadoop", SparkCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hadoop"
+            ),
+            PARQUET,
+            formatVersion()
+        },
+        { "spark_catalog", SparkSessionCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hive",
+                "default-namespace", "default",
+                "clients", "1",
+                "parquet-enabled", "false",
+                "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
+            ),
+            AVRO,
+            formatVersion()
+        }
+    };
+  }
+
+  private static int formatVersion() {
+    return RANDOM.nextInt(2) + 1;
+  }
+
+  private static final Random RANDOM = ThreadLocalRandom.current();
+
+  private final FileFormat fileFormat;
+  private final int formatVersion;
+
+  public TestMetadataTablesWithPartitionEvolution(String catalogName, String implementation,
+                                                  Map<String, String> config,
+                                                  FileFormat fileFormat, int formatVersion) {
+    super(catalogName, implementation, config);
+    this.fileFormat = fileFormat;
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testFilesMetadataTable() throws ParseException {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
+    initTable();
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables while the current spec is still unpartitioned
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      Dataset<Row> df = loadMetadataTable(tableType);
+      Assert.assertTrue("Partition must be skipped", df.schema().getFieldIndex("partition").isEmpty());
+    }
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after adding the first partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(new Object[]{null}), row("b1")),
+          "STRUCT<data:STRING>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .addField(Expressions.bucket("category", 8))
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after adding the second partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8:INT>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after dropping the first partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row(null, 2), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8:INT>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .renameField("category_bucket_8", "category_bucket_8_another_name")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    // verify the metadata tables after renaming the second partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row(null, 2), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8_another_name:INT>",
+          tableType);
+    }
+  }
+
+  @Test
+  public void testEntriesMetadataTable() throws ParseException {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
+    initTable();
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables while the current spec is still unpartitioned
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      Dataset<Row> df = loadMetadataTable(tableType);
+      StructType dataFileType = (StructType) df.schema().apply("data_file").dataType();
+      Assert.assertTrue("Partition must be skipped", dataFileType.getFieldIndex("partition").isEmpty());
+    }
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after adding the first partition column
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      assertPartitions(
+          ImmutableList.of(row(new Object[]{null}), row("b1")),
+          "STRUCT<data:STRING>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .addField(Expressions.bucket("category", 8))
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after adding the second partition column
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8:INT>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after dropping the first partition column
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row(null, 2), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8:INT>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .renameField("category_bucket_8", "category_bucket_8_another_name")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    // verify the metadata tables after renaming the second partition column
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row(null, 2), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8_another_name:INT>",
+          tableType);
+    }
+  }
+
+  private void assertPartitions(List<Object[]> expectedPartitions, String expectedTypeAsString,
+                                MetadataTableType tableType) throws ParseException {
+    Dataset<Row> df = loadMetadataTable(tableType);
+
+    DataType expectedType = spark.sessionState().sqlParser().parseDataType(expectedTypeAsString);
+    switch (tableType) {
+      case FILES:
+      case ALL_DATA_FILES:
+        DataType actualFilesType = df.schema().apply("partition").dataType();
+        Assert.assertEquals("Partition type must match", expectedType, actualFilesType);
+        break;
+
+      case ENTRIES:
+      case ALL_ENTRIES:
+        StructType dataFileType = (StructType) df.schema().apply("data_file").dataType();
+        DataType actualEntriesType = dataFileType.apply("partition").dataType();
+        Assert.assertEquals("Partition type must match", expectedType, actualEntriesType);
+        break;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported metadata table type: " + tableType);
+    }
+
+    switch (tableType) {
+      case FILES:
+      case ALL_DATA_FILES:
+        List<Row> actualFilesPartitions = df.orderBy("partition")
+            .select("partition.*")
+            .collectAsList();
+        assertEquals("Partitions must match", expectedPartitions, rowsToJava(actualFilesPartitions));
+        break;
+
+      case ENTRIES:
+      case ALL_ENTRIES:
+        List<Row> actualEntriesPartitions = df.orderBy("data_file.partition")
+            .select("data_file.partition.*")
+            .collectAsList();
+        assertEquals("Partitions must match", expectedPartitions, rowsToJava(actualEntriesPartitions));
+        break;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported metadata table type: " + tableType);
+    }
+  }
+
+  private Dataset<Row> loadMetadataTable(MetadataTableType tableType) {
+    return spark.read().format("iceberg").load(tableName + "." + tableType.name());
+  }
+
+  private void initTable() {
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, DEFAULT_FILE_FORMAT, fileFormat.name());
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, FORMAT_VERSION, formatVersion);
+  }
+}
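For completeness, a sketch of how the unified partition struct surfaces when a metadata table is read from Spark; db.tbl stands in for a real table identifier:

// Sketch: reading the files metadata table after partition evolution; files
// written under older specs report null for fields their spec did not have.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MetadataTableQuery {
  public static void showFilePartitions(SparkSession spark) {
    Dataset<Row> files = spark.read().format("iceberg").load("db.tbl.files");
    files.printSchema(); // partition: struct with one column per historical partition field
    files.select("partition.*").show();
  }
}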