diff --git a/api/src/main/java/org/apache/iceberg/io/OutputFile.java b/api/src/main/java/org/apache/iceberg/io/OutputFile.java index 34b4e54abf62..67195c46c448 100644 --- a/api/src/main/java/org/apache/iceberg/io/OutputFile.java +++ b/api/src/main/java/org/apache/iceberg/io/OutputFile.java @@ -48,6 +48,7 @@ public interface OutputFile { * * @return an output stream that can report its position * @throws RuntimeIOException If the implementation throws an {@link IOException} + * @throws SecurityException If staging directory creation fails due to missing JVM level permission */ PositionOutputStream createOrOverwrite(); diff --git a/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java b/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java index 8d4273c1625f..9074d289b4c7 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalogTest.java @@ -283,6 +283,18 @@ public void testConcurrentCommits() throws Exception { Assert.assertEquals(2, table.schema().columns().size()); } + @Test + public void testDropNamespace() { + Namespace namespace = Namespace.of(genRandomName()); + catalog.createNamespace(namespace); + catalog.dropNamespace(namespace); + GetItemResponse response = dynamo.getItem(GetItemRequest.builder() + .tableName(catalogTableName) + .key(DynamoDbCatalog.namespacePrimaryKey(namespace)) + .build()); + Assert.assertFalse("namespace must not exist", response.hasItem()); + } + private static String genRandomName() { return UUID.randomUUID().toString().replace("-", ""); } diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java index f9bef1514604..ebf4964aea85 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java @@ -248,7 +248,7 @@ public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyExcept dynamo.deleteItem(DeleteItemRequest.builder() .tableName(awsProperties.dynamoDbTableName()) .key(namespacePrimaryKey(namespace)) - .conditionExpression("attribute_exists(" + namespace + ")") + .conditionExpression("attribute_exists(" + COL_NAMESPACE + ")") .build()); return true; } catch (ConditionalCheckFailedException e) { diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java index 690dc9aea255..f66a2967d05f 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java @@ -175,6 +175,7 @@ private void newStream() throws IOException { stream.close(); } + createStagingDirectoryIfNotExists(); currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory); currentStagingFile.deleteOnExit(); stagingFiles.add(currentStagingFile); @@ -328,6 +329,26 @@ private static InputStream uncheckedInputStream(File file) { } } + private void createStagingDirectoryIfNotExists() throws IOException, SecurityException { + if (!stagingDirectory.exists()) { + LOG.info("Staging directory does not exist, trying to create one: {}", + stagingDirectory.getAbsolutePath()); + boolean createdStagingDirectory = stagingDirectory.mkdirs(); + if (createdStagingDirectory) { + LOG.info("Successfully created staging directory: {}", 
stagingDirectory.getAbsolutePath()); + } else { + if (stagingDirectory.exists()) { + LOG.info("Staging directory was created by another process: {}", + stagingDirectory.getAbsolutePath()); + } else { + throw new IOException( + "Failed to create staging directory: " + stagingDirectory + .getAbsolutePath()); + } + } + } + } + @SuppressWarnings("checkstyle:NoFinalizer") @Override protected void finalize() throws Throwable { diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java b/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java index b4dc1ecf0b5e..b0f8b7384513 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java @@ -20,6 +20,7 @@ package org.apache.iceberg.aws.s3; import com.adobe.testing.s3mock.junit4.S3MockRule; +import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; @@ -29,6 +30,7 @@ import java.util.stream.Stream; import org.apache.iceberg.aws.AwsProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -72,6 +74,7 @@ public class S3OutputStreamTest { private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3)); private final Random random = new Random(1); private final Path tmpDir = Files.createTempDirectory("s3fileio-test-"); + private final String newTmpDirectory = "/tmp/newStagingDirectory"; private final AwsProperties properties = new AwsProperties(ImmutableMap.of( AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024), @@ -85,6 +88,14 @@ public void before() { s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build()); } + @After + public void after() { + File newStagingDirectory = new File(newTmpDirectory); + if (newStagingDirectory.exists()) { + newStagingDirectory.delete(); + } + } + @Test public void testWrite() { // Run tests for both byte and array write paths @@ -140,6 +151,14 @@ public void testMultipleClose() throws IOException { stream.close(); } + @Test + public void testStagingDirectoryCreation() throws IOException { + AwsProperties newStagingDirectoryAwsProperties = new AwsProperties(ImmutableMap.of( + AwsProperties.S3FILEIO_STAGING_DIRECTORY, newTmpDirectory)); + S3OutputStream stream = new S3OutputStream(s3, randomURI(), newStagingDirectoryAwsProperties); + stream.close(); + } + private void writeAndVerify(S3Client client, S3URI uri, byte [] data, boolean arrayWrite) { try (S3OutputStream stream = new S3OutputStream(client, uri, properties)) { if (arrayWrite) { diff --git a/build.gradle b/build.gradle index dd99804b3aef..8a8812ae3ec8 100644 --- a/build.gradle +++ b/build.gradle @@ -736,6 +736,8 @@ project(':iceberg-hive-runtime') { relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc' relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift' relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra' + // relocate OrcSplit to avoid conflicts with Hive's OrcSplit + relocate 'org.apache.hadoop.hive.ql.io.orc.OrcSplit', 'org.apache.iceberg.shaded.org.apache.hadoop.hive.ql.io.orc.OrcSplit' classifier null } diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java index 56c1cda7cad9..fe20302278a1 100644 ---
a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java @@ -50,7 +50,7 @@ class PropertiesUpdate implements UpdateProperties { @Override public UpdateProperties set(String key, String value) { Preconditions.checkNotNull(key, "Key cannot be null"); - Preconditions.checkNotNull(key, "Value cannot be null"); + Preconditions.checkNotNull(value, "Value cannot be null"); Preconditions.checkArgument(!removals.contains(key), "Cannot remove and update the same key: %s", key); diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java index 5b73fe8d30ed..8219585a9aef 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcClientPool.java @@ -47,8 +47,7 @@ class JdbcClientPool extends ClientPoolImpl { @Override protected Connection newClient() { try { - Properties dbProps = new Properties(); - properties.forEach((key, value) -> dbProps.put(key.replace(JdbcCatalog.PROPERTY_PREFIX, ""), value)); + Properties dbProps = JdbcUtil.filterAndRemovePrefix(properties, JdbcCatalog.PROPERTY_PREFIX); return DriverManager.getConnection(dbUrl, dbProps); } catch (SQLException e) { throw new UncheckedSQLException(e, "Failed to connect: %s", dbUrl); diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java index b1eacbf59652..0b558395a105 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java @@ -19,6 +19,8 @@ package org.apache.iceberg.jdbc; +import java.util.Map; +import java.util.Properties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Joiner; @@ -86,4 +88,15 @@ public static TableIdentifier stringToTableIdentifier(String tableNamespace, Str return TableIdentifier.of(JdbcUtil.stringToNamespace(tableNamespace), tableName); } + public static Properties filterAndRemovePrefix(Map properties, + String prefix) { + Properties result = new Properties(); + properties.forEach((key, value) -> { + if (key.startsWith(prefix)) { + result.put(key.substring(prefix.length()), value); + } + }); + + return result; + } } diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java new file mode 100644 index 000000000000..92fa90eb02d8 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcUtil.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.jdbc; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import org.assertj.core.api.Assertions; +import org.junit.Test; + +public class TestJdbcUtil { + + @Test + public void testFilterAndRemovePrefix() { + Map input = new HashMap<>(); + input.put("warehouse", "/tmp/warehouse"); + input.put("user", "foo"); + input.put("jdbc.user", "bar"); + input.put("jdbc.pass", "secret"); + input.put("jdbc.jdbc.abcxyz", "abcxyz"); + + Properties expected = new Properties(); + expected.put("user", "bar"); + expected.put("pass", "secret"); + expected.put("jdbc.abcxyz", "abcxyz"); + + Properties actual = JdbcUtil.filterAndRemovePrefix(input, "jdbc."); + + Assertions.assertThat(expected).isEqualTo(actual); + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index a8eb13cdfa68..148f9d90c035 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -134,9 +134,14 @@ private List> applyEqDeletes() { Iterable> deleteRecords = Iterables.transform(deletes, delete -> openDeletes(delete, deleteSchema)); + + // copy the delete records because they will be held in a set + CloseableIterable records = CloseableIterable.transform( + CloseableIterable.concat(deleteRecords), Record::copy); + StructLikeSet deleteSet = Deletes.toEqualitySet( - // copy the delete records because they will be held in a set - CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy), + CloseableIterable.transform( + records, record -> new InternalRecordWrapper(deleteSchema.asStruct()).wrap(record)), deleteSchema.asStruct()); Predicate isInDeleteSet = record -> deleteSet.contains(projectRow.wrap(asStructLike(record))); diff --git a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java index 70ac77473c5d..69b0a572ad73 100644 --- a/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java +++ b/data/src/test/java/org/apache/iceberg/data/DeleteReadTests.java @@ -20,6 +20,7 @@ package org.apache.iceberg.data; import java.io.IOException; +import java.time.LocalDate; import java.util.List; import java.util.Set; import org.apache.iceberg.DataFile; @@ -34,6 +35,7 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ArrayUtil; import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.StructProjection; @@ -51,17 +53,30 @@ public abstract class DeleteReadTests { Types.NestedField.required(2, "data", Types.StringType.get()) ); + public static final Schema DATE_SCHEMA = new Schema( + Types.NestedField.required(1, "dt", Types.DateType.get()), + Types.NestedField.required(2, "data", Types.StringType.get()), + Types.NestedField.required(3, "id", Types.IntegerType.get()) + ); + // Partition spec used to create tables public static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) .bucket("data", 16) .build(); + public static final PartitionSpec DATE_SPEC = PartitionSpec.builderFor(DATE_SCHEMA) + .day("dt") + .build(); + @Rule public TemporaryFolder temp = new TemporaryFolder(); - private String tableName = null; + protected String tableName = null; + protected String dateTableName = null; protected Table table = null; + protected Table 
dateTable = null; private List records = null; + private List dateRecords = null; private DataFile dataFile = null; @Before @@ -90,6 +105,46 @@ public void writeTestDataFile() throws IOException { @After public void cleanup() throws IOException { dropTable("test"); + dropTable("test2"); + } + + private void initDateTable() throws IOException { + dropTable("test2"); + this.dateTableName = "test2"; + this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC); + + GenericRecord record = GenericRecord.create(dateTable.schema()); + + this.dateRecords = Lists.newArrayList( + record.copy("dt", LocalDate.parse("2021-09-01"), "data", "a", "id", 1), + record.copy("dt", LocalDate.parse("2021-09-02"), "data", "b", "id", 2), + record.copy("dt", LocalDate.parse("2021-09-03"), "data", "c", "id", 3), + record.copy("dt", LocalDate.parse("2021-09-04"), "data", "d", "id", 4), + record.copy("dt", LocalDate.parse("2021-09-05"), "data", "e", "id", 5)); + + DataFile dataFile1 = FileHelpers.writeDataFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dateRecords.subList(0, 1)); + DataFile dataFile2 = FileHelpers.writeDataFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dateRecords.subList(1, 2)); + DataFile dataFile3 = FileHelpers.writeDataFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dateRecords.subList(2, 3)); + DataFile dataFile4 = FileHelpers.writeDataFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-04"))), dateRecords.subList(3, 4)); + DataFile dataFile5 = FileHelpers.writeDataFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-05"))), dateRecords.subList(4, 5)); + + dateTable.newAppend() + .appendFile(dataFile1) + .appendFile(dataFile2) + .appendFile(dataFile3) + .appendFile(dataFile4) + .appendFile(dataFile5) + .commit(); } protected abstract Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException; @@ -119,12 +174,47 @@ public void testEqualityDeletes() throws IOException { .addDeletes(eqDeletes) .commit(); - StructLikeSet expected = rowSetWithoutIds(29, 89, 122); + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); } + @Test + public void testEqualityDateDeletes() throws IOException { + initDateTable(); + + Schema deleteRowSchema = dateTable.schema().select("*"); + Record dataDelete = GenericRecord.create(deleteRowSchema); + List dataDeletes = Lists.newArrayList( + dataDelete.copy("dt", LocalDate.parse("2021-09-01"), "data", "a", "id", 1), + dataDelete.copy("dt", LocalDate.parse("2021-09-02"), "data", "b", "id", 2), + dataDelete.copy("dt", LocalDate.parse("2021-09-03"), "data", "c", "id", 3) + ); + + DeleteFile eqDeletes1 = FileHelpers.writeDeleteFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), dataDeletes.subList(0, 1), deleteRowSchema); + DeleteFile eqDeletes2 = FileHelpers.writeDeleteFile( + dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-02"))), dataDeletes.subList(1, 2), deleteRowSchema); + DeleteFile eqDeletes3 = FileHelpers.writeDeleteFile( + 
dateTable, Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-03"))), dataDeletes.subList(2, 3), deleteRowSchema); + + dateTable.newRowDelta() + .addDeletes(eqDeletes1) + .addDeletes(eqDeletes2) + .addDeletes(eqDeletes3) + .commit(); + + StructLikeSet expected = rowSetWithoutIds(dateTable, dateRecords, 1, 2, 3); + + StructLikeSet actual = rowSet(dateTableName, dateTable, "*"); + + Assert.assertEquals("Table should contain expected rows", expected, actual); + } + @Test public void testEqualityDeletesWithRequiredEqColumn() throws IOException { Schema deleteRowSchema = table.schema().select("data"); @@ -142,7 +232,7 @@ public void testEqualityDeletesWithRequiredEqColumn() throws IOException { .addDeletes(eqDeletes) .commit(); - StructLikeSet expected = selectColumns(rowSetWithoutIds(29, 89, 122), "id"); + StructLikeSet expected = selectColumns(rowSetWithoutIds(table, records, 29, 89, 122), "id"); StructLikeSet actual = rowSet(tableName, table, "id"); if (expectPruned()) { @@ -180,7 +270,7 @@ public void testEqualityDeletesSpanningMultipleDataFiles() throws IOException { .addDeletes(eqDeletes) .commit(); - StructLikeSet expected = rowSetWithoutIds(29, 89, 122, 144); + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122, 144); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); @@ -202,7 +292,7 @@ public void testPositionDeletes() throws IOException { .validateDataFilesExist(posDeletes.second()) .commit(); - StructLikeSet expected = rowSetWithoutIds(29, 89, 122); + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 122); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); @@ -235,7 +325,7 @@ public void testMixedPositionAndEqualityDeletes() throws IOException { .validateDataFilesExist(posDeletes.second()) .commit(); - StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122); + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); @@ -269,7 +359,7 @@ public void testMultipleEqualityDeleteSchemas() throws IOException { .addDeletes(idEqDeletes) .commit(); - StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122); + StructLikeSet expected = rowSetWithoutIds(table, records, 29, 89, 121, 122); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); @@ -306,7 +396,7 @@ public void testEqualityDeleteByNull() throws IOException { .addDeletes(eqDeletes) .commit(); - StructLikeSet expected = rowSetWithoutIds(131); + StructLikeSet expected = rowSetWithoutIds(table, records, 131); StructLikeSet actual = rowSet(tableName, table, "*"); Assert.assertEquals("Table should contain expected rows", expected, actual); @@ -321,11 +411,12 @@ private StructLikeSet selectColumns(StructLikeSet rows, String... columns) { return set; } - private StructLikeSet rowSetWithoutIds(int... idsToRemove) { + private static StructLikeSet rowSetWithoutIds(Table table, List recordList, int... 
idsToRemove) { Set deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove)); StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); - records.stream() + recordList.stream() .filter(row -> !deletedIds.contains(row.getField("id"))) + .map(record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record)) .forEach(set::add); return set; } diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java index 72d48831de06..e92c0daec385 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java +++ b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java @@ -26,6 +26,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TestTables; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.util.StructLikeSet; import org.junit.Assert; @@ -48,7 +49,8 @@ protected void dropTable(String name) { public StructLikeSet rowSet(String name, Table table, String... columns) throws IOException { StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); try (CloseableIterable reader = IcebergGenerics.read(table).select(columns).build()) { - reader.forEach(set::add); + Iterables.addAll(set, CloseableIterable.transform( + reader, record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record))); } return set; } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index ff9174a84399..010df8cf5da2 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -50,7 +50,6 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Comparators; @@ -282,9 +281,14 @@ private void commitDeltaTxn(NavigableMap pendingResults, Stri // txn2, the equality-delete files of txn2 are required to be applied to data files from txn1. Committing the // merged one will lead to the incorrect delete semantic. WriteResult result = e.getValue(); - RowDelta rowDelta = table.newRowDelta() - .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles())) - .validateDeletedFiles(); + + // Row delta validations are not needed for streaming changes that write equality deletes. Equality deletes + // are applied to data in all previous sequence numbers, so retries may push deletes further in the future, + // but do not affect correctness. Position deletes committed to the table in this path are used only to delete + // rows from data files that are being added in this commit. There is no way for data files added along with + // the delete files to be concurrently removed, so there is no need to validate the files referenced by the + // position delete files that are being committed. 
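+ // For example, if a checkpoint commit is retried, its equality deletes are simply committed at a later + // sequence number and still apply to all data files written at earlier sequence numbers, so the outcome + // is unchanged.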
+ RowDelta rowDelta = table.newRowDelta(); int numDataFiles = result.dataFiles().length; Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows); diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 34785cfb6a34..e16940fc4875 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -40,7 +40,6 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; @@ -49,7 +48,6 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.TableTestBase; -import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.SimpleDataUtil; import org.apache.iceberg.flink.TestHelpers; @@ -679,66 +677,6 @@ public void testDeleteFiles() throws Exception { } } - @Test - public void testValidateDataFileExist() throws Exception { - Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); - long timestamp = 0; - long checkpoint = 10; - JobID jobId = new JobID(); - FileAppenderFactory appenderFactory = createDeletableAppenderFactory(); - - RowData insert1 = SimpleDataUtil.createInsert(1, "aaa"); - DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(insert1)); - - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { - harness.setup(); - harness.open(); - - // Txn#1: insert the row <1, 'aaa'> - harness.processElement(WriteResult.builder() - .addDataFiles(dataFile1) - .build(), - ++timestamp); - harness.snapshot(checkpoint, ++timestamp); - harness.notifyOfCompletedCheckpoint(checkpoint); - - // Txn#2: Overwrite the committed data-file-1 - RowData insert2 = SimpleDataUtil.createInsert(2, "bbb"); - DataFile dataFile2 = writeDataFile("data-file-2", ImmutableList.of(insert2)); - new TestTableLoader(tablePath) - .loadTable() - .newOverwrite() - .addFile(dataFile2) - .deleteFile(dataFile1) - .commit(); - } - - try (OneInputStreamOperatorTestHarness harness = createStreamSink(jobId)) { - harness.setup(); - harness.open(); - - // Txn#3: position-delete the <1, 'aaa'> (NOT committed). - DeleteFile deleteFile1 = writePosDeleteFile(appenderFactory, - "pos-delete-file-1", - ImmutableList.of(Pair.of(dataFile1.path(), 0L))); - harness.processElement(WriteResult.builder() - .addDeleteFiles(deleteFile1) - .addReferencedDataFiles(dataFile1.path()) - .build(), - ++timestamp); - harness.snapshot(++checkpoint, ++timestamp); - - // Txn#3: validate will be failure when committing. 
- final long currentCheckpointId = checkpoint; - AssertHelpers.assertThrows("Validation should be failure because of non-exist data files.", - ValidationException.class, "Cannot commit, missing data files", - () -> { - harness.notifyOfCompletedCheckpoint(currentCheckpointId); - return null; - }); - } - } - @Test public void testCommitTwoCheckpointsInSingleTxn() throws Exception { Assume.assumeFalse("Only support equality-delete in format v2.", formatVersion < 2); diff --git a/hive3/src/main/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/hive3/src/main/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java new file mode 100644 index 000000000000..00f389eee792 --- /dev/null +++ b/hive3/src/main/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.io.AcidInputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.ColumnarSplit; +import org.apache.hadoop.hive.ql.io.LlapAwareSplit; +import org.apache.hadoop.hive.ql.io.SyntheticFileId; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.orc.OrcProto; +import org.apache.orc.impl.OrcTail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * In order to fix some compatibility issues with ORC support with Hive 3.x and the shaded ORC libraries, + * this class has been copied from Hive 3.x source code. However, this class should be removed once + * Hive 4 is out. 
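+ * The iceberg-hive-runtime shaded jar relocates this copy (see the relocate rule added in build.gradle) so + * that it does not clash with Hive's own OrcSplit on the classpath.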
+ */ +public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit { + private static final Logger LOG = LoggerFactory.getLogger(OrcSplit.class); + private OrcTail orcTail; + private boolean hasFooter; + /** + * This means {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE} + */ + private boolean isOriginal; + private boolean hasBase; + // partition root + private Path rootDir; + private final List deltas = new ArrayList<>(); + private long projColsUncompressedSize; + private transient Object fileKey; + private long fileLen; + + static final int HAS_SYNTHETIC_FILEID_FLAG = 16; + static final int HAS_LONG_FILEID_FLAG = 8; + static final int BASE_FLAG = 4; + static final int ORIGINAL_FLAG = 2; + static final int FOOTER_FLAG = 1; + + protected OrcSplit() { + // The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it. + // This constructor is used to create the object and then call readFields() + // so just pass nulls to this super constructor. + super(null, 0, 0, (String[]) null); + } + + public OrcSplit(Path path, Object fileId, long offset, long length, String[] hosts, + OrcTail orcTail, boolean isOriginal, boolean hasBase, + List deltas, long projectedDataSize, long fileLen, Path rootDir) { + super(path, offset, length, hosts); + // For HDFS, we could avoid serializing file ID and just replace the path with inode-based + // path. However, that breaks bunch of stuff because Hive later looks up things by split path. + this.fileKey = fileId; + this.orcTail = orcTail; + hasFooter = this.orcTail != null; + this.isOriginal = isOriginal; + this.hasBase = hasBase; + this.rootDir = rootDir; + this.deltas.addAll(deltas); + this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize; + // setting file length to Long.MAX_VALUE will let orc reader read file length from file system + this.fileLen = fileLen <= 0 ? Long.MAX_VALUE : fileLen; + } + + @Override + public void write(DataOutput out) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + // serialize path, offset, length using FileSplit + super.write(dos); + int required = bos.size(); + + // write addition payload required for orc + writeAdditionalPayload(dos); + int additional = bos.size() - required; + + out.write(bos.toByteArray()); + if (LOG.isTraceEnabled()) { + LOG.trace("Writing additional {} bytes to OrcSplit as payload. Required {} bytes.", + additional, required); + } + } + + private void writeAdditionalPayload(final DataOutputStream out) throws IOException { + boolean isFileIdLong = fileKey instanceof Long; + boolean isFileIdWritable = fileKey instanceof Writable; + int flags = (hasBase ? BASE_FLAG : 0) | + (isOriginal ? ORIGINAL_FLAG : 0) | + (hasFooter ? FOOTER_FLAG : 0) | + (isFileIdLong ? HAS_LONG_FILEID_FLAG : 0) | + (isFileIdWritable ? 
HAS_SYNTHETIC_FILEID_FLAG : 0); + out.writeByte(flags); + out.writeInt(deltas.size()); + for (AcidInputFormat.DeltaMetaData delta : deltas) { + delta.write(out); + } + if (hasFooter) { + OrcProto.FileTail fileTail = orcTail.getMinimalFileTail(); + byte[] tailBuffer = fileTail.toByteArray(); + int tailLen = tailBuffer.length; + WritableUtils.writeVInt(out, tailLen); + out.write(tailBuffer); + } + if (isFileIdLong) { + out.writeLong(((Long) fileKey).longValue()); + } else if (isFileIdWritable) { + ((Writable) fileKey).write(out); + } + out.writeLong(fileLen); + out.writeUTF(rootDir.toString()); + } + + @Override + public void readFields(DataInput in) throws IOException { + // deserialize path, offset, length using FileSplit + super.readFields(in); + + byte flags = in.readByte(); + hasFooter = (FOOTER_FLAG & flags) != 0; + isOriginal = (ORIGINAL_FLAG & flags) != 0; + hasBase = (BASE_FLAG & flags) != 0; + boolean hasLongFileId = (HAS_LONG_FILEID_FLAG & flags) != 0; + boolean hasWritableFileId = (HAS_SYNTHETIC_FILEID_FLAG & flags) != 0; + if (hasLongFileId && hasWritableFileId) { + throw new IOException("Invalid split - both file ID types present"); + } + + deltas.clear(); + int numDeltas = in.readInt(); + for (int i = 0; i < numDeltas; i++) { + AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData(); + dmd.readFields(in); + deltas.add(dmd); + } + if (hasFooter) { + int tailLen = WritableUtils.readVInt(in); + byte[] tailBuffer = new byte[tailLen]; + in.readFully(tailBuffer); + OrcProto.FileTail fileTail = OrcProto.FileTail.parseFrom(tailBuffer); + orcTail = new OrcTail(fileTail, null); + } + if (hasLongFileId) { + fileKey = in.readLong(); + } else if (hasWritableFileId) { + SyntheticFileId fileId = new SyntheticFileId(); + fileId.readFields(in); + this.fileKey = fileId; + } + fileLen = in.readLong(); + rootDir = new Path(in.readUTF()); + } + + public OrcTail getOrcTail() { + return orcTail; + } + + public boolean hasFooter() { + return hasFooter; + } + + /** + * @return {@code true} if file schema doesn't have Acid metadata columns + * Such file may be in a delta_x_y/ or base_x due to being added via + * "load data" command. It could be at partition|table root due to table having + * been converted from non-acid to acid table. It could even be something like + * "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0" if it was written by an + * "insert into t select ... from A union all select ... from B" + */ + public boolean isOriginal() { + return isOriginal; + } + + public boolean hasBase() { + return hasBase; + } + + public Path getRootDir() { + return rootDir; + } + + public List getDeltas() { + return deltas; + } + + public long getFileLength() { + return fileLen; + } + + /** + * If this method returns true, then for sure it is ACID. + * However, if it returns false.. it could be ACID or non-ACID. 
+ * + * @return true if is ACID + */ + public boolean isAcid() { + return hasBase || deltas.size() > 0; + } + + public long getProjectedColumnsUncompressedSize() { + return projColsUncompressedSize; + } + + public Object getFileKey() { + return fileKey; + } + + @Override + public long getColumnarProjectionSize() { + return projColsUncompressedSize; + } + + @Override + public boolean canUseLlapIo(Configuration conf) { + final boolean hasDelta = deltas != null && !deltas.isEmpty(); + final boolean isAcidRead = AcidUtils.isFullAcidScan(conf); + final boolean isVectorized = HiveConf.getBoolVar(conf, ConfVars.HIVE_VECTORIZATION_ENABLED); + Boolean isSplitUpdate = null; + if (isAcidRead) { + final AcidUtils.AcidOperationalProperties acidOperationalProperties + = AcidUtils.getAcidOperationalProperties(conf); + isSplitUpdate = acidOperationalProperties.isSplitUpdate(); + } + + if (isOriginal) { + if (!isAcidRead && !hasDelta) { + // Original scan only + return true; + } + } else { + boolean isAcidEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ACID_ENABLED); + if (isAcidEnabled && isAcidRead && hasBase && isVectorized) { + if (hasDelta) { + if (isSplitUpdate) { // Base with delete deltas + return true; + } + } else { + // Base scan only + return true; + } + } + } + return false; + } + + @Override + public String toString() { + return "OrcSplit [" + getPath() + ", start=" + getStart() + ", length=" + getLength() + + ", isOriginal=" + isOriginal + ", fileLength=" + fileLen + ", hasFooter=" + hasFooter + + ", hasBase=" + hasBase + ", deltas=" + (deltas == null ? 0 : deltas.size()) + "]"; + } +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java index ef964f8fb92e..2ba4e50e8aa1 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java +++ b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java @@ -32,6 +32,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.data.DeleteReadTests; +import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.util.StructLikeSet; import org.junit.Assert; @@ -104,6 +105,7 @@ public StructLikeSet rowSet(String name, Table table, String... columns) { .filter(recordFactory -> recordFactory.name().equals(inputFormat)) .map(recordFactory -> recordFactory.create(builder.project(projected).conf()).getRecords()) .flatMap(List::stream) + .map(record -> new InternalRecordWrapper(projected.asStruct()).wrap(record)) .collect(Collectors.toList()) ); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java index acdda78c680b..f181875ad3ac 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/PruneColumns.java @@ -104,12 +104,7 @@ public Type list(GroupType list, Type element) { return list; } else if (element != null) { if (!Objects.equal(element, originalElement)) { - Integer listId = getId(list); - // the element type was projected - Type listType = Types.list(list.getRepetition()) - .element(element) - .named(list.getName()); - return listId == null ? 
listType : listType.withId(listId); + return list.withNewFields(repeated.withNewFields(element)); } return list; } @@ -129,14 +124,8 @@ public Type map(GroupType map, Type key, Type value) { if ((keyId != null && selectedIds.contains(keyId)) || (valueId != null && selectedIds.contains(valueId))) { return map; } else if (value != null) { - Integer mapId = getId(map); if (!Objects.equal(value, originalValue)) { - Type mapType = Types.map(map.getRepetition()) - .key(originalKey) - .value(value) - .named(map.getName()); - - return mapId == null ? mapType : mapType.withId(mapId); + return map.withNewFields(repeated.withNewFields(originalKey, value)); } return map; } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java new file mode 100644 index 000000000000..dfa7e64a4758 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestPruneColumns.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.parquet; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types.DoubleType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.StructType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.junit.Assert; +import org.junit.Test; + +public class TestPruneColumns { + @Test + public void testMapKeyValueName() { + MessageType fileSchema = Types.buildMessage() + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.buildGroup(Type.Repetition.REPEATED) + .addField(Types.primitive(PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED) + .as(LogicalTypeAnnotation.stringType()) + .id(2) + .named("key")) + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(4).named("x")) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y")) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(6).named("z")) + .id(3) + .named("value")) + .named("custom_key_value_name")) + .as(LogicalTypeAnnotation.mapType()) + .id(1) + .named("m")) + .named("table"); + + // project map.value.x and map.value.y + Schema projection = new Schema( + NestedField.optional(1, "m", MapType.ofOptional(2, 3, + StringType.get(), + StructType.of( + NestedField.required(4, "x", DoubleType.get()), + NestedField.required(5, "y", DoubleType.get()) + ) + )) + ); + + MessageType expected = Types.buildMessage() + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.buildGroup(Type.Repetition.REPEATED) + .addField(Types.primitive(PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED) + .as(LogicalTypeAnnotation.stringType()) + .id(2) + .named("key")) + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(4).named("x")) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y")) + .id(3) + .named("value")) + .named("custom_key_value_name")) + .as(LogicalTypeAnnotation.mapType()) + .id(1) + .named("m")) + .named("table"); + + MessageType actual = ParquetSchemaUtil.pruneColumns(fileSchema, projection); + Assert.assertEquals("Pruned schema should not rename repeated struct", expected, actual); + } + + @Test + public void testListElementName() { + MessageType fileSchema = Types.buildMessage() + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.buildGroup(Type.Repetition.REPEATED) + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(4).named("x")) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y")) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(6).named("z")) + .id(3) + .named("custom_element_name")) + .named("custom_repeated_name")) + .as(LogicalTypeAnnotation.listType()) + .id(1) + .named("m")) + .named("table"); + + // project map.value.x and map.value.y + Schema projection = new Schema( + NestedField.optional(1, "m", 
ListType.ofOptional(3, + StructType.of( + NestedField.required(4, "x", DoubleType.get()), + NestedField.required(5, "y", DoubleType.get()) + ) + )) + ); + + MessageType expected = Types.buildMessage() + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.buildGroup(Type.Repetition.REPEATED) + .addField(Types.buildGroup(Type.Repetition.OPTIONAL) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(4).named("x")) + .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y")) + .id(3) + .named("custom_element_name")) + .named("custom_repeated_name")) + .as(LogicalTypeAnnotation.listType()) + .id(1) + .named("m")) + .named("table"); + + MessageType actual = ParquetSchemaUtil.pruneColumns(fileSchema, projection); + Assert.assertEquals("Pruned schema should not rename repeated struct", expected, actual); + } +}
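A note on the PruneColumns change above: the projected schema keeps "custom_repeated_name" and "custom_element_name" because GroupType.withNewFields() replaces only the children of a group while keeping its original name, repetition, and field id, whereas rebuilding the projection through the Types.list()/Types.map() builders falls back to the default "list"/"element"/"key_value" group names. The standalone sketch below is not part of the patch; its class and variable names are invented for illustration and it only mirrors the builder calls used in the tests above.

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

public class WithNewFieldsSketch {
  public static void main(String[] args) {
    // A list whose repeated and element groups carry non-default names and ids,
    // like the file schema in TestPruneColumns#testListElementName.
    GroupType element = Types.buildGroup(Type.Repetition.OPTIONAL)
        .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(4).named("x"))
        .addField(Types.primitive(PrimitiveTypeName.DOUBLE, Type.Repetition.REQUIRED).id(5).named("y"))
        .id(3)
        .named("custom_element_name");

    GroupType list = Types.buildGroup(Type.Repetition.OPTIONAL)
        .addField(Types.buildGroup(Type.Repetition.REPEATED)
            .addField(element)
            .named("custom_repeated_name"))
        .as(LogicalTypeAnnotation.listType())
        .id(1)
        .named("m");

    // Keep only element field "x". Each withNewFields() call returns a group with the same name,
    // repetition, and field id as the original, containing just the projected children.
    GroupType repeated = list.getType(0).asGroupType();
    GroupType prunedElement = element.withNewFields(element.getType(0));
    GroupType pruned = list.withNewFields(repeated.withNewFields(prunedElement));

    // Prints a group still named "m" wrapping "custom_repeated_name" and "custom_element_name" --
    // the same name and id preservation that the new TestPruneColumns assertions check.
    System.out.println(pruned);
  }
}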