diff --git a/orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java b/orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java
index 619a8c33f3..4dc669dc31 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java
@@ -70,6 +70,21 @@ public TypeDescription record(TypeDescription record, List<String> names, List<TypeDescription> fields) {
+  @Override
+  public TypeDescription union(TypeDescription union, List<TypeDescription> options) {
+    Preconditions.checkArgument(options.size() >= 1, "Union type must have options");
+    MappedField field = nameMapping.find(currentPath());
+    TypeDescription unionType = TypeDescription.createUnion();
+
+    for (TypeDescription option : options) {
+      if (option != null) {
+        unionType.addUnionChild(option);
+      }
+    }
+
+    return setId(unionType, field);
+  }
+
   @Override
   public TypeDescription list(TypeDescription array, TypeDescription element) {
     Preconditions.checkArgument(element != null, "List type must have element type");
diff --git a/orc/src/main/java/org/apache/iceberg/orc/HasIds.java b/orc/src/main/java/org/apache/iceberg/orc/HasIds.java
index 833e1d977d..6043d96db8 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/HasIds.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/HasIds.java
@@ -30,6 +30,11 @@ public Boolean record(TypeDescription record, List<String> names, List<Boolean> fields) {
     return ORCSchemaUtil.icebergID(record).isPresent() ||
         fields.stream().anyMatch(Predicate.isEqual(true));
   }
 
+  @Override
+  public Boolean union(TypeDescription union, List<Boolean> options) {
+    return ORCSchemaUtil.icebergID(union).isPresent() || options.stream().anyMatch(Predicate.isEqual(true));
+  }
+
   @Override
   public Boolean list(TypeDescription array, Boolean element) {
     return ORCSchemaUtil.icebergID(array).isPresent() || element;
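Note on the two hunks above: `ApplyNameMapping.union` rebuilds the union from the already-visited options and stamps the mapped field ID on it, and `HasIds.union` reports an ID as soon as the union or any option carries one. For reference, a standalone sketch of the ORC `TypeDescription` calls the new visitor method relies on (plain ORC API, nothing Iceberg-specific):

    // Building uniontype<int,string> by hand, the same way
    // ApplyNameMapping.union assembles it from its visited options.
    TypeDescription union = TypeDescription.createUnion();
    union.addUnionChild(TypeDescription.createInt());
    union.addUnionChild(TypeDescription.createString());
    // union.toString() -> "uniontype<int,string>"
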
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
index 472a282bcc..bd6875e183 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
@@ -34,6 +34,7 @@
 import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
 
+
 /**
  * Utilities for mapping Iceberg to ORC schemas.
  */
@@ -265,21 +266,7 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo
     switch (type.typeId()) {
       case STRUCT:
-        orcType = TypeDescription.createStruct();
-        for (Types.NestedField nestedField : type.asStructType().fields()) {
-          // Using suffix _r to avoid potential underlying issues in ORC reader
-          // with reused column names between ORC and Iceberg;
-          // e.g. renaming column c -> d and adding new column d
-          if (mapping.get(nestedField.fieldId()) == null && nestedField.hasDefaultValue()) {
-            continue;
-          }
-          String name = Optional.ofNullable(mapping.get(nestedField.fieldId()))
-              .map(OrcField::name)
-              .orElseGet(() -> nestedField.name() + "_r" + nestedField.fieldId());
-          TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
-              isRequired && nestedField.isRequired(), mapping);
-          orcType.addField(name, childType);
-        }
+        orcType = buildOrcProjectionForStructType(fieldId, type, isRequired, mapping);
         break;
       case LIST:
         Types.ListType list = (Types.ListType) type;
@@ -320,6 +307,32 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo
     return orcType;
   }
 
+  private static TypeDescription buildOrcProjectionForStructType(Integer fieldId, Type type, boolean isRequired,
+      Map<Integer, OrcField> mapping) {
+    TypeDescription orcType;
+    OrcField orcField = mapping.getOrDefault(fieldId, null);
+    if (orcField != null && orcField.type.getCategory().equals(TypeDescription.Category.UNION)) {
+      orcType = orcField.type;
+    } else {
+      orcType = TypeDescription.createStruct();
+      for (Types.NestedField nestedField : type.asStructType().fields()) {
+        // Using suffix _r to avoid potential underlying issues in ORC reader
+        // with reused column names between ORC and Iceberg;
+        // e.g. renaming column c -> d and adding new column d
+        if (mapping.get(nestedField.fieldId()) == null && nestedField.hasDefaultValue()) {
+          continue;
+        }
+        String name = Optional.ofNullable(mapping.get(nestedField.fieldId()))
+            .map(OrcField::name)
+            .orElseGet(() -> nestedField.name() + "_r" + nestedField.fieldId());
+        TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
+            isRequired && nestedField.isRequired(), mapping);
+        orcType.addField(name, childType);
+      }
+    }
+    return orcType;
+  }
+
   private static Map<Integer, OrcField> icebergToOrcMapping(String name, TypeDescription orcType) {
     Map<Integer, OrcField> icebergToOrc = Maps.newHashMap();
     switch (orcType.getCategory()) {
@@ -330,6 +343,13 @@ private static Map<Integer, OrcField> icebergToOrcMapping(String name, TypeDescription orcType) {
           icebergToOrc.putAll(icebergToOrcMapping(childrenNames.get(i), children.get(i)));
         }
         break;
+      case UNION:
+        // Part of building the ORC read schema at the file level; orcType is a union here.
+        List<TypeDescription> options = orcType.getChildren();
+        for (int i = 0; i < options.size(); i++) {
+          icebergToOrc.putAll(icebergToOrcMapping("option" + i, options.get(i)));
+        }
+        break;
       case LIST:
         icebergToOrc.putAll(icebergToOrcMapping("element", orcType.getChildren().get(0)));
         break;
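For orientation: on the Iceberg side a union column surfaces as an optional struct carrying one optional field per option. The `option0`, `option1`, ... names above are internal to the ORC-to-Iceberg mapping; the projected struct uses `tag_N` field names, as the tests added below do. A sketch of the expected Iceberg shape for `uniontype<int,string>`, with illustrative field IDs:

    // Expected Iceberg-side shape of an ORC uniontype<int,string> column
    // (same layout the tests below assert against).
    Schema expected = new Schema(
        Types.NestedField.optional(0, "unionCol", Types.StructType.of(
            Types.NestedField.optional(1, "tag_0", Types.IntegerType.get()),
            Types.NestedField.optional(2, "tag_1", Types.StringType.get()))));
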
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
index 778037b8ce..4be48f9fa5 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
@@ -47,7 +47,18 @@ public static <T> T visit(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
         return visitRecord(schema, visitor);
 
       case UNION:
-        throw new UnsupportedOperationException("Cannot handle " + schema);
+        List<TypeDescription> types = schema.getChildren();
+        List<T> options = Lists.newArrayListWithExpectedSize(types.size());
+        for (TypeDescription type : types) {
+          visitor.beforeUnionOption(type);
+          try {
+            options.add(visit(type, visitor));
+          } finally {
+            visitor.afterUnionOption(type);
+          }
+        }
+
+        return visitor.union(schema, options);
 
       case LIST:
         final T elementResult;
@@ -112,6 +123,10 @@ private static <T> T visitRecord(TypeDescription record, OrcSchemaVisitor<T> visitor) {
     return visitor.record(record, names, visitFields(fields, names, visitor));
   }
 
+  public String optionName() {
+    return "_option";
+  }
+
   public String elementName() {
     return "_elem";
   }
@@ -136,6 +151,14 @@ public void afterField(String name, TypeDescription type) {
     fieldNames.pop();
   }
 
+  public void beforeUnionOption(TypeDescription option) {
+    beforeField(optionName(), option);
+  }
+
+  public void afterUnionOption(TypeDescription option) {
+    afterField(optionName(), option);
+  }
+
   public void beforeElementField(TypeDescription element) {
     beforeField(elementName(), element);
   }
@@ -164,6 +187,10 @@ public T record(TypeDescription record, List<String> names, List<T> fields) {
     return null;
   }
 
+  public T union(TypeDescription union, List<T> options) {
+    return null;
+  }
+
   public T list(TypeDescription array, T element) {
     return null;
   }
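A hypothetical illustration of the new callback (not part of the patch; the class name is invented, and it assumes the existing `primitive(TypeDescription)` callback on `OrcSchemaVisitor`): a visitor that renders an ORC schema back into a type string, exercising `union` alongside `record`.

    class SchemaRenderer extends OrcSchemaVisitor<String> {
      @Override
      public String record(TypeDescription record, List<String> names, List<String> fields) {
        StringBuilder sb = new StringBuilder("struct<");
        for (int i = 0; i < names.size(); i += 1) {
          sb.append(i > 0 ? "," : "").append(names.get(i)).append(':').append(fields.get(i));
        }
        return sb.append('>').toString();
      }

      @Override
      public String union(TypeDescription union, List<String> options) {
        // Each option was visited under the synthetic "_option" field name.
        return "uniontype<" + String.join(",", options) + ">";
      }

      @Override
      public String primitive(TypeDescription primitive) {
        return primitive.toString();  // e.g. "int", "string"
      }
    }

Visiting `struct<unionCol:uniontype<int,string>>` with this visitor would render the same string back, confirming that union options are visited in declaration order.
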
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java
index 7ed573da33..640feccff3 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaWithTypeVisitor.java
@@ -38,7 +38,7 @@ public static <T> T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeVisitor<T> visitor) {
       return visitor.visitRecord(iType != null ? iType.asStructType() : null, schema, visitor);
 
     case UNION:
-      throw new UnsupportedOperationException("Cannot handle " + schema);
+      return visitUnion(iType, schema, visitor);
 
     case LIST:
       Types.ListType list = iType != null ? iType.asListType() : null;
@@ -71,10 +71,25 @@ protected T visitRecord(
     return visitor.record(struct, record, names, results);
   }
 
+  private static <T> T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor<T> visitor) {
+    List<TypeDescription> types = union.getChildren();
+    List<T> options = Lists.newArrayListWithCapacity(types.size());
+
+    for (int i = 0; i < types.size(); i += 1) {
+      options.add(visit(type.asStructType().fields().get(i).type(), types.get(i), visitor));
+    }
+
+    return visitor.union(type, union, options);
+  }
+
   public T record(Types.StructType iStruct, TypeDescription record, List<String> names, List<T> fields) {
     return null;
   }
 
+  public T union(Type iUnion, TypeDescription union, List<T> options) {
+    return null;
+  }
+
   public T list(Types.ListType iList, TypeDescription array, T element) {
     return null;
   }
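One review observation: `visitUnion` pairs ORC option `i` with field `i` of the expected Iceberg struct, so it assumes the projected struct lists exactly one `tag_N` field per option, in option order (the schemas in the tests below satisfy this). A hypothetical guard inside `visitUnion` would make that assumption explicit:

    // Hypothetical check (not in the patch): one struct field per union option.
    Preconditions.checkArgument(
        type.asStructType().fields().size() == union.getChildren().size(),
        "Expected struct %s does not match union %s", type, union);
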
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
index b5ba50f271..6a428777a4 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -75,6 +75,11 @@ public OrcValueReader<?> record(
     return SparkOrcValueReaders.struct(fields, expected, getIdToConstant());
   }
 
+  @Override
+  public OrcValueReader<?> union(Type expected, TypeDescription union, List<OrcValueReader<?>> options) {
+    return SparkOrcValueReaders.union(options);
+  }
+
   @Override
   public OrcValueReader<?> list(Types.ListType iList, TypeDescription array, OrcValueReader<?> elementReader) {
     return SparkOrcValueReaders.array(elementReader);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index f35ab7a17c..3091c3f2e6 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -33,6 +33,7 @@
 import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
 import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
 import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
 import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
@@ -70,6 +71,10 @@ static OrcValueReader<?> struct(
     return new StructReader(readers, struct, idToConstant);
   }
 
+  static OrcValueReader<?> union(List<OrcValueReader<?>> readers) {
+    return new UnionReader(readers);
+  }
+
   static OrcValueReader<?> array(OrcValueReader<?> elementReader) {
     return new ArrayReader(elementReader);
   }
@@ -159,6 +164,33 @@ protected void set(InternalRow struct, int pos, Object value) {
     }
   }
 
+  static class UnionReader implements OrcValueReader<InternalRow> {
+    private final OrcValueReader<?>[] readers;
+
+    private UnionReader(List<OrcValueReader<?>> readers) {
+      this.readers = new OrcValueReader[readers.size()];
+      for (int i = 0; i < this.readers.length; i += 1) {
+        this.readers[i] = readers.get(i);
+      }
+    }
+
+    @Override
+    public InternalRow nonNullRead(ColumnVector vector, int row) {
+      InternalRow struct = new GenericInternalRow(readers.length);
+      UnionColumnVector unionColumnVector = (UnionColumnVector) vector;
+
+      int fieldIndex = unionColumnVector.tags[row];
+      Object value = this.readers[fieldIndex].read(unionColumnVector.fields[fieldIndex], row);
+
+      for (int i = 0; i < readers.length; i += 1) {
+        struct.setNullAt(i);
+      }
+      struct.update(fieldIndex, value);
+
+      return struct;
+    }
+  }
+
   private static class StringReader implements OrcValueReader<UTF8String> {
     private static final StringReader INSTANCE = new StringReader();
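To make the `UnionReader` behavior concrete: for each row, `tags[row]` selects which option's reader to use, and every other position of the returned struct is left null. For row 1 of the file written in the tests below (tag 1, value "stringtype1"), it produces the equivalent of:

    // Equivalent of UnionReader's output for row 1 of the test data below.
    InternalRow row1 = new GenericInternalRow(2);
    row1.setNullAt(0);                                     // int option not selected
    row1.update(1, UTF8String.fromString("stringtype1"));  // string option selected
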
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
index c6d56b96fe..1620765980 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
@@ -38,6 +38,7 @@
 import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
 import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
 import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.vectorized.ColumnVector;
@@ -93,6 +94,11 @@ public Converter record(Types.StructType iStruct, TypeDescription record, List<String> names,
   }
 
+  @Override
+  public Converter union(Type iType, TypeDescription union, List<Converter> options) {
+    return new UnionConverter(iType, options);
+  }
+
   @Override
   public Converter list(Types.ListType iList, TypeDescription array, Converter element) {
     return new ArrayConverter(iList, element);
@@ -422,4 +428,35 @@ public ColumnVector getChild(int ordinal) {
       };
     }
   }
+
+  private static class UnionConverter implements Converter {
+    private final Types.StructType structType;
+    private final List<Converter> optionConverters;
+
+    private UnionConverter(Type type, List<Converter> optionConverters) {
+      this.structType = type.asStructType();
+      this.optionConverters = optionConverters;
+    }
+
+    @Override
+    public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector vector, int batchSize,
+        long batchOffsetInFile) {
+      UnionColumnVector unionColumnVector = (UnionColumnVector) vector;
+      List<Types.NestedField> fields = structType.fields();
+      assert fields.size() == unionColumnVector.fields.length;
+      assert fields.size() == optionConverters.size();
+
+      List<ColumnVector> fieldVectors = Lists.newArrayListWithExpectedSize(fields.size());
+      for (int i = 0; i < fields.size(); i += 1) {
+        fieldVectors.add(optionConverters.get(i).convert(unionColumnVector.fields[i], batchSize, batchOffsetInFile));
+      }
+
+      return new BaseOrcColumnVector(structType, batchSize, vector) {
+        @Override
+        public ColumnVector getChild(int ordinal) {
+          return fieldVectors.get(ordinal);
+        }
+      };
+    }
+  }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java
new file mode 100644
index 0000000000..3be41c41e8
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcUnions.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.spark.data.TestHelpers.assertEquals;
+
+
+public class TestSparkOrcUnions {
+  private static final int NUM_OF_ROWS = 50;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testComplexUnion() throws IOException {
+    TypeDescription orcSchema =
+        TypeDescription.fromString("struct<unionCol:uniontype<int,string>>");
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(0, "unionCol", Types.StructType.of(
+            Types.NestedField.optional(1, "tag_0", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "tag_1", Types.StringType.get())))
+    );
+
+    final InternalRow expectedFirstRow = new GenericInternalRow(1);
+    final InternalRow field1 = new GenericInternalRow(2);
+    field1.update(0, 0);
+    field1.update(1, null);
+    expectedFirstRow.update(0, field1);
+
+    final InternalRow expectedSecondRow = new GenericInternalRow(1);
+    final InternalRow field2 = new GenericInternalRow(2);
+    field2.update(0, null);
+    field2.update(1, UTF8String.fromString("stringtype1"));
+    expectedSecondRow.update(0, field2);
+
+    Configuration conf = new Configuration();
+
+    File orcFile = temp.newFile();
+    Path orcFilePath = new Path(orcFile.getPath());
+
+    Writer writer = OrcFile.createWriter(orcFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(orcSchema).overwrite(true));
+
+    VectorizedRowBatch batch = orcSchema.createRowBatch();
+    LongColumnVector longColumnVector = new LongColumnVector(NUM_OF_ROWS);
+    BytesColumnVector bytesColumnVector = new BytesColumnVector(NUM_OF_ROWS);
+    UnionColumnVector complexUnion = new UnionColumnVector(NUM_OF_ROWS, longColumnVector, bytesColumnVector);
+
+    complexUnion.init();
+
+    for (int i = 0; i < NUM_OF_ROWS; i += 1) {
+      complexUnion.tags[i] = i % 2;
+      longColumnVector.vector[i] = i;
+      String stringValue = "stringtype" + i;
+      bytesColumnVector.setVal(i, stringValue.getBytes(StandardCharsets.UTF_8));
+    }
+
+    batch.size = NUM_OF_ROWS;
+    batch.cols[0] = complexUnion;
+
+    writer.addRowBatch(batch);
+    batch.reset();
+    writer.close();
+
+    // Test non-vectorized reader
+    List<InternalRow> internalRows = Lists.newArrayList();
+    try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(orcFile))
+        .project(expectedSchema)
+        .createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema))
+        .build()) {
+      reader.forEach(internalRows::add);
+
+      Assert.assertEquals(NUM_OF_ROWS, internalRows.size());
+      assertEquals(expectedSchema, expectedFirstRow, internalRows.get(0));
+      assertEquals(expectedSchema, expectedSecondRow, internalRows.get(1));
+    }
+
+    // Test vectorized reader
+    List<ColumnarBatch> columnarBatches = Lists.newArrayList();
+    try (CloseableIterable<ColumnarBatch> reader = ORC.read(Files.localInput(orcFile))
+        .project(expectedSchema)
+        .createBatchedReaderFunc(readOrcSchema ->
+            VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of()))
+        .build()) {
+      reader.forEach(columnarBatches::add);
+      Iterator<InternalRow> rowIterator = columnarBatches.get(0).rowIterator();
+
+      Assert.assertEquals(NUM_OF_ROWS, columnarBatches.get(0).numRows());
+      assertEquals(expectedSchema, expectedFirstRow, rowIterator.next());
+      assertEquals(expectedSchema, expectedSecondRow, rowIterator.next());
+    }
+  }
+
+  @Test
+  public void testSingleComponentUnion() throws IOException {
+    TypeDescription orcSchema =
+        TypeDescription.fromString("struct<unionCol:uniontype<int>>");
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(0, "unionCol", Types.StructType.of(
+            Types.NestedField.optional(1, "tag_0", Types.IntegerType.get())))
+    );
+
+    final InternalRow expectedFirstRow = new GenericInternalRow(1);
+    final InternalRow field1 = new GenericInternalRow(1);
+    field1.update(0, 0);
+    expectedFirstRow.update(0, field1);
+
+    final InternalRow expectedSecondRow = new GenericInternalRow(1);
+    final InternalRow field2 = new GenericInternalRow(1);
+    field2.update(0, 3);
+    expectedSecondRow.update(0, field2);
+
+    Configuration conf = new Configuration();
+
+    File orcFile = temp.newFile();
+    Path orcFilePath = new Path(orcFile.getPath());
+
+    Writer writer = OrcFile.createWriter(orcFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(orcSchema).overwrite(true));
+
+    VectorizedRowBatch batch = orcSchema.createRowBatch();
+    LongColumnVector longColumnVector = new LongColumnVector(NUM_OF_ROWS);
+    UnionColumnVector complexUnion = new UnionColumnVector(NUM_OF_ROWS, longColumnVector);
+    complexUnion.init();
+
+    for (int i = 0; i < NUM_OF_ROWS; i += 1) {
+      complexUnion.tags[i] = 0;
+      longColumnVector.vector[i] = 3 * i;
+    }
+
+    batch.size = NUM_OF_ROWS;
+    batch.cols[0] = complexUnion;
+
+    writer.addRowBatch(batch);
+    batch.reset();
+    writer.close();
+
+    // Test non-vectorized reader
+    List<InternalRow> internalRows = Lists.newArrayList();
+    try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(orcFile))
+        .project(expectedSchema)
+        .createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema))
+        .build()) {
+      reader.forEach(internalRows::add);
+
+      Assert.assertEquals(NUM_OF_ROWS, internalRows.size());
+      assertEquals(expectedSchema, expectedFirstRow, internalRows.get(0));
+      assertEquals(expectedSchema, expectedSecondRow, internalRows.get(1));
+    }
+
+    // Test vectorized reader
+    List<ColumnarBatch> columnarBatches = Lists.newArrayList();
+    try (CloseableIterable<ColumnarBatch> reader = ORC.read(Files.localInput(orcFile))
+        .project(expectedSchema)
+        .createBatchedReaderFunc(readOrcSchema ->
+            VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of()))
+        .build()) {
+      reader.forEach(columnarBatches::add);
+      Iterator<InternalRow> rowIterator = columnarBatches.get(0).rowIterator();
+
+      Assert.assertEquals(NUM_OF_ROWS, columnarBatches.get(0).numRows());
+      assertEquals(expectedSchema, expectedFirstRow, rowIterator.next());
+      assertEquals(expectedSchema, expectedSecondRow, rowIterator.next());
+    }
+  }
+}
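As the vectorized assertions imply, a union column comes back through the `ColumnarBatch` as an ordinary struct column. Taking the complex-union test as the example, a hypothetical spot-check of its first row (equivalent to the `assertEquals` call on `expectedFirstRow` inside that test's vectorized block) could read the tag fields directly:

    // Hypothetical spot-check inside testComplexUnion's vectorized block.
    ColumnarBatch firstBatch = columnarBatches.get(0);
    InternalRow first = firstBatch.rowIterator().next();
    InternalRow union = first.getStruct(0, 2);  // unionCol as a two-field struct
    Assert.assertTrue(union.isNullAt(1));       // tag_1 (string) not set in row 0
    Assert.assertEquals(0, union.getInt(0));    // tag_0 (int) carries the value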