@@ -26,12 +26,14 @@
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

class ApplyNameMapping extends ParquetTypeVisitor<Type> {
private static final String LIST_ELEMENT_NAME = "element";
private final NameMapping nameMapping;

ApplyNameMapping(NameMapping nameMapping) {
@@ -61,13 +63,20 @@ public Type list(GroupType list, Type elementType) {
"List type must have element field");

MappedField field = nameMapping.find(currentPath());
-Type listType = Types.list(list.getRepetition())
-    .element(elementType)
-    .named(list.getName());
+Type listType = makeListType(list, elementType);

return field == null ? listType : listType.withId(field.id());
}

private Type makeListType(GroupType list, Type elementType) {
// The repeated group of a 3-level list can be named differently by different parquet writers.
// For example, Hive names it "bag", whereas newer parquet writers name it "list" (illustrated after this method).
return Types.buildGroup(list.getRepetition())
.as(LogicalTypeAnnotation.listType())
.repeatedGroup().addFields(elementType).named(list.getFieldName(0))
.named(list.getName());
}
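
To illustrate the naming divergence that `makeListType` handles, the same logical `list<struct<int32>>` can arrive in either of these shapes (a sketch using Parquet's `MessageTypeParser`; the `bag`/`array` names follow Hive's convention):

```java
// Hive-style 3-level list: repeated group named "bag", element group named "array".
MessageType hiveStyle = MessageTypeParser.parseMessageType(
    "message m { optional group col1 (LIST) {" +
    "  repeated group bag { optional group array { required int32 col2; } } } }");

// Standard 3-level list: repeated group named "list", element group named "element".
MessageType standard = MessageTypeParser.parseMessageType(
    "message m { optional group col1 (LIST) {" +
    "  repeated group list { optional group element { required int32 col2; } } } }");
```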

@Override
public Type map(GroupType map, Type keyType, Type valueType) {
Preconditions.checkArgument(keyType != null && valueType != null,
@@ -88,6 +97,31 @@ public Type primitive(PrimitiveType primitive) {
return field == null ? primitive : primitive.withId(field.id());
}

@Override
public void beforeElementField(Type element) {
super.beforeElementField(makeElement(element));
}

@Override
public void afterElementField(Type element) {
super.afterElementField(makeElement(element));
}

private Type makeElement(Type element) {
// The element of a 3-level list can be named differently by different parquet writers.
// For example, Hive names it "array_element", whereas newer parquet writers name it "element".
if (element.getName().equals(LIST_ELEMENT_NAME) || element.isPrimitive()) {
return element;
}

Types.GroupBuilder<GroupType> elementBuilder = Types.buildGroup(element.getRepetition())
.addFields(element.asGroupType().getFields().toArray(new Type[0]));
if (element.getId() != null) {
elementBuilder.id(element.getId().intValue());
}
return elementBuilder.named(LIST_ELEMENT_NAME);
}

@Override
public void beforeRepeatedElement(Type element) {
// do not add the repeated element's name
@@ -27,6 +27,7 @@
import org.apache.iceberg.types.Types;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
@@ -181,6 +182,28 @@ public void testSchemaConversionWithoutAssigningIds() {
Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct());
}

@Test
public void testSchemaConversionForHiveStyleLists() {
String parquetSchemaString =
"message spark_schema {\n" +

> **Member:** This schema cannot be constructed using the Parquet API directly? I was hoping this would look similar to the tests above that create the schema using Parquet directly.
>
> **Contributor (author):** Unfortunately no; the moment we call `list`, it produces a 3-level list with the `list` and `element` names (see the sketch after this test).

" optional group col1 (LIST) {\n" +
" repeated group bag {\n" +
" optional group array {\n" +
" required int32 col2;\n" +
" }\n" +
" }\n" +
" }\n" +
"}\n";
MessageType messageType = MessageTypeParser.parseMessageType(parquetSchemaString);

Schema expectedSchema = new Schema(optional(1, "col1", Types.ListType.ofOptional(
2, Types.StructType.of(required(3, "col2", Types.IntegerType.get())))));
NameMapping nameMapping = MappingUtil.create(expectedSchema);
MessageType messageTypeWithIds = ParquetSchemaUtil.applyNameMapping(messageType, nameMapping);
Schema actualSchema = ParquetSchemaUtil.convertAndPrune(messageTypeWithIds);
Assert.assertEquals("Schema must match", expectedSchema.asStruct(), actualSchema.asStruct());
}
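
The author's reply above can be seen from the builder API that this change replaces in ApplyNameMapping: going through parquet's `Types` builder always yields the standard names, so the hive-style shape has to come from a schema string (a sketch, fully qualifying parquet's `Types` as this file does elsewhere):

```java
// Building through the Parquet builder API (as the removed ApplyNameMapping code did)...
Type viaBuilder = org.apache.parquet.schema.Types.list(Type.Repetition.OPTIONAL)
    .element(org.apache.parquet.schema.Types.required(PrimitiveTypeName.INT32).named("col2"))
    .named("col1");
// ...always produces the standard shape, per the discussion above, roughly:
//   optional group col1 (LIST) { repeated group list { ... element ... } }
// so hive-style "bag"/"array" schemas are parsed with MessageTypeParser instead.
```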

private Type primitive(Integer id, String name, PrimitiveTypeName typeName, Repetition repetition) {
PrimitiveBuilder<PrimitiveType> builder = org.apache.parquet.schema.Types.primitive(typeName, repetition);
if (id != null) {
7 changes: 7 additions & 0 deletions site/docs/spark-procedures.md
@@ -334,6 +334,13 @@ CALL catalog_name.system.rewrite_manifests('db.sample', false)

The `snapshot` and `migrate` procedures help test and migrate existing Hive or Spark tables to Iceberg.

**Note** Parquet files written by Parquet writers that use names other than `list` and `element` for the list's
repeated group and element, respectively, are **read incorrectly as nulls** by Iceberg versions up to 0.12.1. Such
files are most commonly produced by the following writers (a configuration sketch follows the list):

1. *Hive*: when writing to tables that use `org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe` as their SerDe.
2. *Spark*: when writing with `spark.sql.parquet.writeLegacyFormat` set to `true`.
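
For the Spark case, a minimal sketch (assuming an active `SparkSession` named `spark`) that disables the legacy layout before the source table is written:

```java
// Write standard "list"/"element" 3-level lists so Iceberg reads the values correctly.
spark.conf().set("spark.sql.parquet.writeLegacyFormat", false);
```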

### `snapshot`

Create a light-weight temporary copy of a table for testing, without changing the source table.
@@ -136,6 +136,7 @@ public void before() {

spark.conf().set("hive.exec.dynamic.partition", "true");
spark.conf().set("hive.exec.dynamic.partition.mode", "nonstrict");
spark.conf().set("spark.sql.parquet.writeLegacyFormat", false);
spark.sql(String.format("DROP TABLE IF EXISTS %s", baseTableName));

List<SimpleRecord> expected = Lists.newArrayList(
@@ -570,6 +571,137 @@ public void schemaEvolutionTestWithSparkSQL() throws Exception {
assertEquals("Output must match", expectedAfterAddColumn, afterMigarteAfterAddResults);
}

@Test
public void testHiveStyleThreeLevelList() throws Exception {
threeLevelList(true);
}

@Test
public void testThreeLevelList() throws Exception {
threeLevelList(false);
}

@Test
public void testHiveStyleThreeLevelListWithNestedStruct() throws Exception {
threeLevelListWithNestedStruct(true);
}

@Test
public void testThreeLevelListWithNestedStruct() throws Exception {
threeLevelListWithNestedStruct(false);
}

@Test
public void testHiveStyleThreeLevelLists() throws Exception {
threeLevelLists(true);
}

@Test
public void testThreeLevelLists() throws Exception {
threeLevelLists(false);
}

@Test
public void testHiveStyleStructOfThreeLevelLists() throws Exception {
structOfThreeLevelLists(true);
}

@Test
public void testStructOfThreeLevelLists() throws Exception {
structOfThreeLevelLists(false);
}

private void threeLevelList(boolean useLegacyMode) throws Exception {
spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

String tableName = sourceName(String.format("threeLevelList_%s", useLegacyMode));
File location = temp.newFolder();
sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2 INT>>)" +
" STORED AS parquet" +
" LOCATION '%s'", tableName, location);

int testValue = 12345;
sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)))", tableName, testValue);
List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));

// migrate table
SparkActions.get().migrateTable(tableName).execute();

// check migrated table is returning expected result
List<Object[]> results = sql("SELECT * FROM %s", tableName);
Assert.assertTrue(results.size() > 0);
assertEquals("Output must match", expected, results);
}

private void threeLevelListWithNestedStruct(boolean useLegacyMode) throws Exception {
spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

String tableName = sourceName(String.format("threeLevelListWithNestedStruct_%s", useLegacyMode));
File location = temp.newFolder();
sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2 STRUCT<col3 INT>>>)" +
" STORED AS parquet" +
" LOCATION '%s'", tableName, location);

int testValue = 12345;
sql("INSERT INTO %s VALUES (ARRAY(STRUCT(STRUCT(%s))))", tableName, testValue);
List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));

// migrate table
SparkActions.get().migrateTable(tableName).execute();

// check migrated table is returning expected result
List<Object[]> results = sql("SELECT * FROM %s", tableName);
Assert.assertTrue(results.size() > 0);
assertEquals("Output must match", expected, results);
}

private void threeLevelLists(boolean useLegacyMode) throws Exception {
spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

String tableName = sourceName(String.format("threeLevelLists_%s", useLegacyMode));
File location = temp.newFolder();
sql("CREATE TABLE %s (col1 ARRAY<STRUCT<col2 INT>>, col3 ARRAY<STRUCT<col4 INT>>)" +
" STORED AS parquet" +
" LOCATION '%s'", tableName, location);

int testValue1 = 12345;
int testValue2 = 987654;
sql("INSERT INTO %s VALUES (ARRAY(STRUCT(%s)), ARRAY(STRUCT(%s)))",
tableName, testValue1, testValue2);
List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));

// migrate table
SparkActions.get().migrateTable(tableName).execute();

// check migrated table is returning expected result
List<Object[]> results = sql("SELECT * FROM %s", tableName);
Assert.assertTrue(results.size() > 0);
assertEquals("Output must match", expected, results);
}

private void structOfThreeLevelLists(boolean useLegacyMode) throws Exception {
spark.conf().set("spark.sql.parquet.writeLegacyFormat", useLegacyMode);

String tableName = sourceName(String.format("structOfThreeLevelLists_%s", useLegacyMode));
File location = temp.newFolder();
sql("CREATE TABLE %s (col1 STRUCT<col2 ARRAY<STRUCT<col3 INT>>>)" +
" STORED AS parquet" +
" LOCATION '%s'", tableName, location);

int testValue1 = 12345;
sql("INSERT INTO %s VALUES (STRUCT(STRUCT(ARRAY(STRUCT(%s)))))",
tableName, testValue1);
List<Object[]> expected = sql(String.format("SELECT * FROM %s", tableName));

// migrate table
SparkActions.get().migrateTable(tableName).execute();

// check migrated table is returning expected result
List<Object[]> results = sql("SELECT * FROM %s", tableName);
Assert.assertTrue(results.size() > 0);
assertEquals("Output must match", expected, results);
}


private SparkTable loadTable(String name) throws NoSuchTableException, ParseException {
return (SparkTable) catalog.loadTable(Spark3Util.catalogAndIdentifier(spark, name).identifier());