From ad893cf63da79975324a798b3f58425b4e2218ac Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sun, 12 Dec 2021 12:05:22 -0800 Subject: [PATCH 01/14] Demonstrate issue where Iceberg's parquet reader silently returns nulls for fields with 2-level lists. Also, update migrate tables documentation to let users know of the issue. --- site/docs/spark-procedures.md | 11 +++ spark/v3.2/build.gradle | 6 ++ .../iceberg/spark/data/TestParquetReader.java | 90 ++++++++++++++++++ .../spark/src/test/resources/twoLevelList.pq | Bin 0 -> 584 bytes 4 files changed, 107 insertions(+) create mode 100644 spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java create mode 100644 spark/v3.2/spark/src/test/resources/twoLevelList.pq diff --git a/site/docs/spark-procedures.md b/site/docs/spark-procedures.md index a77e56a33493..c106c00cecd3 100644 --- a/site/docs/spark-procedures.md +++ b/site/docs/spark-procedures.md @@ -334,6 +334,17 @@ CALL catalog_name.system.rewrite_manifests('db.sample', false) The `snapshot` and `migrate` procedures help test and migrate existing Hive or Spark tables to Iceberg. +**Note** Parquet files written with old two level list structures when read from Spark using Iceberg returns NULL +for those fields irrespective of actual values in the files. This is a known issue till 0.12.1 Iceberg releases. +The issue is planned to be resolved in upcoming releases. + +`snapshot` are not the sole owners of their data files, they are prohibited from +actions like `expire_snapshots` which would physically delete data files. Iceberg deletes, which only effect metadata, +are still allowed. In addition, any operations which affect the original data files will disrupt the Snapshot's +integrity. DELETE statements executed against the original Hive table will remove original data files and the +`snapshot` table will no longer be able to access them. + + ### `snapshot` Create a light-weight temporary copy of a table for testing, without changing the source table. diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle index 30beb0c10624..b83fd028b1e3 100644 --- a/spark/v3.2/build.gradle +++ b/spark/v3.2/build.gradle @@ -93,6 +93,12 @@ project(':iceberg-spark:iceberg-spark-3.2_2.12') { testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts') testImplementation "org.xerial:sqlite-jdbc" + testImplementation("org.apache.parquet:parquet-avro") { + exclude group: 'org.apache.avro', module: 'avro' + // already shaded by Parquet + exclude group: 'it.unimi.dsi' + exclude group: 'org.codehaus.jackson' + } } tasks.withType(Test) { diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java new file mode 100644 index 000000000000..71267b849010 --- /dev/null +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.data; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public class TestParquetReader { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testParquet2LevelList() throws IOException { + Schema icebergSchema = new Schema( + optional(1, "key", Types.StringType.get()), + optional(2, "val", Types.ListType.ofRequired(3, Types.StructType.of( + optional(4, "a1", Types.StringType.get()), + optional(5, "a2", Types.StringType.get()) + ) + ))); + List rows; + + /* Using a static file rather than generating test data in test, as parquet writers in Iceberg only supports + * three level lists. The twoLevelList.pq is a parquet file that contains following Parquet schema. + * message hive_schema { + * optional binary key (STRING); + * optional group val (LIST) { + * repeated group bag { + * optional group array_element { + * optional binary a1 (STRING); + * optional binary a2 (STRING); + * } + * } + * } + * } + * + * It contains only one row. Below is the json dump of the file. + * {"key":"k1","val":{"bag":[{"array_element":{"a1":"a","a2":"b"}}]}} + */ + try (CloseableIterable reader = + Parquet.read(Files.localInput( + this.getClass().getClassLoader().getResource("twoLevelList.pq").getPath())) + .project(icebergSchema) + .createReaderFunc(type -> SparkParquetReaders.buildReader(icebergSchema, type)) + .build()) { + rows = Lists.newArrayList(reader); + } + + Assert.assertEquals(1, rows.size()); + InternalRow row = rows.get(0); + Assert.assertEquals("k1", row.getString(0)); + + /* + * Iceberg's parquet reader would read 2-level list as null, which is due to Iceberg's parquet reader + * does not support reading 2-level lists. + */ + Assert.assertTrue("Parquet reader silently incorrectly reads null.", row.isNullAt(1)); + } +} diff --git a/spark/v3.2/spark/src/test/resources/twoLevelList.pq b/spark/v3.2/spark/src/test/resources/twoLevelList.pq new file mode 100644 index 0000000000000000000000000000000000000000..df5eace40eba96cc87682b06b1ebacd3e238630b GIT binary patch literal 584 zcmaiyK}*9h6vtoFQ3}puLIXYY;7S!&*Fn)kFT3p|!;`m>XmDlgoLM)R&bwd1uOax= zb`yN5+t^eTUfz2N{QfWRCB4p7LV(hu^}DbvjiG)RoqgqYYJ+HMQ33YKVj)v(!eJE4NFS+vU@Pd=c+%w8K^?g8qj)! 
zc=k{&8UQI~b1}l697t=PEyW!r=JZf#Ef#$>QiU3q8;Xbop-O;=Hl~blpJ$=#(>hPV zb$BIA>x?kT%FxYz84~xgCBQNO;nIK&aCp~T!<64Ij0G#P0ecLW*|)jv3azYvmD*dS zm4#A!Q{0haZ-fzuIuoHEW~yu!I+!ax&F{xS5+|)VX-@KSskEAw%~srs&(k21vwUa#l#CpO1Ow1AH00;8XVS4}AiwsA%K> literal 0 HcmV?d00001 From 5953516d49092231c6c607a377fce4fa98c3f96d Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sun, 12 Dec 2021 16:29:40 -0800 Subject: [PATCH 02/14] Undo copy error --- site/docs/spark-procedures.md | 7 ------- 1 file changed, 7 deletions(-) diff --git a/site/docs/spark-procedures.md b/site/docs/spark-procedures.md index c106c00cecd3..75947c7961ec 100644 --- a/site/docs/spark-procedures.md +++ b/site/docs/spark-procedures.md @@ -338,13 +338,6 @@ The `snapshot` and `migrate` procedures help test and migrate existing Hive or S for those fields irrespective of actual values in the files. This is a known issue till 0.12.1 Iceberg releases. The issue is planned to be resolved in upcoming releases. -`snapshot` are not the sole owners of their data files, they are prohibited from -actions like `expire_snapshots` which would physically delete data files. Iceberg deletes, which only effect metadata, -are still allowed. In addition, any operations which affect the original data files will disrupt the Snapshot's -integrity. DELETE statements executed against the original Hive table will remove original data files and the -`snapshot` table will no longer be able to access them. - - ### `snapshot` Create a light-weight temporary copy of a table for testing, without changing the source table. From 9ea100ab88799bc13d45d7bb417d865124b76d4c Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sun, 12 Dec 2021 16:35:20 -0800 Subject: [PATCH 03/14] Drop dependency that is no longer needed --- spark/v3.2/build.gradle | 6 ------ 1 file changed, 6 deletions(-) diff --git a/spark/v3.2/build.gradle b/spark/v3.2/build.gradle index b83fd028b1e3..30beb0c10624 100644 --- a/spark/v3.2/build.gradle +++ b/spark/v3.2/build.gradle @@ -93,12 +93,6 @@ project(':iceberg-spark:iceberg-spark-3.2_2.12') { testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts') testImplementation "org.xerial:sqlite-jdbc" - testImplementation("org.apache.parquet:parquet-avro") { - exclude group: 'org.apache.avro', module: 'avro' - // already shaded by Parquet - exclude group: 'it.unimi.dsi' - exclude group: 'org.codehaus.jackson' - } } tasks.withType(Test) { From c192ad82ca824d19a5bc44a3f7289c35f10de60c Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sun, 12 Dec 2021 23:44:43 -0800 Subject: [PATCH 04/14] 2 --- .../iceberg/spark/data/TestParquetReader.java | 2 ++ .../spark/src/test/resources/twoLevelList_new.pq | Bin 0 -> 1099 bytes 2 files changed, 2 insertions(+) create mode 100644 spark/v3.2/spark/src/test/resources/twoLevelList_new.pq diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java index 71267b849010..c514fc1225a6 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java @@ -24,6 +24,7 @@ import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.mapping.MappingUtil; import org.apache.iceberg.parquet.Parquet; import 
org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; @@ -72,6 +73,7 @@ public void testParquet2LevelList() throws IOException { Parquet.read(Files.localInput( this.getClass().getClassLoader().getResource("twoLevelList.pq").getPath())) .project(icebergSchema) + .withNameMapping(MappingUtil.create(icebergSchema)) .createReaderFunc(type -> SparkParquetReaders.buildReader(icebergSchema, type)) .build()) { rows = Lists.newArrayList(reader); diff --git a/spark/v3.2/spark/src/test/resources/twoLevelList_new.pq b/spark/v3.2/spark/src/test/resources/twoLevelList_new.pq new file mode 100644 index 0000000000000000000000000000000000000000..97f4e08e2203a30a70633cca6b59e80c3a225207 GIT binary patch literal 1099 zcmb7EL5tHs6n>fP)(8a=c7_Csl;DOg)Yv4u?QZGC+ag7lr3XP4nWWRTG);GtbY)q4 z_2Nn0gDm1t@US5C?8Tqp&6B^ts|dbHv$ZG|g-K@K%lE$bz4w^x-P?C4&?3D>-@UrH zyiuYAO^s>*l@E7+z(WUs0Kh<^Zqqt_efjBpb%rX?!WHV}45fmKSXBiDH=akldGYQ0 z^?!=+jhn>K$-v28lo$*kb1B@=6~5^3(c$Ng64PiKpGJA*U-?@H*(~K%A~ayM&QjXr1s7i!ZcW(q7S5!hVuc0$W7CCPFb3Q927GI^j+p zhdk1BbCVIHWV9@;M2p5cXhxQT78zkp*0!{ktW7=)D)|XDW19f;0DG>|HTITJ3#?AY zPyl0B%F3i%iNNF;<38(JofftQ7xHEMBHM*z`$(t>)>K(W=g#S-3iA25B%jAH|0@KjPS2JL|K3BI2Y!iu6^_cI_JI8-E~@H%?+c4w`R70 Date: Mon, 13 Dec 2021 02:03:16 -0800 Subject: [PATCH 05/14] update doc --- .../iceberg/parquet/ApplyNameMapping.java | 40 ++++++++++++++++-- site/docs/spark-procedures.md | 6 +-- .../iceberg/spark/data/TestParquetReader.java | 10 ++--- .../src/test/resources/twoLevelList_new.pq | Bin 1099 -> 0 bytes 4 files changed, 43 insertions(+), 13 deletions(-) delete mode 100644 spark/v3.2/spark/src/test/resources/twoLevelList_new.pq diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java index 66e88c9bd13f..1c89e0814dec 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java @@ -26,7 +26,9 @@ import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; @@ -61,13 +63,20 @@ public Type list(GroupType list, Type elementType) { "List type must have element field"); MappedField field = nameMapping.find(currentPath()); - Type listType = Types.list(list.getRepetition()) - .element(elementType) - .named(list.getName()); + Type listType = makeListType(list, elementType); return field == null ? listType : listType.withId(field.id()); } + private Type makeListType(GroupType list, Type elementType) { + // List's repeated group in 3-level lists can be named differently across different parquet writers. + // For example, hive names it "bag", whereas new parquet writers names it as "list". + return Types.buildGroup(list.getRepetition()) + .as(LogicalTypeAnnotation.listType()) + .repeatedGroup().addFields(elementType).named(list.getFieldName(0)) + .named(list.getName()); + } + @Override public Type map(GroupType map, Type keyType, Type valueType) { Preconditions.checkArgument(keyType != null && valueType != null, @@ -88,6 +97,31 @@ public Type primitive(PrimitiveType primitive) { return field == null ? 
primitive : primitive.withId(field.id()); } + @Override + public void beforeElementField(Type element) { + super.beforeElementField(makeElement(element)); + } + + @Override + public void afterElementField(Type element) { + super.afterElementField(makeElement(element)); + } + + private Type makeElement(Type element) { + // List's element in 3-level lists can be named differently across different parquet writers. + // For example, hive names it "array_element", whereas new parquet writers names it as "element". + if (element.getName().equals("element") || element.isPrimitive()) { + return element; + } + + Types.GroupBuilder elementBuilder = Types.buildGroup(element.getRepetition()) + .addFields(element.asGroupType().getFields().toArray(new Type[0])); + if (element.getId() != null) { + elementBuilder.id(element.getId().intValue()); + } + return elementBuilder.named("element"); + } + @Override public void beforeRepeatedElement(Type element) { // do not add the repeated element's name diff --git a/site/docs/spark-procedures.md b/site/docs/spark-procedures.md index 75947c7961ec..d1a2e235a5b1 100644 --- a/site/docs/spark-procedures.md +++ b/site/docs/spark-procedures.md @@ -334,9 +334,9 @@ CALL catalog_name.system.rewrite_manifests('db.sample', false) The `snapshot` and `migrate` procedures help test and migrate existing Hive or Spark tables to Iceberg. -**Note** Parquet files written with old two level list structures when read from Spark using Iceberg returns NULL -for those fields irrespective of actual values in the files. This is a known issue till 0.12.1 Iceberg releases. -The issue is planned to be resolved in upcoming releases. +**Note** Parquet files written with Parquet writers that use names other than `list` and `element` for repeated group +and element of the list respectively are **read incorrectly** by Iceberg upto 0.12.1 Iceberg versions. Parquet files +generated by Hive fall in this category. ### `snapshot` diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java index c514fc1225a6..83f73854588e 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java @@ -42,7 +42,7 @@ public class TestParquetReader { public TemporaryFolder temp = new TemporaryFolder(); @Test - public void testParquet2LevelList() throws IOException { + public void testHiveStlyeList() throws IOException { Schema icebergSchema = new Schema( optional(1, "key", Types.StringType.get()), optional(2, "val", Types.ListType.ofRequired(3, Types.StructType.of( @@ -82,11 +82,7 @@ public void testParquet2LevelList() throws IOException { Assert.assertEquals(1, rows.size()); InternalRow row = rows.get(0); Assert.assertEquals("k1", row.getString(0)); - - /* - * Iceberg's parquet reader would read 2-level list as null, which is due to Iceberg's parquet reader - * does not support reading 2-level lists. 
- */ - Assert.assertTrue("Parquet reader silently incorrectly reads null.", row.isNullAt(1)); + Assert.assertEquals("a", row.getArray(1).getStruct(0, 2).getUTF8String(0).toString()); + Assert.assertEquals("b", row.getArray(1).getStruct(0, 2).getUTF8String(1).toString()); } } diff --git a/spark/v3.2/spark/src/test/resources/twoLevelList_new.pq b/spark/v3.2/spark/src/test/resources/twoLevelList_new.pq deleted file mode 100644 index 97f4e08e2203a30a70633cca6b59e80c3a225207..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1099 zcmb7EL5tHs6n>fP)(8a=c7_Csl;DOg)Yv4u?QZGC+ag7lr3XP4nWWRTG);GtbY)q4 z_2Nn0gDm1t@US5C?8Tqp&6B^ts|dbHv$ZG|g-K@K%lE$bz4w^x-P?C4&?3D>-@UrH zyiuYAO^s>*l@E7+z(WUs0Kh<^Zqqt_efjBpb%rX?!WHV}45fmKSXBiDH=akldGYQ0 z^?!=+jhn>K$-v28lo$*kb1B@=6~5^3(c$Ng64PiKpGJA*U-?@H*(~K%A~ayM&QjXr1s7i!ZcW(q7S5!hVuc0$W7CCPFb3Q927GI^j+p zhdk1BbCVIHWV9@;M2p5cXhxQT78zkp*0!{ktW7=)D)|XDW19f;0DG>|HTITJ3#?AY zPyl0B%F3i%iNNF;<38(JofftQ7xHEMBHM*z`$(t>)>K(W=g#S-3iA25B%jAH|0@KjPS2JL|K3BI2Y!iu6^_cI_JI8-E~@H%?+c4w`R70 Date: Mon, 13 Dec 2021 02:17:46 -0800 Subject: [PATCH 06/14] Remove unused import --- .../main/java/org/apache/iceberg/parquet/ApplyNameMapping.java | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java index 1c89e0814dec..5ae1aa35588c 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java @@ -28,7 +28,6 @@ import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; From c05b8263cb29efd9fedeac13e14c8cb65a17b6cf Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 13 Dec 2021 02:19:22 -0800 Subject: [PATCH 07/14] Nit --- .../iceberg/spark/data/TestParquetReader.java | 4 ++-- .../resources/{twoLevelList.pq => hiveStyleList.pq} | Bin 2 files changed, 2 insertions(+), 2 deletions(-) rename spark/v3.2/spark/src/test/resources/{twoLevelList.pq => hiveStyleList.pq} (100%) diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java index 83f73854588e..3eb8a769f055 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java @@ -53,7 +53,7 @@ public void testHiveStlyeList() throws IOException { List rows; /* Using a static file rather than generating test data in test, as parquet writers in Iceberg only supports - * three level lists. The twoLevelList.pq is a parquet file that contains following Parquet schema. + * three level lists. The hiveStyleList.pq is a parquet file that contains following Parquet schema. 
* message hive_schema { * optional binary key (STRING); * optional group val (LIST) { @@ -71,7 +71,7 @@ public void testHiveStlyeList() throws IOException { */ try (CloseableIterable reader = Parquet.read(Files.localInput( - this.getClass().getClassLoader().getResource("twoLevelList.pq").getPath())) + this.getClass().getClassLoader().getResource("hiveStyleList.pq").getPath())) .project(icebergSchema) .withNameMapping(MappingUtil.create(icebergSchema)) .createReaderFunc(type -> SparkParquetReaders.buildReader(icebergSchema, type)) diff --git a/spark/v3.2/spark/src/test/resources/twoLevelList.pq b/spark/v3.2/spark/src/test/resources/hiveStyleList.pq similarity index 100% rename from spark/v3.2/spark/src/test/resources/twoLevelList.pq rename to spark/v3.2/spark/src/test/resources/hiveStyleList.pq From e2653ded47d5ab71231ed0ab60e345460b3f7cc1 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 13 Dec 2021 17:10:16 -0800 Subject: [PATCH 08/14] Update test to not use static binary file --- .../iceberg/spark/data/TestParquetReader.java | 121 +++++++++++++----- .../spark/src/test/resources/hiveStyleList.pq | Bin 584 -> 0 bytes 2 files changed, 86 insertions(+), 35 deletions(-) delete mode 100644 spark/v3.2/spark/src/test/resources/hiveStyleList.pq diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java index 3eb8a769f055..5aefd0ed2b6a 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java @@ -19,70 +19,121 @@ package org.apache.iceberg.spark.data; +import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Objects; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.mapping.MappingUtil; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.schema.MessageType; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static org.apache.iceberg.types.Types.NestedField.optional; - public class TestParquetReader { + private static SparkSession spark = null; @Rule public TemporaryFolder temp = new TemporaryFolder(); + @BeforeClass + public static void startSpark() { + TestParquetReader.spark = SparkSession.builder().master("local[2]") + .config("spark.sql.parquet.writeLegacyFormat", true) + .getOrCreate(); + } + + @AfterClass + public static void 
stopSpark() { + SparkSession currentSpark = TestParquetReader.spark; + TestParquetReader.spark = null; + currentSpark.stop(); + } + @Test - public void testHiveStlyeList() throws IOException { - Schema icebergSchema = new Schema( - optional(1, "key", Types.StringType.get()), - optional(2, "val", Types.ListType.ofRequired(3, Types.StructType.of( - optional(4, "a1", Types.StringType.get()), - optional(5, "a2", Types.StringType.get()) - ) - ))); - List rows; + public void testHiveStyleThreeLevelList() throws IOException { + File location = new File(temp.getRoot(), "parquetReaderTest"); + StructType sparkSchema = + new StructType( + new StructField[]{ + new StructField( + "col1", new ArrayType( + new StructType( + new StructField[]{ + new StructField( + "col2", + DataTypes.IntegerType, + false, + Metadata.empty()) + }), true), true, Metadata.empty())}); + + String expectedParquetSchema = + "message spark_schema {\n" + + " optional group col1 (LIST) {\n" + + " repeated group bag {\n" + + " optional group array {\n" + + " required int32 col2;\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n"; - /* Using a static file rather than generating test data in test, as parquet writers in Iceberg only supports - * three level lists. The hiveStyleList.pq is a parquet file that contains following Parquet schema. - * message hive_schema { - * optional binary key (STRING); - * optional group val (LIST) { - * repeated group bag { - * optional group array_element { - * optional binary a1 (STRING); - * optional binary a2 (STRING); - * } - * } - * } - * } - * - * It contains only one row. Below is the json dump of the file. - * {"key":"k1","val":{"bag":[{"array_element":{"a1":"a","a2":"b"}}]}} - */ + + // generate parquet file with required schema + List testData = Collections.singletonList("{\"col1\": [{\"col2\": 1}]}"); + spark.read().schema(sparkSchema).json( + JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(testData)) + .coalesce(1).write().format("parquet").mode(SaveMode.Append).save(location.getPath()); + + File parquetFile = Arrays.stream(Objects.requireNonNull(location.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.endsWith("parquet"); + } + }))).findAny().get(); + + // verify generated parquet file has expected schema + ParquetFileReader pqReader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(parquetFile.getPath()), new Configuration())); + MessageType schema = pqReader.getFooter().getFileMetaData().getSchema(); + Assert.assertEquals(expectedParquetSchema, schema.toString()); + + // read from Iceberg's parquet reader and ensure data is read correctly into Spark's internal row + Schema icebergSchema = SparkSchemaUtil.convert(sparkSchema); + List rows; try (CloseableIterable reader = - Parquet.read(Files.localInput( - this.getClass().getClassLoader().getResource("hiveStyleList.pq").getPath())) + Parquet.read(Files.localInput(parquetFile.getPath())) .project(icebergSchema) .withNameMapping(MappingUtil.create(icebergSchema)) .createReaderFunc(type -> SparkParquetReaders.buildReader(icebergSchema, type)) .build()) { rows = Lists.newArrayList(reader); } - Assert.assertEquals(1, rows.size()); InternalRow row = rows.get(0); - Assert.assertEquals("k1", row.getString(0)); - Assert.assertEquals("a", row.getArray(1).getStruct(0, 2).getUTF8String(0).toString()); - Assert.assertEquals("b", row.getArray(1).getStruct(0, 2).getUTF8String(1).toString()); + Assert.assertEquals(1, row.getArray(0).getStruct(0, 1).getInt(0)); } } diff 
--git a/spark/v3.2/spark/src/test/resources/hiveStyleList.pq b/spark/v3.2/spark/src/test/resources/hiveStyleList.pq deleted file mode 100644 index df5eace40eba96cc87682b06b1ebacd3e238630b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 584 zcmaiyK}*9h6vtoFQ3}puLIXYY;7S!&*Fn)kFT3p|!;`m>XmDlgoLM)R&bwd1uOax= zb`yN5+t^eTUfz2N{QfWRCB4p7LV(hu^}DbvjiG)RoqgqYYJ+HMQ33YKVj)v(!eJE4NFS+vU@Pd=c+%w8K^?g8qj)! zc=k{&8UQI~b1}l697t=PEyW!r=JZf#Ef#$>QiU3q8;Xbop-O;=Hl~blpJ$=#(>hPV zb$BIA>x?kT%FxYz84~xgCBQNO;nIK&aCp~T!<64Ij0G#P0ecLW*|)jv3azYvmD*dS zm4#A!Q{0haZ-fzuIuoHEW~yu!I+!ax&F{xS5+|)VX-@KSskEAw%~srs&(k21vwUa#l#CpO1Ow1AH00;8XVS4}AiwsA%K> From 87fc0bc4888315c96ea72f867c1f92fe43e9b0a9 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 14 Dec 2021 10:50:52 -0800 Subject: [PATCH 09/14] Fix checkstyle errors --- .../iceberg/spark/data/TestParquetReader.java | 59 ++++++++++--------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java index 5aefd0ed2b6a..b621737fc7d7 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java @@ -78,36 +78,36 @@ public static void stopSpark() { public void testHiveStyleThreeLevelList() throws IOException { File location = new File(temp.getRoot(), "parquetReaderTest"); StructType sparkSchema = - new StructType( - new StructField[]{ - new StructField( - "col1", new ArrayType( - new StructType( - new StructField[]{ - new StructField( - "col2", - DataTypes.IntegerType, - false, - Metadata.empty()) - }), true), true, Metadata.empty())}); + new StructType( + new StructField[]{ + new StructField( + "col1", new ArrayType( + new StructType( + new StructField[]{ + new StructField( + "col2", + DataTypes.IntegerType, + false, + Metadata.empty()) + }), true), true, Metadata.empty())}); String expectedParquetSchema = - "message spark_schema {\n" + - " optional group col1 (LIST) {\n" + - " repeated group bag {\n" + - " optional group array {\n" + - " required int32 col2;\n" + - " }\n" + - " }\n" + - " }\n" + - "}\n"; + "message spark_schema {\n" + + " optional group col1 (LIST) {\n" + + " repeated group bag {\n" + + " optional group array {\n" + + " required int32 col2;\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n"; // generate parquet file with required schema List testData = Collections.singletonList("{\"col1\": [{\"col2\": 1}]}"); spark.read().schema(sparkSchema).json( - JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(testData)) - .coalesce(1).write().format("parquet").mode(SaveMode.Append).save(location.getPath()); + JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(testData)) + .coalesce(1).write().format("parquet").mode(SaveMode.Append).save(location.getPath()); File parquetFile = Arrays.stream(Objects.requireNonNull(location.listFiles(new FilenameFilter() { @Override @@ -117,7 +117,8 @@ public boolean accept(File dir, String name) { }))).findAny().get(); // verify generated parquet file has expected schema - ParquetFileReader pqReader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(parquetFile.getPath()), new Configuration())); + ParquetFileReader pqReader = ParquetFileReader.open( + HadoopInputFile.fromPath(new Path(parquetFile.getPath()), new Configuration())); MessageType schema = 
pqReader.getFooter().getFileMetaData().getSchema(); Assert.assertEquals(expectedParquetSchema, schema.toString()); @@ -125,11 +126,11 @@ public boolean accept(File dir, String name) { Schema icebergSchema = SparkSchemaUtil.convert(sparkSchema); List rows; try (CloseableIterable reader = - Parquet.read(Files.localInput(parquetFile.getPath())) - .project(icebergSchema) - .withNameMapping(MappingUtil.create(icebergSchema)) - .createReaderFunc(type -> SparkParquetReaders.buildReader(icebergSchema, type)) - .build()) { + Parquet.read(Files.localInput(parquetFile.getPath())) + .project(icebergSchema) + .withNameMapping(MappingUtil.create(icebergSchema)) + .createReaderFunc(type -> SparkParquetReaders.buildReader(icebergSchema, type)) + .build()) { rows = Lists.newArrayList(reader); } Assert.assertEquals(1, rows.size()); From c87b1b596b89b1e0246da66afc742da7e85512dc Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 16 Dec 2021 18:22:07 -0800 Subject: [PATCH 10/14] Update unit tests --- .../parquet/TestParquetSchemaUtil.java | 23 +++ .../iceberg/spark/data/TestParquetReader.java | 191 ++++++++++-------- 2 files changed, 127 insertions(+), 87 deletions(-) diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java index 92ca00a76469..da943d901345 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetSchemaUtil.java @@ -27,6 +27,7 @@ import org.apache.iceberg.types.Types; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; @@ -181,6 +182,28 @@ public void testSchemaConversionWithoutAssigningIds() { Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct()); } + @Test + public void testSchemaConversionForHiveStyleLists() { + String parquetSchemaString = + "message spark_schema {\n" + + " optional group col1 (LIST) {\n" + + " repeated group bag {\n" + + " optional group array {\n" + + " required int32 col2;\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n"; + MessageType messageType = MessageTypeParser.parseMessageType(parquetSchemaString); + + Schema expectedSchema = new Schema(optional(1, "col1", Types.ListType.ofOptional( + 2, Types.StructType.of(required(3, "col2", Types.IntegerType.get()))))); + NameMapping nameMapping = MappingUtil.create(expectedSchema); + MessageType messageTypeWithIds = ParquetSchemaUtil.applyNameMapping(messageType, nameMapping); + Schema actualSchema = ParquetSchemaUtil.convertAndPrune(messageTypeWithIds); + Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct()); + } + private Type primitive(Integer id, String name, PrimitiveTypeName typeName, Repetition repetition) { PrimitiveBuilder builder = org.apache.parquet.schema.Types.primitive(typeName, repetition); if (id != null) { diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java index b621737fc7d7..1b4ecf405f75 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java +++ 
b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java @@ -20,33 +20,13 @@ package org.apache.iceberg.spark.data; import java.io.File; -import java.io.FilenameFilter; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Objects; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.iceberg.Files; -import org.apache.iceberg.Schema; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.mapping.MappingUtil; -import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.util.HadoopInputFile; -import org.apache.parquet.schema.MessageType; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SaveMode; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.SparkSessionCatalog; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.iceberg.spark.actions.SparkActions; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.types.ArrayType; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -54,17 +34,31 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -public class TestParquetReader { +public class TestParquetReader extends SparkTestBaseWithCatalog { private static SparkSession spark = null; + private static String catalogName = "spark_catalog"; + private static String implementation = SparkSessionCatalog.class.getName(); + private static Map config = ImmutableMap.of( + "type", "hive", + "default-namespace", "default", + "parquet-enabled", "true", + "cache-enabled", "false" + ); @Rule public TemporaryFolder temp = new TemporaryFolder(); + public TestParquetReader() { + super(catalogName, implementation, config); + spark.sessionState().catalogManager().catalog(catalogName); + } + @BeforeClass public static void startSpark() { TestParquetReader.spark = SparkSession.builder().master("local[2]") .config("spark.sql.parquet.writeLegacyFormat", true) .getOrCreate(); + spark.sessionState().catalogManager().catalog("spark_catalog"); } @AfterClass @@ -75,66 +69,89 @@ public static void stopSpark() { } @Test - public void testHiveStyleThreeLevelList() throws IOException { - File location = new File(temp.getRoot(), "parquetReaderTest"); - StructType sparkSchema = - new StructType( - new StructField[]{ - new StructField( - "col1", new ArrayType( - new StructType( - new StructField[]{ - new StructField( - "col2", - DataTypes.IntegerType, - false, - Metadata.empty()) - }), true), true, Metadata.empty())}); - - String expectedParquetSchema = - "message spark_schema {\n" + - " optional group col1 (LIST) {\n" + - " repeated group bag {\n" + - " optional group array {\n" + - " required int32 col2;\n" + - " }\n" + - " }\n" + - " }\n" + - "}\n"; - - - // generate parquet file with required schema - List testData = Collections.singletonList("{\"col1\": [{\"col2\": 1}]}"); - spark.read().schema(sparkSchema).json( - 
JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(testData)) - .coalesce(1).write().format("parquet").mode(SaveMode.Append).save(location.getPath()); - - File parquetFile = Arrays.stream(Objects.requireNonNull(location.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.endsWith("parquet"); - } - }))).findAny().get(); - - // verify generated parquet file has expected schema - ParquetFileReader pqReader = ParquetFileReader.open( - HadoopInputFile.fromPath(new Path(parquetFile.getPath()), new Configuration())); - MessageType schema = pqReader.getFooter().getFileMetaData().getSchema(); - Assert.assertEquals(expectedParquetSchema, schema.toString()); - - // read from Iceberg's parquet reader and ensure data is read correctly into Spark's internal row - Schema icebergSchema = SparkSchemaUtil.convert(sparkSchema); - List rows; - try (CloseableIterable reader = - Parquet.read(Files.localInput(parquetFile.getPath())) - .project(icebergSchema) - .withNameMapping(MappingUtil.create(icebergSchema)) - .createReaderFunc(type -> SparkParquetReaders.buildReader(icebergSchema, type)) - .build()) { - rows = Lists.newArrayList(reader); - } - Assert.assertEquals(1, rows.size()); - InternalRow row = rows.get(0); - Assert.assertEquals(1, row.getArray(0).getStruct(0, 1).getInt(0)); + public void testHiveStyleThreeLevelList1() throws Exception { + String tableName = "default.testHiveStyleThreeLevelList"; + File location = temp.newFolder(); + sql(String.format("CREATE TABLE %s (col1 ARRAY>)" + + " STORED AS parquet" + + " LOCATION '%s'", tableName, location)); + + int testValue = 12345; + sql(String.format("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)))", tableName, testValue)); + List expected = sql(String.format("SELECT * FROM %s", tableName)); + + // migrate table + SparkActions.get().migrateTable(tableName).execute(); + + // check migrated table is returning expected result + List results = sql(String.format("SELECT * FROM %s", tableName)); + Assert.assertTrue(results.size() > 0); + assertEquals("Output must match", expected, results); + } + + @Test + public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception { + String tableName = "default.testHiveStyleThreeLevelListWithNestedStruct"; + File location = temp.newFolder(); + sql(String.format("CREATE TABLE %s (col1 ARRAY>>)" + + " STORED AS parquet" + + " LOCATION '%s'", tableName, location)); + + int testValue = 12345; + sql(String.format("INSERT INTO %s VALUES (ARRAY(STRUCT(STRUCT(%s))))", tableName, testValue)); + List expected = sql(String.format("SELECT * FROM %s", tableName)); + + // migrate table + SparkActions.get().migrateTable(tableName).execute(); + + // check migrated table is returning expected result + List results = sql(String.format("SELECT * FROM %s", tableName)); + Assert.assertTrue(results.size() > 0); + assertEquals("Output must match", expected, results); + } + + @Test + public void testHiveStyleThreeLevelLists() throws Exception { + String tableName = "default.testHiveStyleThreeLevelLists"; + File location = temp.newFolder(); + sql(String.format("CREATE TABLE %s (col1 ARRAY>, col3 ARRAY>)" + + " STORED AS parquet" + + " LOCATION '%s'", tableName, location)); + + int testValue1 = 12345; + int testValue2 = 987654; + sql(String.format("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)), ARRAY(STRUCT(%s)))", + tableName, testValue1, testValue2)); + List expected = sql(String.format("SELECT * FROM %s", tableName)); + + // migrate table + 
SparkActions.get().migrateTable(tableName).execute(); + + // check migrated table is returning expected result + List results = sql(String.format("SELECT * FROM %s", tableName)); + Assert.assertTrue(results.size() > 0); + assertEquals("Output must match", expected, results); + } + + @Test + public void testHiveStyleStructOfThreeLevelLists() throws Exception { + String tableName = "default.testHiveStyleStructOfThreeLevelLists"; + File location = temp.newFolder(); + sql(String.format("CREATE TABLE %s (col1 STRUCT>>)" + + " STORED AS parquet" + + " LOCATION '%s'", tableName, location)); + + int testValue1 = 12345; + sql(String.format("INSERT INTO %s VALUES (STRUCT(STRUCT(ARRAY(STRUCT(%s)))))", + tableName, testValue1)); + List expected = sql(String.format("SELECT * FROM %s", tableName)); + + // migrate table + SparkActions.get().migrateTable(tableName).execute(); + + // check migrated table is returning expected result + List results = sql(String.format("SELECT * FROM %s", tableName)); + Assert.assertTrue(results.size() > 0); + assertEquals("Output must match", expected, results); } } From a6272e9d861f42de53702e61fe7263a48e98b024 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 17 Dec 2021 11:31:36 -0800 Subject: [PATCH 11/14] Move tests into TestCreateActions --- .../spark/actions/TestCreateActions.java | 96 +++++++++++ .../iceberg/spark/data/TestParquetReader.java | 157 ------------------ 2 files changed, 96 insertions(+), 157 deletions(-) delete mode 100644 spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java index 0b60d1a2b613..8489aa3056c2 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java @@ -136,6 +136,7 @@ public void before() { spark.conf().set("hive.exec.dynamic.partition", "true"); spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict"); + spark.conf().set("spark.sql.parquet.writeLegacyFormat", false); spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName)); List expected = Lists.newArrayList( @@ -570,6 +571,101 @@ public void schemaEvolutionTestWithSparkSQL() throws Exception { assertEquals("Output must match", expectedAfterAddColumn, afterMigarteAfterAddResults); } + @Test + public void testHiveStyleThreeLevelList() throws Exception { + spark.conf().set("spark.sql.parquet.writeLegacyFormat", true); + + String tableName = sourceName("testHiveStyleThreeLevelList"); + File location = temp.newFolder(); + sql("CREATE TABLE %s (col1 ARRAY>)" + + " STORED AS parquet" + + " LOCATION '%s'", tableName, location); + + int testValue = 12345; + sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)))", tableName, testValue); + List expected = sql(String.format("SELECT * FROM %s", tableName)); + + // migrate table + SparkActions.get().migrateTable(tableName).execute(); + + // check migrated table is returning expected result + List results = sql("SELECT * FROM %s", tableName); + Assert.assertTrue(results.size() > 0); + assertEquals("Output must match", expected, results); + } + + @Test + public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception { + spark.conf().set("spark.sql.parquet.writeLegacyFormat", true); + + String tableName = sourceName("testHiveStyleThreeLevelListWithNestedStruct"); + File 
location = temp.newFolder(); + sql("CREATE TABLE %s (col1 ARRAY>>)" + + " STORED AS parquet" + + " LOCATION '%s'", tableName, location); + + int testValue = 12345; + sql("INSERT INTO %s VALUES (ARRAY(STRUCT(STRUCT(%s))))", tableName, testValue); + List expected = sql(String.format("SELECT * FROM %s", tableName)); + + // migrate table + SparkActions.get().migrateTable(tableName).execute(); + + // check migrated table is returning expected result + List results = sql("SELECT * FROM %s", tableName); + Assert.assertTrue(results.size() > 0); + assertEquals("Output must match", expected, results); + } + + @Test + public void testHiveStyleThreeLevelLists() throws Exception { + spark.conf().set("spark.sql.parquet.writeLegacyFormat", true); + + String tableName = sourceName("testHiveStyleThreeLevelLists"); + File location = temp.newFolder(); + sql("CREATE TABLE %s (col1 ARRAY>, col3 ARRAY>)" + + " STORED AS parquet" + + " LOCATION '%s'", tableName, location); + + int testValue1 = 12345; + int testValue2 = 987654; + sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)), ARRAY(STRUCT(%s)))", + tableName, testValue1, testValue2); + List expected = sql(String.format("SELECT * FROM %s", tableName)); + + // migrate table + SparkActions.get().migrateTable(tableName).execute(); + + // check migrated table is returning expected result + List results = sql("SELECT * FROM %s", tableName); + Assert.assertTrue(results.size() > 0); + assertEquals("Output must match", expected, results); + } + + @Test + public void testHiveStyleStructOfThreeLevelLists() throws Exception { + spark.conf().set("spark.sql.parquet.writeLegacyFormat", true); + + String tableName = sourceName("testHiveStyleStructOfThreeLevelLists"); + File location = temp.newFolder(); + sql("CREATE TABLE %s (col1 STRUCT>>)" + + " STORED AS parquet" + + " LOCATION '%s'", tableName, location); + + int testValue1 = 12345; + sql("INSERT INTO %s VALUES (STRUCT(STRUCT(ARRAY(STRUCT(%s)))))", + tableName, testValue1); + List expected = sql(String.format("SELECT * FROM %s", tableName)); + + // migrate table + SparkActions.get().migrateTable(tableName).execute(); + + // check migrated table is returning expected result + List results = sql("SELECT * FROM %s", tableName); + Assert.assertTrue(results.size() > 0); + assertEquals("Output must match", expected, results); + } + private SparkTable loadTable(String name) throws NoSuchTableException, ParseException { return (SparkTable) catalog.loadTable(Spark3Util.catalogAndIdentifier(spark, name).identifier()); diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java deleted file mode 100644 index 1b4ecf405f75..000000000000 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetReader.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.spark.data; - -import java.io.File; -import java.util.List; -import java.util.Map; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.spark.SparkSessionCatalog; -import org.apache.iceberg.spark.SparkTestBaseWithCatalog; -import org.apache.iceberg.spark.actions.SparkActions; -import org.apache.spark.sql.SparkSession; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -public class TestParquetReader extends SparkTestBaseWithCatalog { - private static SparkSession spark = null; - private static String catalogName = "spark_catalog"; - private static String implementation = SparkSessionCatalog.class.getName(); - private static Map config = ImmutableMap.of( - "type", "hive", - "default-namespace", "default", - "parquet-enabled", "true", - "cache-enabled", "false" - ); - - @Rule - public TemporaryFolder temp = new TemporaryFolder(); - - public TestParquetReader() { - super(catalogName, implementation, config); - spark.sessionState().catalogManager().catalog(catalogName); - } - - @BeforeClass - public static void startSpark() { - TestParquetReader.spark = SparkSession.builder().master("local[2]") - .config("spark.sql.parquet.writeLegacyFormat", true) - .getOrCreate(); - spark.sessionState().catalogManager().catalog("spark_catalog"); - } - - @AfterClass - public static void stopSpark() { - SparkSession currentSpark = TestParquetReader.spark; - TestParquetReader.spark = null; - currentSpark.stop(); - } - - @Test - public void testHiveStyleThreeLevelList1() throws Exception { - String tableName = "default.testHiveStyleThreeLevelList"; - File location = temp.newFolder(); - sql(String.format("CREATE TABLE %s (col1 ARRAY>)" + - " STORED AS parquet" + - " LOCATION '%s'", tableName, location)); - - int testValue = 12345; - sql(String.format("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)))", tableName, testValue)); - List expected = sql(String.format("SELECT * FROM %s", tableName)); - - // migrate table - SparkActions.get().migrateTable(tableName).execute(); - - // check migrated table is returning expected result - List results = sql(String.format("SELECT * FROM %s", tableName)); - Assert.assertTrue(results.size() > 0); - assertEquals("Output must match", expected, results); - } - - @Test - public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception { - String tableName = "default.testHiveStyleThreeLevelListWithNestedStruct"; - File location = temp.newFolder(); - sql(String.format("CREATE TABLE %s (col1 ARRAY>>)" + - " STORED AS parquet" + - " LOCATION '%s'", tableName, location)); - - int testValue = 12345; - sql(String.format("INSERT INTO %s VALUES (ARRAY(STRUCT(STRUCT(%s))))", tableName, testValue)); - List expected = sql(String.format("SELECT * FROM %s", tableName)); - - // migrate table - SparkActions.get().migrateTable(tableName).execute(); - - // check migrated table is returning expected result - List results = sql(String.format("SELECT * FROM %s", tableName)); 
- Assert.assertTrue(results.size() > 0); - assertEquals("Output must match", expected, results); - } - - @Test - public void testHiveStyleThreeLevelLists() throws Exception { - String tableName = "default.testHiveStyleThreeLevelLists"; - File location = temp.newFolder(); - sql(String.format("CREATE TABLE %s (col1 ARRAY>, col3 ARRAY>)" + - " STORED AS parquet" + - " LOCATION '%s'", tableName, location)); - - int testValue1 = 12345; - int testValue2 = 987654; - sql(String.format("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)), ARRAY(STRUCT(%s)))", - tableName, testValue1, testValue2)); - List expected = sql(String.format("SELECT * FROM %s", tableName)); - - // migrate table - SparkActions.get().migrateTable(tableName).execute(); - - // check migrated table is returning expected result - List results = sql(String.format("SELECT * FROM %s", tableName)); - Assert.assertTrue(results.size() > 0); - assertEquals("Output must match", expected, results); - } - - @Test - public void testHiveStyleStructOfThreeLevelLists() throws Exception { - String tableName = "default.testHiveStyleStructOfThreeLevelLists"; - File location = temp.newFolder(); - sql(String.format("CREATE TABLE %s (col1 STRUCT>>)" + - " STORED AS parquet" + - " LOCATION '%s'", tableName, location)); - - int testValue1 = 12345; - sql(String.format("INSERT INTO %s VALUES (STRUCT(STRUCT(ARRAY(STRUCT(%s)))))", - tableName, testValue1)); - List expected = sql(String.format("SELECT * FROM %s", tableName)); - - // migrate table - SparkActions.get().migrateTable(tableName).execute(); - - // check migrated table is returning expected result - List results = sql(String.format("SELECT * FROM %s", tableName)); - Assert.assertTrue(results.size() > 0); - assertEquals("Output must match", expected, results); - } -} From d551cee758ed21117a65b8485948f2a9ed0faf92 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 17 Dec 2021 11:48:42 -0800 Subject: [PATCH 12/14] Update doc --- .../iceberg/parquet/ApplyNameMapping.java | 5 +-- site/docs/spark-procedures.md | 35 ++++++++++++------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java index 5ae1aa35588c..bc3e7348da97 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java @@ -33,6 +33,7 @@ import org.apache.parquet.schema.Types; class ApplyNameMapping extends ParquetTypeVisitor { + private static final String LIST_ELEMENT_NAME = "element"; private final NameMapping nameMapping; ApplyNameMapping(NameMapping nameMapping) { @@ -109,7 +110,7 @@ public void afterElementField(Type element) { private Type makeElement(Type element) { // List's element in 3-level lists can be named differently across different parquet writers. // For example, hive names it "array_element", whereas new parquet writers names it as "element". 
- if (element.getName().equals("element") || element.isPrimitive()) { + if (element.getName().equals(LIST_ELEMENT_NAME) || element.isPrimitive()) { return element; } @@ -118,7 +119,7 @@ private Type makeElement(Type element) { if (element.getId() != null) { elementBuilder.id(element.getId().intValue()); } - return elementBuilder.named("element"); + return elementBuilder.named(LIST_ELEMENT_NAME); } @Override diff --git a/site/docs/spark-procedures.md b/site/docs/spark-procedures.md index d1a2e235a5b1..2d233fc599c1 100644 --- a/site/docs/spark-procedures.md +++ b/site/docs/spark-procedures.md @@ -49,7 +49,8 @@ Roll back a table to a specific snapshot ID. To roll back to a specific time, use [`rollback_to_timestamp`](#rollback_to_timestamp). -**Note** this procedure invalidates all cached Spark plans that reference the affected table. +!!! Note + This procedure invalidates all cached Spark plans that reference the affected table. #### Usage @@ -77,7 +78,8 @@ CALL catalog_name.system.rollback_to_snapshot('db.sample', 1) Roll back a table to the snapshot that was current at some time. -**Note** this procedure invalidates all cached Spark plans that reference the affected table. +!!! Note + This procedure invalidates all cached Spark plans that reference the affected table. #### Usage @@ -106,7 +108,8 @@ Sets the current snapshot ID for a table. Unlike rollback, the snapshot is not required to be an ancestor of the current table state. -**Note** this procedure invalidates all cached Spark plans that reference the affected table. +!!! Note + This procedure invalidates all cached Spark plans that reference the affected table. #### Usage @@ -137,7 +140,8 @@ Cherry-picking creates a new snapshot from an existing snapshot without altering Only append and dynamic overwrite snapshots can be cherry-picked. -**Note** this procedure invalidates all cached Spark plans that reference the affected table. +!!! Note + This procedure invalidates all cached Spark plans that reference the affected table. #### Usage @@ -302,7 +306,8 @@ Data files in manifests are sorted by fields in the partition spec. This procedu See the [`RewriteManifestsAction` Javadoc](./javadoc/{{ versions.iceberg }}/org/apache/iceberg/actions/RewriteManifestsAction.html) to see more configuration options. -**Note** this procedure invalidates all cached Spark plans that reference the affected table. +!!! Note + This procedure invalidates all cached Spark plans that reference the affected table. #### Usage @@ -334,9 +339,12 @@ CALL catalog_name.system.rewrite_manifests('db.sample', false) The `snapshot` and `migrate` procedures help test and migrate existing Hive or Spark tables to Iceberg. -**Note** Parquet files written with Parquet writers that use names other than `list` and `element` for repeated group -and element of the list respectively are **read incorrectly** by Iceberg upto 0.12.1 Iceberg versions. Parquet files -generated by Hive fall in this category. +!!! Note + Parquet files written with Parquet writers that use names other than `list` and `element` for repeated group + and element of the list respectively are **read incorrectly as nulls** by Iceberg upto 0.12.1 Iceberg versions. + Most commonly such files are written by follow writers. +1. *Hive*: when written to tables with `org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe` as it's SerDe. +2. *Spark*: when written with `spark.sql.parquet.writeLegacyFormat` set to `true`. 
### `snapshot` @@ -348,11 +356,12 @@ When inserts or overwrites run on the snapshot, new files are placed in the snap When finished testing a snapshot table, clean it up by running `DROP TABLE`. -**Note** Because tables created by `snapshot` are not the sole owners of their data files, they are prohibited from -actions like `expire_snapshots` which would physically delete data files. Iceberg deletes, which only effect metadata, -are still allowed. In addition, any operations which affect the original data files will disrupt the Snapshot's -integrity. DELETE statements executed against the original Hive table will remove original data files and the -`snapshot` table will no longer be able to access them. +!!! Note + Because tables created by `snapshot` are not the sole owners of their data files, they are prohibited from + actions like `expire_snapshots` which would physically delete data files. Iceberg deletes, which only effect metadata, + are still allowed. In addition, any operations which affect the original data files will disrupt the Snapshot's + integrity. DELETE statements executed against the original Hive table will remove original data files and the + `snapshot` table will no longer be able to access them. See [`migrate`](#migrate-table-procedure) to replace an existing table with an Iceberg table. From bfece893a58d3c135ccb76eb9f8122512be792aa Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 17 Dec 2021 12:25:38 -0800 Subject: [PATCH 13/14] Add tests to also test files written in non-legacy mode --- site/docs/spark-procedures.md | 33 ++++------ .../spark/actions/TestCreateActions.java | 64 +++++++++++++++---- 2 files changed, 63 insertions(+), 34 deletions(-) diff --git a/site/docs/spark-procedures.md b/site/docs/spark-procedures.md index 2d233fc599c1..5dcc511414f7 100644 --- a/site/docs/spark-procedures.md +++ b/site/docs/spark-procedures.md @@ -49,8 +49,7 @@ Roll back a table to a specific snapshot ID. To roll back to a specific time, use [`rollback_to_timestamp`](#rollback_to_timestamp). -!!! Note - This procedure invalidates all cached Spark plans that reference the affected table. +**Note** this procedure invalidates all cached Spark plans that reference the affected table. #### Usage @@ -77,7 +78,8 @@ CALL catalog_name.system.rollback_to_snapshot('db.sample', 1) Roll back a table to the snapshot that was current at some time. -!!! Note - This procedure invalidates all cached Spark plans that reference the affected table. +**Note** this procedure invalidates all cached Spark plans that reference the affected table. #### Usage @@ -106,7 +108,8 @@ Sets the current snapshot ID for a table. Unlike rollback, the snapshot is not required to be an ancestor of the current table state. -!!! Note - This procedure invalidates all cached Spark plans that reference the affected table. +**Note** this procedure invalidates all cached Spark plans that reference the affected table. #### Usage @@ -137,7 +140,8 @@ Cherry-picking creates a new snapshot from an existing snapshot without altering Only append and dynamic overwrite snapshots can be cherry-picked. -!!! Note - This procedure invalidates all cached Spark plans that reference the affected table. +**Note** this procedure invalidates all cached Spark plans that reference the affected table. #### Usage @@ -302,7 +306,8 @@ Data files in manifests are sorted by fields in the partition spec.
This procedu See the [`RewriteManifestsAction` Javadoc](./javadoc/{{ versions.iceberg }}/org/apache/iceberg/actions/RewriteManifestsAction.html) to see more configuration options. -!!! Note - This procedure invalidates all cached Spark plans that reference the affected table. +**Note** this procedure invalidates all cached Spark plans that reference the affected table. #### Usage @@ -339,10 +334,9 @@ CALL catalog_name.system.rewrite_manifests('db.sample', false) The `snapshot` and `migrate` procedures help test and migrate existing Hive or Spark tables to Iceberg. -!!! Note - Parquet files written with Parquet writers that use names other than `list` and `element` for repeated group - and element of the list respectively are **read incorrectly as nulls** by Iceberg upto 0.12.1 Iceberg versions. - Most commonly such files are written by follow writers. +**Note** Parquet files written with Parquet writers that use names other than `list` and `element` for repeated group +and element of the list respectively are **read incorrectly as nulls** by Iceberg upto 0.12.1 Iceberg versions. Most +commonly such files are written by follow writers. 1. *Hive*: when written to tables with `org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe` as it's SerDe. 2. *Spark*: when written with `spark.sql.parquet.writeLegacyFormat` set to `true`. @@ -356,12 +350,11 @@ When inserts or overwrites run on the snapshot, new files are placed in the snap When finished testing a snapshot table, clean it up by running `DROP TABLE`. -!!! Note - Because tables created by `snapshot` are not the sole owners of their data files, they are prohibited from - actions like `expire_snapshots` which would physically delete data files. Iceberg deletes, which only effect metadata, - are still allowed. In addition, any operations which affect the original data files will disrupt the Snapshot's - integrity. DELETE statements executed against the original Hive table will remove original data files and the - `snapshot` table will no longer be able to access them. +**Note** Because tables created by `snapshot` are not the sole owners of their data files, they are prohibited from +actions like `expire_snapshots` which would physically delete data files. Iceberg deletes, which only effect metadata, +are still allowed. In addition, any operations which affect the original data files will disrupt the Snapshot's +integrity. DELETE statements executed against the original Hive table will remove original data files and the +`snapshot` table will no longer be able to access them. See [`migrate`](#migrate-table-procedure) to replace an existing table with an Iceberg table. 
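To make the `snapshot` caveat above concrete, a minimal Spark SQL sketch follows; it is an illustration only. The `catalog_name` placeholder matches the doc's other examples, while `db.sample`, `db.sample_snap`, and the `id` column are hypothetical names not taken from the patch.

```sql
-- Create a lightweight Iceberg snapshot of a hypothetical Hive table.
CALL catalog_name.system.snapshot('db.sample', 'db.sample_snap');

-- Iceberg deletes against the snapshot only change metadata, so they are allowed.
DELETE FROM db.sample_snap WHERE id IS NULL;

-- Actions that would physically remove data files, such as expire_snapshots, are
-- prohibited on db.sample_snap because it does not own the original files; likewise,
-- deleting data from the original db.sample table removes files the snapshot still
-- references and breaks it. Clean up with DROP TABLE db.sample_snap when finished.
```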
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java index 8489aa3056c2..e9413606f41b 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java @@ -573,9 +573,48 @@ public void schemaEvolutionTestWithSparkSQL() throws Exception { @Test public void testHiveStyleThreeLevelList() throws Exception { - spark.conf().set("spark.sql.parquet.writeLegacyFormat", true); + threeLevelList(true); + } + + @Test + public void testThreeLevelList() throws Exception { + threeLevelList(false); + } + + @Test + public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception { + threeLevelListWithNestedStruct(true); + } + + @Test + public void testThreeLevelListWithNestedStruct() throws Exception { + threeLevelListWithNestedStruct(false); + } + + @Test + public void testHiveStyleThreeLevelLists() throws Exception { + threeLevelLists(true); + } + + @Test + public void testThreeLevelLists() throws Exception { + threeLevelLists(false); + } + + @Test + public void testHiveStyleStructOfThreeLevelLists() throws Exception { + structOfThreeLevelLists(true); + } + + @Test + public void testStructOfThreeLevelLists() throws Exception { + structOfThreeLevelLists(false); + } + + public void threeLevelList(boolean useLegacyMode) throws Exception { + spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode); - String tableName = sourceName("testHiveStyleThreeLevelList"); + String tableName = sourceName(String.format("threeLevelList_%s", useLegacyMode)); File location = temp.newFolder(); sql("CREATE TABLE %s (col1 ARRAY>)" + " STORED AS parquet" + @@ -594,11 +633,10 @@ public void testHiveStyleThreeLevelList() throws Exception { assertEquals("Output must match", expected, results); } - @Test - public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception { - spark.conf().set("spark.sql.parquet.writeLegacyFormat", true); + private void threeLevelListWithNestedStruct(boolean useLegacyMode) throws Exception { + spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode); - String tableName = sourceName("testHiveStyleThreeLevelListWithNestedStruct"); + String tableName = sourceName(String.format("threeLevelListWithNestedStruct_%s", useLegacyMode)); File location = temp.newFolder(); sql("CREATE TABLE %s (col1 ARRAY>>)" + " STORED AS parquet" + @@ -617,11 +655,10 @@ public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception { assertEquals("Output must match", expected, results); } - @Test - public void testHiveStyleThreeLevelLists() throws Exception { - spark.conf().set("spark.sql.parquet.writeLegacyFormat", true); + private void threeLevelLists(boolean useLegacyMode) throws Exception { + spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode); - String tableName = sourceName("testHiveStyleThreeLevelLists"); + String tableName = sourceName(String.format("threeLevelLists_%s", useLegacyMode)); File location = temp.newFolder(); sql("CREATE TABLE %s (col1 ARRAY>, col3 ARRAY>)" + " STORED AS parquet" + @@ -642,11 +679,10 @@ public void testHiveStyleThreeLevelLists() throws Exception { assertEquals("Output must match", expected, results); } - @Test - public void testHiveStyleStructOfThreeLevelLists() throws Exception { - spark.conf().set("spark.sql.parquet.writeLegacyFormat", true); + private 
void structOfThreeLevelLists(boolean useLegacyMode) throws Exception { + spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode); - String tableName = sourceName("testHiveStyleStructOfThreeLevelLists"); + String tableName = sourceName(String.format("structOfThreeLevelLists_%s", useLegacyMode)); File location = temp.newFolder(); sql("CREATE TABLE %s (col1 STRUCT>>)" + " STORED AS parquet" + From 9bf3acf7b968e0deda7fe076f08221ceefcfa319 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 17 Dec 2021 12:53:10 -0800 Subject: [PATCH 14/14] Rebase --- site/docs/spark-procedures.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/site/docs/spark-procedures.md b/site/docs/spark-procedures.md index 5dcc511414f7..a7188d434381 100644 --- a/site/docs/spark-procedures.md +++ b/site/docs/spark-procedures.md @@ -336,7 +336,8 @@ The `snapshot` and `migrate` procedures help test and migrate existing Hive or S **Note** Parquet files written with Parquet writers that use names other than `list` and `element` for repeated group and element of the list respectively are **read incorrectly as nulls** by Iceberg upto 0.12.1 Iceberg versions. Most -commonly such files are written by follow writers. +commonly such files are written by the following writers. + 1. *Hive*: when written to tables with `org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe` as it's SerDe. 2. *Spark*: when written with `spark.sql.parquet.writeLegacyFormat` set to `true`.
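To illustrate the two writers called out in the note, here is a hedged Spark SQL sketch of how an affected table might be produced and migrated. It assumes a Spark session with Hive support; the `db.legacy_lists` table, the `col1`/`col2` columns, and the `catalog_name` placeholder are illustrative rather than taken from the patch.

```sql
-- Enable Spark's legacy Parquet list layout for this session (writer 2 in the note);
-- files written by Hive itself through ParquetHiveSerDe (writer 1) use the same layout.
SET spark.sql.parquet.writeLegacyFormat=true;

-- Hypothetical Hive-style table with a list-of-struct column, mirroring the shape
-- exercised by the tests in this patch.
CREATE TABLE db.legacy_lists (col1 ARRAY<STRUCT<col2: INT>>) STORED AS parquet;
INSERT INTO db.legacy_lists VALUES (ARRAY(STRUCT(12345)));

-- Migrate the table in place to Iceberg.
CALL catalog_name.system.migrate('db.legacy_lists');

-- On Iceberg releases up to 0.12.1 this read returns NULL for col1 instead of the
-- stored values (the issue described in the note); later releases return the data.
SELECT col1 FROM db.legacy_lists;
```

Reading the list column back after `migrate` in a scratch database like this is a quick way to check whether an installation is affected before migrating production tables.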