diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeToIcebergType.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeToIcebergType.java
index 7d022cfc3a..5871b3ed83 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeToIcebergType.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeToIcebergType.java
@@ -32,7 +32,7 @@ public class HiveTypeToIcebergType extends HiveTypeUtil.HiveSchemaVisitor<Type> {
-  private static final String UNION_TO_STRUCT_CONVERSION_PREFIX = "tag_";
+  private static final String UNION_TO_STRUCT_CONVERSION_PREFIX = "field";
   private int nextId = 1;

   @Override
@@ -57,7 +57,8 @@ public Type list(ListTypeInfo list, Type elementResult) {
   // Mimic the struct call behavior to construct a union converted struct type
   @Override
   public Type union(UnionTypeInfo union, List<Type> unionResults) {
-    List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(unionResults.size());
+    List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(unionResults.size() + 1);
+    fields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get()));
     for (int i = 0; i < unionResults.size(); i++) {
       fields.add(Types.NestedField.optional(allocateId(), UNION_TO_STRUCT_CONVERSION_PREFIX + i, unionResults.get(i)));
     }
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestHiveSchemaConversions.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestHiveSchemaConversions.java
index 7b12233f25..8c770437b3 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestHiveSchemaConversions.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestHiveSchemaConversions.java
@@ -80,7 +80,7 @@ public void testConversions() {
         "struct<" +
             "length:int,count:int,list:array>," +
            "wordcounts:map>");
-    check("struct<1: tag_0: optional int, 2: tag_1: optional string>", "uniontype<int,string>");
+    check("struct<1: tag: required int, 2: field0: optional int, 3: field1: optional string>", "uniontype<int,string>");
   }

   private static void check(String icebergTypeStr, String hiveTypeStr) {
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
index 12d1389d66..ad4c80fe43 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
@@ -311,9 +311,11 @@ private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Type type, boolean isRequired,
                                                               Map<Integer, OrcField> mapping) {
     TypeDescription orcType;
     OrcField orcField = mapping.getOrDefault(fieldId, null);
+    // this branch means the Iceberg struct schema actually corresponds to an underlying union
     if (orcField != null && orcField.type.getCategory().equals(TypeDescription.Category.UNION)) {
       orcType = TypeDescription.createUnion();
-      for (Types.NestedField nestedField : type.asStructType().fields()) {
+      List<Types.NestedField> nestedFields = type.asStructType().fields();
+      for (Types.NestedField nestedField : nestedFields.subList(1, nestedFields.size())) {
         TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
             isRequired && nestedField.isRequired(), mapping);
         orcType.addUnionChild(childType);
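Note on the new schema shape: a uniontype<int,string> column now surfaces in Iceberg as a struct with a leading required "tag" column recording which branch is set, followed by one optional "field<i>" column per branch. A minimal sketch of that shape using the Iceberg types API (the field IDs here are illustrative, not the ones the converters above would allocate):

    import org.apache.iceberg.types.Types;

    public class UnionToStructExample {
      public static void main(String[] args) {
        // "tag" records which union branch is set; "field<i>" holds branch i.
        Types.StructType converted = Types.StructType.of(
            Types.NestedField.required(1, "tag", Types.IntegerType.get()),
            Types.NestedField.optional(2, "field0", Types.IntegerType.get()),
            Types.NestedField.optional(3, "field1", Types.StringType.get()));
        System.out.println(converted);
        // struct<1: tag: required int, 2: field0: optional int, 3: field1: optional string>
      }
    }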
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
index 2ff967c9be..87679d140b 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
@@ -123,7 +123,7 @@ private static <T> T visitRecord(TypeDescription record, OrcSchemaVisitor<T> visitor) {
   }

   public String optionName(int ordinal) {
-    return "tag_" + ordinal;
+    return "field" + ordinal;
   }

   public String elementName() {
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java
index 640feccff3..c30eea2483 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java
@@ -38,7 +38,7 @@ public static <T> T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeVisitor<T> visitor) {
         return visitor.visitRecord(iType != null ? iType.asStructType() : null, schema, visitor);

       case UNION:
-        return visitUnion(iType, schema, visitor);
+        return visitor.visitUnion(iType, schema, visitor);

       case LIST:
         Types.ListType list = iType != null ? iType.asListType() : null;
@@ -71,12 +71,12 @@
     return visitor.record(struct, record, names, results);
   }

-  private static <T> T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor<T> visitor) {
+  protected T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor<T> visitor) {
     List<TypeDescription> types = union.getChildren();
     List<T> options = Lists.newArrayListWithCapacity(types.size());

     for (int i = 0; i < types.size(); i += 1) {
-      options.add(visit(type.asStructType().fields().get(i).type(), types.get(i), visitor));
+      options.add(visit(type.asStructType().fields().get(i + 1).type(), types.get(i), visitor));
     }

     return visitor.union(type, union, options);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index 3091c3f2e6..f3f1c8df05 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -176,16 +176,17 @@ private UnionReader(List<ValueReader<?>> readers) {

   @Override
   public InternalRow nonNullRead(ColumnVector vector, int row) {
-    InternalRow struct = new GenericInternalRow(readers.length);
+    InternalRow struct = new GenericInternalRow(readers.length + 1);
     UnionColumnVector unionColumnVector = (UnionColumnVector) vector;
     int fieldIndex = unionColumnVector.tags[row];
     Object value = this.readers[fieldIndex].read(unionColumnVector.fields[fieldIndex], row);

     for (int i = 0; i < readers.length; i += 1) {
-      struct.setNullAt(i);
+      struct.setNullAt(i + 1);
     }

-    struct.update(fieldIndex, value);
+    struct.update(0, fieldIndex);
+    struct.update(fieldIndex + 1, value);
     return struct;
   }
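The reader-side convention that goes with these changes: row position 0 carries the union tag, and ORC union branch i lands at struct position i + 1 (hence the get(i + 1) and setNullAt(i + 1) shifts above). A self-contained sketch of that layout in plain Java; toStructRow is a hypothetical helper, not an Iceberg API:

    import java.util.Arrays;

    public class UnionRowLayoutExample {
      // Builds the struct-row contents UnionReader now produces: the tag at
      // position 0, the selected branch's value at position tag + 1, and null
      // for every other branch.
      static Object[] toStructRow(int tag, Object value, int numBranches) {
        Object[] row = new Object[numBranches + 1];
        row[0] = tag;
        row[tag + 1] = value;
        return row;
      }

      public static void main(String[] args) {
        // uniontype<int,string> holding the string branch, as in the tests below:
        System.out.println(Arrays.toString(toStructRow(1, "foo-1", 2)));
        // prints: [1, null, foo-1]
      }
    }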
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
index 1620765980..d626e01191 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.data.vectorized;

+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.IntStream;
@@ -36,6 +37,7 @@
 import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
 import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
 import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
 import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
@@ -443,11 +445,15 @@ public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector vector, int batchSize,
                                 long batchOffsetInFile) {
       UnionColumnVector unionColumnVector = (UnionColumnVector) vector;
       List<Types.NestedField> fields = structType.fields();
-      assert fields.size() == unionColumnVector.fields.length;
-      assert fields.size() == optionConverters.size();
       List<ColumnVector> fieldVectors = Lists.newArrayListWithExpectedSize(fields.size());
-      for (int i = 0; i < fields.size(); i += 1) {
+
+      LongColumnVector longColumnVector = new LongColumnVector();
+      longColumnVector.vector = Arrays.stream(unionColumnVector.tags).asLongStream().toArray();
+
+      fieldVectors.add(new PrimitiveOrcColumnVector(Types.IntegerType.get(), batchSize, longColumnVector,
+          OrcValueReaders.ints(), batchOffsetInFile));
+      for (int i = 0; i < fields.size() - 1; i += 1) {
         fieldVectors.add(optionConverters.get(i).convert(unionColumnVector.fields[i], batchSize,
             batchOffsetInFile));
       }
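Since PrimitiveOrcColumnVector wraps an ORC LongColumnVector while union tags are stored as an int[], the converter widens the tags into a synthetic long vector. A standalone sketch of that trick against the shaded ORC vector classes used in this module:

    import java.util.Arrays;
    import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
    import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;

    public class TagVectorExample {
      public static void main(String[] args) {
        UnionColumnVector union = new UnionColumnVector(4, new LongColumnVector(4));
        union.tags = new int[] {0, 1, 0, 1}; // which branch each row carries

        // Widen the int tags into a LongColumnVector so an integer reader can serve them.
        LongColumnVector tagVector = new LongColumnVector(4);
        tagVector.vector = Arrays.stream(union.tags).asLongStream().toArray();
        System.out.println(Arrays.toString(tagVector.vector)); // [0, 1, 0, 1]
      }
    }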
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java
index 7fcdbad273..3cddbfe479 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java
@@ -68,20 +68,23 @@ public void testComplexUnion() throws IOException {
     Schema expectedSchema = new Schema(
         Types.NestedField.optional(0, "unionCol", Types.StructType.of(
-            Types.NestedField.optional(1, "tag_0", Types.IntegerType.get()),
-            Types.NestedField.optional(2, "tag_1", Types.StringType.get())))
+            Types.NestedField.optional(3, "tag", Types.IntegerType.get()),
+            Types.NestedField.optional(1, "field0", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "field1", Types.StringType.get())))
     );

     final InternalRow expectedFirstRow = new GenericInternalRow(1);
-    final InternalRow field1 = new GenericInternalRow(2);
+    final InternalRow field1 = new GenericInternalRow(3);
     field1.update(0, 0);
-    field1.update(1, null);
+    field1.update(1, 0);
+    field1.update(2, null);
     expectedFirstRow.update(0, field1);

     final InternalRow expectedSecondRow = new GenericInternalRow(1);
-    final InternalRow field2 = new GenericInternalRow(2);
-    field2.update(0, null);
-    field2.update(1, UTF8String.fromString("stringtype1"));
+    final InternalRow field2 = new GenericInternalRow(3);
+    field2.update(0, 1);
+    field2.update(1, null);
+    field2.update(2, UTF8String.fromString("foo-1"));
     expectedSecondRow.update(0, field2);

     Configuration conf = new Configuration();
@@ -103,7 +106,7 @@ public void testComplexUnion() throws IOException {
     for (int i = 0; i < NUM_OF_ROWS; i += 1) {
       complexUnion.tags[i] = i % 2;
       longColumnVector.vector[i] = i;
-      String stringValue = "stringtype" + i;
+      String stringValue = "foo-" + i;
       bytesColumnVector.setVal(i, stringValue.getBytes(StandardCharsets.UTF_8));
     }

@@ -115,106 +118,28 @@ public void testComplexUnion() throws IOException {
     writer.close();

     // Test non-vectorized reader
-    List<InternalRow> internalRows = Lists.newArrayList();
+    List<InternalRow> actualRows = Lists.newArrayList();
     try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(orcFile))
         .project(expectedSchema)
         .createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema))
         .build()) {
-      reader.forEach(internalRows::add);
+      reader.forEach(actualRows::add);

-      Assert.assertEquals(internalRows.size(), NUM_OF_ROWS);
-      assertEquals(expectedSchema, expectedFirstRow, internalRows.get(0));
-      assertEquals(expectedSchema, expectedSecondRow, internalRows.get(1));
+      Assert.assertEquals(actualRows.size(), NUM_OF_ROWS);
+      assertEquals(expectedSchema, expectedFirstRow, actualRows.get(0));
+      assertEquals(expectedSchema, expectedSecondRow, actualRows.get(1));
     }

     // Test vectorized reader
-    List<ColumnarBatch> columnarBatches = Lists.newArrayList();
     try (CloseableIterable<ColumnarBatch> reader = ORC.read(Files.localInput(orcFile))
         .project(expectedSchema)
         .createBatchedReaderFunc(readOrcSchema ->
             VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of()))
         .build()) {
-      reader.forEach(columnarBatches::add);
-      Iterator<InternalRow> rowIterator = columnarBatches.get(0).rowIterator();
+      final Iterator<InternalRow> actualRowsIt = batchesToRows(reader.iterator());

-      Assert.assertEquals(columnarBatches.get(0).numRows(), NUM_OF_ROWS);
-      assertEquals(expectedSchema, expectedFirstRow, rowIterator.next());
-      assertEquals(expectedSchema, expectedSecondRow, rowIterator.next());
-    }
-  }
-
-  @Test
-  public void testSingleComponentUnion() throws IOException {
-    TypeDescription orcSchema =
-        TypeDescription.fromString("struct<unionCol:uniontype<int>>");
-
-    Schema expectedSchema = new Schema(
-        Types.NestedField.optional(0, "unionCol", Types.StructType.of(
-            Types.NestedField.optional(1, "tag_0", Types.IntegerType.get())))
-    );
-
-    final InternalRow expectedFirstRow = new GenericInternalRow(1);
-    final InternalRow field1 = new GenericInternalRow(1);
-    field1.update(0, 0);
-    expectedFirstRow.update(0, field1);
-
-    final InternalRow expectedSecondRow = new GenericInternalRow(1);
-    final InternalRow field2 = new GenericInternalRow(1);
-    field2.update(0, 3);
-    expectedSecondRow.update(0, field2);
-
-    Configuration conf = new Configuration();
-
-    File orcFile = temp.newFile();
-    Path orcFilePath = new Path(orcFile.getPath());
-
-    Writer writer = OrcFile.createWriter(orcFilePath,
-        OrcFile.writerOptions(conf)
-            .setSchema(orcSchema).overwrite(true));
-
-    VectorizedRowBatch batch = orcSchema.createRowBatch();
-    LongColumnVector longColumnVector = new LongColumnVector(NUM_OF_ROWS);
-    UnionColumnVector complexUnion = new UnionColumnVector(NUM_OF_ROWS, longColumnVector);
-    complexUnion.init();
-
-    for (int i = 0; i < NUM_OF_ROWS; i += 1) {
-      complexUnion.tags[i] = 0;
-      longColumnVector.vector[i] = 3 * i;
-    }
-
-    batch.size = NUM_OF_ROWS;
-    batch.cols[0] = complexUnion;
-
-    writer.addRowBatch(batch);
-    batch.reset();
-    writer.close();
-
-    // Test non-vectorized reader
-    List<InternalRow> internalRows = Lists.newArrayList();
-    try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(orcFile))
-        .project(expectedSchema)
-        .createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema))
-        .build()) {
-      reader.forEach(internalRows::add);
-
-      Assert.assertEquals(internalRows.size(), NUM_OF_ROWS);
-      assertEquals(expectedSchema, expectedFirstRow, internalRows.get(0));
-      assertEquals(expectedSchema, expectedSecondRow, internalRows.get(1));
-    }
-
-    // Test vectorized reader
-    List<ColumnarBatch> columnarBatches = Lists.newArrayList();
-    try (CloseableIterable<ColumnarBatch> reader = ORC.read(Files.localInput(orcFile))
-        .project(expectedSchema)
-        .createBatchedReaderFunc(readOrcSchema ->
-            VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of()))
-        .build()) {
-      reader.forEach(columnarBatches::add);
-      Iterator<InternalRow> rowIterator = columnarBatches.get(0).rowIterator();
-
-      Assert.assertEquals(columnarBatches.get(0).numRows(), NUM_OF_ROWS);
-      assertEquals(expectedSchema, expectedFirstRow, rowIterator.next());
-      assertEquals(expectedSchema, expectedSecondRow, rowIterator.next());
+      assertEquals(expectedSchema, expectedFirstRow, actualRowsIt.next());
+      assertEquals(expectedSchema, expectedSecondRow, actualRowsIt.next());
     }
   }
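The rewritten vectorized checks lean on a batchesToRows(...) helper instead of indexing into the first batch. Its implementation isn't shown in this patch; a plausible minimal version, assuming the relocated Guava Iterators that Iceberg tests already use, would be:

    import java.util.Iterator;
    import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    final class BatchesToRows {
      // Flattens the vectorized reader's batches into one row iterator, so the
      // assertions no longer care where batch boundaries fall.
      static Iterator<InternalRow> batchesToRows(Iterator<ColumnarBatch> batches) {
        return Iterators.concat(Iterators.transform(batches, ColumnarBatch::rowIterator));
      }
    }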
@@ -225,23 +150,27 @@ public void testDeeplyNestedUnion() throws IOException {
     Schema expectedSchema = new Schema(
         Types.NestedField.optional(0, "c1", Types.StructType.of(
-            Types.NestedField.optional(1, "tag_0", Types.IntegerType.get()),
-            Types.NestedField.optional(2, "tag_1",
+            Types.NestedField.optional(100, "tag", Types.IntegerType.get()),
+            Types.NestedField.optional(1, "field0", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "field1",
                 Types.StructType.of(Types.NestedField.optional(3, "c2", Types.StringType.get()),
                     Types.NestedField.optional(4, "c3", Types.StructType.of(
-                        Types.NestedField.optional(5, "tag_0", Types.IntegerType.get()),
-                        Types.NestedField.optional(6, "tag_1", Types.StringType.get()))))))));
+                        Types.NestedField.optional(101, "tag", Types.IntegerType.get()),
+                        Types.NestedField.optional(5, "field0", Types.IntegerType.get()),
+                        Types.NestedField.optional(6, "field1", Types.StringType.get()))))))));

     final InternalRow expectedFirstRow = new GenericInternalRow(1);
-    final InternalRow inner1 = new GenericInternalRow(2);
-    inner1.update(0, null);
+    final InternalRow inner1 = new GenericInternalRow(3);
+    inner1.update(0, 1);
+    inner1.update(1, null);
     final InternalRow inner2 = new GenericInternalRow(2);
     inner2.update(0, UTF8String.fromString("foo0"));
-    final InternalRow inner3 = new GenericInternalRow(2);
+    final InternalRow inner3 = new GenericInternalRow(3);
     inner3.update(0, 0);
-    inner3.update(1, null);
+    inner3.update(1, 0);
+    inner3.update(2, null);
     inner2.update(1, inner3);
-    inner1.update(1, inner2);
+    inner1.update(2, inner2);
     expectedFirstRow.update(0, inner1);

     Configuration conf = new Configuration();
@@ -303,10 +232,9 @@ public void testDeeplyNestedUnion() throws IOException {
         .createBatchedReaderFunc(readOrcSchema ->
             VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of()))
         .build()) {
-      final Iterator<InternalRow> actualRows = batchesToRows(reader.iterator());
-      final InternalRow actualFirstRow = actualRows.next();
+      final Iterator<InternalRow> actualRowsIt = batchesToRows(reader.iterator());

-      assertEquals(expectedSchema, expectedFirstRow, actualFirstRow);
+      assertEquals(expectedSchema, expectedFirstRow, actualRowsIt.next());
     }
   }
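For completeness, the unions these tests exercise are populated directly through ORC's vectorized writer API. A condensed sketch of that pattern, with writer setup and file IO omitted (UnionBatchExample is illustrative, not part of the patch):

    import java.nio.charset.StandardCharsets;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
    import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
    import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
    import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

    public class UnionBatchExample {
      public static void main(String[] args) {
        TypeDescription schema =
            TypeDescription.fromString("struct<unionCol:uniontype<int,string>>");
        VectorizedRowBatch batch = schema.createRowBatch();
        UnionColumnVector union = (UnionColumnVector) batch.cols[0];
        LongColumnVector ints = (LongColumnVector) union.fields[0];
        BytesColumnVector strings = (BytesColumnVector) union.fields[1];
        strings.initBuffer(); // defensive; createRowBatch() normally resets buffers

        for (int i = 0; i < 10; i += 1) {
          union.tags[i] = i % 2; // even rows: int branch, odd rows: string branch
          ints.vector[i] = i;
          strings.setVal(i, ("foo-" + i).getBytes(StandardCharsets.UTF_8));
        }
        batch.size = 10;
        // a Writer created for this schema would now accept writer.addRowBatch(batch)
      }
    }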