From d5bb5e4e5c9889df46bb88f25f9ec30f4a92ab2c Mon Sep 17 00:00:00 2001 From: Wenye Zhang Date: Wed, 27 Apr 2022 10:46:25 -0700 Subject: [PATCH 1/2] support reading union types for orc --- .../apache/iceberg/orc/ApplyNameMapping.java | 15 ++ .../java/org/apache/iceberg/orc/HasIds.java | 5 + .../org/apache/iceberg/orc/ORCSchemaUtil.java | 43 +++- .../apache/iceberg/orc/OrcSchemaVisitor.java | 28 ++- .../iceberg/orc/OrcSchemaWithTypeVisitor.java | 17 +- .../iceberg/spark/data/SparkOrcReader.java | 5 + .../spark/data/SparkOrcValueReaders.java | 33 +++ .../vectorized/VectorizedSparkOrcReaders.java | 42 ++++ .../spark/data/TestSparkOrcUnions.java | 238 ++++++++++++++++++ 9 files changed, 412 insertions(+), 14 deletions(-) create mode 100644 spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java diff --git a/orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java b/orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java index 619a8c33f3ce..4dc669dc315f 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java @@ -70,6 +70,21 @@ public TypeDescription record(TypeDescription record, List names, List options) { + Preconditions.checkArgument(options.size() >= 1, "Union type must have options"); + MappedField field = nameMapping.find(currentPath()); + TypeDescription unionType = TypeDescription.createUnion(); + + for (TypeDescription option : options) { + if (option != null) { + unionType.addUnionChild(option); + } + } + + return setId(unionType, field); + } + @Override public TypeDescription list(TypeDescription array, TypeDescription element) { Preconditions.checkArgument(element != null, "List type must have element type"); diff --git a/orc/src/main/java/org/apache/iceberg/orc/HasIds.java b/orc/src/main/java/org/apache/iceberg/orc/HasIds.java index 833e1d977d44..6043d96db8b9 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/HasIds.java +++ b/orc/src/main/java/org/apache/iceberg/orc/HasIds.java @@ -30,6 +30,11 @@ public Boolean record(TypeDescription record, List names, List return ORCSchemaUtil.icebergID(record).isPresent() || fields.stream().anyMatch(Predicate.isEqual(true)); } + @Override + public Boolean union(TypeDescription union, List options) { + return ORCSchemaUtil.icebergID(union).isPresent() || options.stream().anyMatch(Predicate.isEqual(true)); + } + @Override public Boolean list(TypeDescription array, Boolean element) { return ORCSchemaUtil.icebergID(array).isPresent() || element; diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index af1d2cf66a41..dcc3e657d9c9 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -265,18 +265,7 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo switch (type.typeId()) { case STRUCT: - orcType = TypeDescription.createStruct(); - for (Types.NestedField nestedField : type.asStructType().fields()) { - // Using suffix _r to avoid potential underlying issues in ORC reader - // with reused column names between ORC and Iceberg; - // e.g. renaming column c -> d and adding new column d - String name = Optional.ofNullable(mapping.get(nestedField.fieldId())) - .map(OrcField::name) - .orElseGet(() -> nestedField.name() + "_r" + nestedField.fieldId()); - TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(), - isRequired && nestedField.isRequired(), mapping); - orcType.addField(name, childType); - } + orcType = buildOrcProjectForStructType(fieldId, type, isRequired, mapping); break; case LIST: Types.ListType list = (Types.ListType) type; @@ -317,6 +306,36 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo return orcType; } + private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Type type, boolean isRequired, + Map mapping) { + TypeDescription orcType; + OrcField orcField = mapping.getOrDefault(fieldId, null); + // this branch means the iceberg struct schema actually correspond to an underlying union + if (orcField != null && orcField.type.getCategory().equals(TypeDescription.Category.UNION)) { + orcType = TypeDescription.createUnion(); + List nestedFields = type.asStructType().fields(); + for (Types.NestedField nestedField : nestedFields.subList(1, nestedFields.size())) { + TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(), + isRequired && nestedField.isRequired(), mapping); + orcType.addUnionChild(childType); + } + } else { + orcType = TypeDescription.createStruct(); + for (Types.NestedField nestedField : type.asStructType().fields()) { + // Using suffix _r to avoid potential underlying issues in ORC reader + // with reused column names between ORC and Iceberg; + // e.g. renaming column c -> d and adding new column d + String name = Optional.ofNullable(mapping.get(nestedField.fieldId())) + .map(OrcField::name) + .orElseGet(() -> nestedField.name() + "_r" + nestedField.fieldId()); + TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(), + isRequired && nestedField.isRequired(), mapping); + orcType.addField(name, childType); + } + } + return orcType; + } + private static Map icebergToOrcMapping(String name, TypeDescription orcType) { Map icebergToOrc = Maps.newHashMap(); switch (orcType.getCategory()) { diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java index 778037b8ce51..87679d140b11 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java @@ -47,7 +47,17 @@ public static T visit(TypeDescription schema, OrcSchemaVisitor visitor) { return visitRecord(schema, visitor); case UNION: - throw new UnsupportedOperationException("Cannot handle " + schema); + List types = schema.getChildren(); + List options = Lists.newArrayListWithExpectedSize(types.size()); + for (int i = 0; i < types.size(); i++) { + visitor.beforeUnionOption(types.get(i), i); + try { + options.add(visit(types.get(i), visitor)); + } finally { + visitor.afterUnionOption(types.get(i), i); + } + } + return visitor.union(schema, options); case LIST: final T elementResult; @@ -112,6 +122,10 @@ private static T visitRecord(TypeDescription record, OrcSchemaVisitor vis return visitor.record(record, names, visitFields(fields, names, visitor)); } + public String optionName(int ordinal) { + return "field" + ordinal; + } + public String elementName() { return "_elem"; } @@ -136,6 +150,14 @@ public void afterField(String name, TypeDescription type) { fieldNames.pop(); } + public void beforeUnionOption(TypeDescription option, int ordinal) { + beforeField(optionName(ordinal), option); + } + + public void afterUnionOption(TypeDescription option, int ordinal) { + afterField(optionName(ordinal), option); + } + public void beforeElementField(TypeDescription element) { beforeField(elementName(), element); } @@ -164,6 +186,10 @@ public T record(TypeDescription record, List names, List fields) { return null; } + public T union(TypeDescription union, List options) { + return null; + } + public T list(TypeDescription array, T element) { return null; } diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java index 53b0c9f2fdeb..7e5eb0660a8b 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java @@ -38,7 +38,7 @@ public static T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeV return visitRecord(iType != null ? iType.asStructType() : null, schema, visitor); case UNION: - throw new UnsupportedOperationException("Cannot handle " + schema); + return visitor.visitUnion(iType, schema, visitor); case LIST: Types.ListType list = iType != null ? iType.asListType() : null; @@ -71,10 +71,25 @@ private static T visitRecord( return visitor.record(struct, record, names, results); } + protected T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor visitor) { + List types = union.getChildren(); + List options = Lists.newArrayListWithCapacity(types.size()); + + for (int i = 0; i < types.size(); i += 1) { + options.add(visit(type.asStructType().fields().get(i + 1).type(), types.get(i), visitor)); + } + + return visitor.union(type, union, options); + } + public T record(Types.StructType iStruct, TypeDescription record, List names, List fields) { return null; } + public T union(Type iUnion, TypeDescription union, List options) { + return null; + } + public T list(Types.ListType iList, TypeDescription array, T element) { return null; } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java index 4ed6420a9aa4..8edfe46af75a 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java @@ -75,6 +75,11 @@ public OrcValueReader record( return SparkOrcValueReaders.struct(fields, expected, idToConstant); } + @Override + public OrcValueReader union(Type expected, TypeDescription union, List> options) { + return SparkOrcValueReaders.union(options); + } + @Override public OrcValueReader list(Types.ListType iList, TypeDescription array, OrcValueReader elementReader) { return SparkOrcValueReaders.array(elementReader); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java index f35ab7a17c63..f3f1c8df0518 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java @@ -33,6 +33,7 @@ import org.apache.orc.storage.ql.exec.vector.ListColumnVector; import org.apache.orc.storage.ql.exec.vector.MapColumnVector; import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.UnionColumnVector; import org.apache.orc.storage.serde2.io.HiveDecimalWritable; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; @@ -70,6 +71,10 @@ static OrcValueReader struct( return new StructReader(readers, struct, idToConstant); } + static OrcValueReader union(List> readers) { + return new UnionReader(readers); + } + static OrcValueReader array(OrcValueReader elementReader) { return new ArrayReader(elementReader); } @@ -159,6 +164,34 @@ protected void set(InternalRow struct, int pos, Object value) { } } + static class UnionReader implements OrcValueReader { + private final OrcValueReader[] readers; + + private UnionReader(List> readers) { + this.readers = new OrcValueReader[readers.size()]; + for (int i = 0; i < this.readers.length; i += 1) { + this.readers[i] = readers.get(i); + } + } + + @Override + public InternalRow nonNullRead(ColumnVector vector, int row) { + InternalRow struct = new GenericInternalRow(readers.length + 1); + UnionColumnVector unionColumnVector = (UnionColumnVector) vector; + + int fieldIndex = unionColumnVector.tags[row]; + Object value = this.readers[fieldIndex].read(unionColumnVector.fields[fieldIndex], row); + + for (int i = 0; i < readers.length; i += 1) { + struct.setNullAt(i + 1); + } + struct.update(0, fieldIndex); + struct.update(fieldIndex + 1, value); + + return struct; + } + } + private static class StringReader implements OrcValueReader { private static final StringReader INSTANCE = new StringReader(); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java index 418c25993a7e..387a089202a5 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.data.vectorized; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.IntStream; @@ -35,8 +36,10 @@ import org.apache.iceberg.types.Types; import org.apache.orc.TypeDescription; import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; import org.apache.orc.storage.ql.exec.vector.MapColumnVector; import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.UnionColumnVector; import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.vectorized.ColumnVector; @@ -93,6 +96,11 @@ public Converter record(Types.StructType iStruct, TypeDescription record, List options) { + return new UnionConverter(iType, options); + } + @Override public Converter list(Types.ListType iList, TypeDescription array, Converter element) { return new ArrayConverter(iList, element); @@ -424,4 +432,38 @@ public ColumnVector getChild(int ordinal) { }; } } + + private static class UnionConverter implements Converter { + private final Types.StructType structType; + private final List optionConverters; + + private UnionConverter(Type type, List optionConverters) { + this.structType = type.asStructType(); + this.optionConverters = optionConverters; + } + + @Override + public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector vector, int batchSize, + long batchOffsetInFile) { + UnionColumnVector unionColumnVector = (UnionColumnVector) vector; + List fields = structType.fields(); + List fieldVectors = Lists.newArrayListWithExpectedSize(fields.size()); + + LongColumnVector longColumnVector = new LongColumnVector(); + longColumnVector.vector = Arrays.stream(unionColumnVector.tags).asLongStream().toArray(); + + fieldVectors.add(new PrimitiveOrcColumnVector(Types.IntegerType.get(), batchSize, longColumnVector, + OrcValueReaders.ints(), batchOffsetInFile)); + for (int i = 0; i < fields.size() - 1; i += 1) { + fieldVectors.add(optionConverters.get(i).convert(unionColumnVector.fields[i], batchSize, batchOffsetInFile)); + } + + return new BaseOrcColumnVector(structType, batchSize, vector) { + @Override + public ColumnVector getChild(int ordinal) { + return fieldVectors.get(ordinal); + } + }; + } + } } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java new file mode 100644 index 000000000000..5181c205c381 --- /dev/null +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java @@ -0,0 +1,238 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.spark.data; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; +import org.apache.iceberg.types.Types; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.UnionColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.spark.unsafe.types.UTF8String; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.spark.data.TestHelpers.assertEquals; + +public class TestSparkOrcUnions { + private static final int NUM_OF_ROWS = 50; + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testComplexUnion() throws IOException { + TypeDescription orcSchema = + TypeDescription.fromString("struct>"); + + Schema expectedSchema = new Schema( + Types.NestedField.optional(0, "unionCol", Types.StructType.of( + Types.NestedField.optional(3, "tag", Types.IntegerType.get()), + Types.NestedField.optional(1, "field0", Types.IntegerType.get()), + Types.NestedField.optional(2, "field1", Types.StringType.get()))) + ); + + final InternalRow expectedFirstRow = new GenericInternalRow(1); + final InternalRow field1 = new GenericInternalRow(3); + field1.update(0, 0); + field1.update(1, 0); + field1.update(2, null); + expectedFirstRow.update(0, field1); + + final InternalRow expectedSecondRow = new GenericInternalRow(1); + final InternalRow field2 = new GenericInternalRow(3); + field2.update(0, 1); + field2.update(1, null); + field2.update(2, UTF8String.fromString("foo-1")); + expectedSecondRow.update(0, field2); + + Configuration conf = new Configuration(); + + File orcFile = temp.newFile(); + Path orcFilePath = new Path(orcFile.getPath()); + + Writer writer = OrcFile.createWriter(orcFilePath, + OrcFile.writerOptions(conf) + .setSchema(orcSchema).overwrite(true)); + + VectorizedRowBatch batch = orcSchema.createRowBatch(); + LongColumnVector longColumnVector = new LongColumnVector(NUM_OF_ROWS); + BytesColumnVector bytesColumnVector = new BytesColumnVector(NUM_OF_ROWS); + UnionColumnVector complexUnion = new UnionColumnVector(NUM_OF_ROWS, longColumnVector, bytesColumnVector); + + complexUnion.init(); + + for (int i = 0; i < NUM_OF_ROWS; i += 1) { + complexUnion.tags[i] = i % 2; + longColumnVector.vector[i] = i; + String stringValue = "foo-" + i; + bytesColumnVector.setVal(i, stringValue.getBytes(StandardCharsets.UTF_8)); + } + + batch.size = NUM_OF_ROWS; + batch.cols[0] = complexUnion; + + writer.addRowBatch(batch); + batch.reset(); + writer.close(); + + // Test non-vectorized reader + List actualRows = Lists.newArrayList(); + try (CloseableIterable reader = ORC.read(Files.localInput(orcFile)) + .project(expectedSchema) + .createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema)) + .build()) { + reader.forEach(actualRows::add); + + Assert.assertEquals(actualRows.size(), NUM_OF_ROWS); + assertEquals(expectedSchema, expectedFirstRow, actualRows.get(0)); + assertEquals(expectedSchema, expectedSecondRow, actualRows.get(1)); + } + + // Test vectorized reader + try (CloseableIterable reader = ORC.read(Files.localInput(orcFile)) + .project(expectedSchema) + .createBatchedReaderFunc(readOrcSchema -> + VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of())) + .build()) { + final Iterator actualRowsIt = batchesToRows(reader.iterator()); + + assertEquals(expectedSchema, expectedFirstRow, actualRowsIt.next()); + assertEquals(expectedSchema, expectedSecondRow, actualRowsIt.next()); + } + } + + @Test + public void testDeeplyNestedUnion() throws IOException { + TypeDescription orcSchema = + TypeDescription.fromString("struct>>>"); + + Schema expectedSchema = new Schema( + Types.NestedField.optional(0, "c1", Types.StructType.of( + Types.NestedField.optional(100, "tag", Types.IntegerType.get()), + Types.NestedField.optional(1, "field0", Types.IntegerType.get()), + Types.NestedField.optional(2, "field1", + Types.StructType.of(Types.NestedField.optional(3, "c2", Types.StringType.get()), + Types.NestedField.optional(4, "c3", Types.StructType.of( + Types.NestedField.optional(101, "tag", Types.IntegerType.get()), + Types.NestedField.optional(5, "field0", Types.IntegerType.get()), + Types.NestedField.optional(6, "field1", Types.StringType.get())))))))); + + final InternalRow expectedFirstRow = new GenericInternalRow(1); + final InternalRow inner1 = new GenericInternalRow(3); + inner1.update(0, 1); + inner1.update(1, null); + final InternalRow inner2 = new GenericInternalRow(2); + inner2.update(0, UTF8String.fromString("foo0")); + final InternalRow inner3 = new GenericInternalRow(3); + inner3.update(0, 0); + inner3.update(1, 0); + inner3.update(2, null); + inner2.update(1, inner3); + inner1.update(2, inner2); + expectedFirstRow.update(0, inner1); + + Configuration conf = new Configuration(); + + File orcFile = temp.newFile(); + Path orcFilePath = new Path(orcFile.getPath()); + + Writer writer = OrcFile.createWriter(orcFilePath, + OrcFile.writerOptions(conf) + .setSchema(orcSchema).overwrite(true)); + + VectorizedRowBatch batch = orcSchema.createRowBatch(); + UnionColumnVector innerUnion1 = (UnionColumnVector) batch.cols[0]; + LongColumnVector innerInt1 = (LongColumnVector) innerUnion1.fields[0]; + innerInt1.fillWithNulls(); + StructColumnVector innerStruct2 = (StructColumnVector) innerUnion1.fields[1]; + BytesColumnVector innerString2 = (BytesColumnVector) innerStruct2.fields[0]; + UnionColumnVector innerUnion3 = (UnionColumnVector) innerStruct2.fields[1]; + LongColumnVector innerInt3 = (LongColumnVector) innerUnion3.fields[0]; + BytesColumnVector innerString3 = (BytesColumnVector) innerUnion3.fields[1]; + innerString3.fillWithNulls(); + + for (int r = 0; r < NUM_OF_ROWS; ++r) { + int row = batch.size++; + innerUnion1.tags[row] = 1; + innerString2.setVal(row, ("foo" + row).getBytes(StandardCharsets.UTF_8)); + innerUnion3.tags[row] = 0; + innerInt3.vector[row] = r; + // If the batch is full, write it out and start over. + if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + innerInt1.fillWithNulls(); + innerString3.fillWithNulls(); + } + } + if (batch.size != 0) { + writer.addRowBatch(batch); + batch.reset(); + } + writer.close(); + + // test non-vectorized reader + List results = Lists.newArrayList(); + try (CloseableIterable reader = ORC.read(Files.localInput(orcFile)) + .project(expectedSchema) + .createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema)) + .build()) { + reader.forEach(results::add); + final InternalRow actualFirstRow = results.get(0); + + Assert.assertEquals(results.size(), NUM_OF_ROWS); + assertEquals(expectedSchema, expectedFirstRow, actualFirstRow); + } + + // test vectorized reader + try (CloseableIterable reader = ORC.read(Files.localInput(orcFile)) + .project(expectedSchema) + .createBatchedReaderFunc(readOrcSchema -> + VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of())) + .build()) { + final Iterator actualRowsIt = batchesToRows(reader.iterator()); + + assertEquals(expectedSchema, expectedFirstRow, actualRowsIt.next()); + } + } + + private Iterator batchesToRows(Iterator batches) { + return Iterators.concat(Iterators.transform(batches, ColumnarBatch::rowIterator)); + } +} From b22173164ddda2706bdb5e37c62821dbd3bc9fbd Mon Sep 17 00:00:00 2001 From: Wenye Zhang Date: Wed, 27 Apr 2022 12:45:14 -0700 Subject: [PATCH 2/2] add case union in icebergToOrcMapping --- .../main/java/org/apache/iceberg/orc/ORCSchemaUtil.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index dcc3e657d9c9..03a85f23fbd0 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -346,6 +346,13 @@ private static Map icebergToOrcMapping(String name, TypeDescr icebergToOrc.putAll(icebergToOrcMapping(childrenNames.get(i), children.get(i))); } break; + case UNION: + // This is part of building orc read schema in file level. orcType has union type inside it. + List options = orcType.getChildren(); + for (int i = 0; i < options.size(); i++) { + icebergToOrc.putAll(icebergToOrcMapping("option" + i, options.get(i))); + } + break; case LIST: icebergToOrc.putAll(icebergToOrcMapping("element", orcType.getChildren().get(0))); break;