diff --git a/orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java b/orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java
new file mode 100644
index 000000000000..619a8c33f3ce
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.util.List;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.orc.TypeDescription;
+
+class ApplyNameMapping extends OrcSchemaVisitor<TypeDescription> {
+  private final NameMapping nameMapping;
+
+  ApplyNameMapping(NameMapping nameMapping) {
+    this.nameMapping = nameMapping;
+  }
+
+  @Override
+  public String elementName() {
+    return "element";
+  }
+
+  @Override
+  public String keyName() {
+    return "key";
+  }
+
+  @Override
+  public String valueName() {
+    return "value";
+  }
+
+  TypeDescription setId(TypeDescription type, MappedField mappedField) {
+    if (mappedField != null) {
+      type.setAttribute(ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, mappedField.id().toString());
+    }
+    return type;
+  }
+
+  @Override
+  public TypeDescription record(TypeDescription record, List<String> names, List<TypeDescription> fields) {
+    Preconditions.checkArgument(names.size() == fields.size(), "All fields must have names");
+    MappedField field = nameMapping.find(currentPath());
+    TypeDescription structType = TypeDescription.createStruct();
+
+    for (int i = 0; i < fields.size(); i++) {
+      String fieldName = names.get(i);
+      TypeDescription fieldType = fields.get(i);
+      if (fieldType != null) {
+        structType.addField(fieldName, fieldType);
+      }
+    }
+    return setId(structType, field);
+  }
+
+  @Override
+  public TypeDescription list(TypeDescription array, TypeDescription element) {
+    Preconditions.checkArgument(element != null, "List type must have element type");
+
+    MappedField field = nameMapping.find(currentPath());
+    TypeDescription listType = TypeDescription.createList(element);
+    return setId(listType, field);
+  }
+
+  @Override
+  public TypeDescription map(TypeDescription map, TypeDescription key, TypeDescription value) {
+    Preconditions.checkArgument(key != null && value != null, "Map type must have both key and value types");
+
+    MappedField field = nameMapping.find(currentPath());
+    TypeDescription mapType = TypeDescription.createMap(key, value);
+    return setId(mapType, field);
+  }
+
+  @Override
+  public TypeDescription primitive(TypeDescription primitive) {
+    MappedField field = nameMapping.find(currentPath());
+    return setId(primitive.clone(), field);
+  }
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/HasIds.java b/orc/src/main/java/org/apache/iceberg/orc/HasIds.java
new file mode 100644
index 000000000000..833e1d977d44
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/HasIds.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.orc.TypeDescription;
+
+class HasIds extends OrcSchemaVisitor<Boolean> {
+
+  @Override
+  public Boolean record(TypeDescription record, List<String> names, List<Boolean> fields) {
+    return ORCSchemaUtil.icebergID(record).isPresent() ||
+        fields.stream().anyMatch(Predicate.isEqual(true));
+  }
+
+  @Override
+  public Boolean list(TypeDescription array, Boolean element) {
+    return ORCSchemaUtil.icebergID(array).isPresent() || element;
+  }
+
+  @Override
+  public Boolean map(TypeDescription map, Boolean key, Boolean value) {
+    return ORCSchemaUtil.icebergID(map).isPresent() || key || value;
+  }
+
+  @Override
+  public Boolean primitive(TypeDescription primitive) {
+    return ORCSchemaUtil.icebergID(primitive).isPresent();
+  }
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 38dc522fa5cf..7b57270776bb 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -35,6 +35,7 @@
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
@@ -120,11 +121,12 @@ public static ReadBuilder read(InputFile file) {
   public static class ReadBuilder {
     private final InputFile file;
     private final Configuration conf;
-    private org.apache.iceberg.Schema schema = null;
+    private Schema schema = null;
     private Long start = null;
     private Long length = null;
     private Expression filter = null;
     private boolean caseSensitive = true;
+    private NameMapping nameMapping = null;
 
     private Function<TypeDescription, OrcRowReader<?>> readerFunc;
     private Function<TypeDescription, OrcBatchReader<?>> batchedReaderFunc;
@@ -193,10 +195,15 @@ public ReadBuilder recordsPerBatch(int numRecordsPerBatch) {
       return this;
     }
 
+    public ReadBuilder withNameMapping(NameMapping newNameMapping) {
+      this.nameMapping = newNameMapping;
+      return this;
+    }
+
     public <D> CloseableIterable<D> build() {
       Preconditions.checkNotNull(schema, "Schema is required");
-      return new OrcIterable<>(file, conf, schema, start, length, readerFunc, caseSensitive, filter, batchedReaderFunc,
-          recordsPerBatch);
+      return new OrcIterable<>(file, conf, schema, nameMapping, start, length, readerFunc, caseSensitive, filter,
+          batchedReaderFunc, recordsPerBatch);
     }
   }
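Note: `withNameMapping` is the only public API addition in this module. For context, a caller would wire a mapping in roughly like this; a minimal sketch, assuming an `inputFile`, a projected `tableSchema`, and a `mappingJson` string, with `GenericOrcReader` being the generic-record reader from iceberg-data:

    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.orc.GenericOrcReader;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.mapping.NameMapping;
    import org.apache.iceberg.mapping.NameMappingParser;

    // Read an ORC file whose schema carries no Iceberg field IDs; the mapping
    // tells the reader which Iceberg IDs belong to which ORC column names.
    NameMapping mapping = NameMappingParser.fromJson(mappingJson);
    CloseableIterable<Record> records = ORC.read(inputFile)
        .project(tableSchema)
        .withNameMapping(mapping)
        .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(tableSchema, fileSchema))
        .build();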
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
index 0a09aea9b0df..94f0ca30dd20 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
@@ -25,6 +25,7 @@
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -395,6 +396,18 @@ static boolean isOptional(TypeDescription orcType) {
     return true;
   }
 
+  static TypeDescription removeIds(TypeDescription type) {
+    return OrcSchemaVisitor.visit(type, new RemoveIds());
+  }
+
+  static boolean hasIds(TypeDescription orcSchema) {
+    return OrcSchemaVisitor.visit(orcSchema, new HasIds());
+  }
+
+  static TypeDescription applyNameMapping(TypeDescription orcSchema, NameMapping nameMapping) {
+    return OrcSchemaVisitor.visit(orcSchema, new ApplyNameMapping(nameMapping));
+  }
+
   /**
    * Generates mapping from field IDs to ORC qualified names. See {@link IdToOrcName} for details.
    */
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
index a965efcb2d3f..2560f053d26d 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
@@ -31,6 +31,8 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.util.Pair;
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
@@ -51,14 +53,16 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
   private final boolean caseSensitive;
   private final Function<TypeDescription, OrcBatchReader<?>> batchReaderFunction;
   private final int recordsPerBatch;
+  private NameMapping nameMapping;
 
   OrcIterable(InputFile file, Configuration config, Schema schema,
-              Long start, Long length,
+              NameMapping nameMapping, Long start, Long length,
               Function<TypeDescription, OrcRowReader<?>> readerFunction, boolean caseSensitive, Expression filter,
               Function<TypeDescription, OrcBatchReader<?>> batchReaderFunction, int recordsPerBatch) {
     this.schema = schema;
     this.readerFunction = readerFunction;
     this.file = file;
+    this.nameMapping = nameMapping;
     this.start = start;
     this.length = length;
     this.config = config;
@@ -73,7 +77,18 @@ class OrcIterable<T> extends CloseableGroup implements CloseableIterable<T> {
   public CloseableIterator<T> iterator() {
     Reader orcFileReader = ORC.newFileReader(file, config);
     addCloseable(orcFileReader);
-    TypeDescription readOrcSchema = ORCSchemaUtil.buildOrcProjection(schema, orcFileReader.getSchema());
+
+    TypeDescription fileSchema = orcFileReader.getSchema();
+    final TypeDescription readOrcSchema;
+    if (ORCSchemaUtil.hasIds(fileSchema)) {
+      readOrcSchema = ORCSchemaUtil.buildOrcProjection(schema, fileSchema);
+    } else {
+      if (nameMapping == null) {
+        nameMapping = MappingUtil.create(schema);
+      }
+      TypeDescription typeWithIds = ORCSchemaUtil.applyNameMapping(fileSchema, nameMapping);
+      readOrcSchema = ORCSchemaUtil.buildOrcProjection(schema, typeWithIds);
+    }
 
     SearchArgument sarg = null;
     if (filter != null) {
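The fallback in `iterator()` is worth calling out: when the file carries no IDs and the caller supplied no mapping, `MappingUtil.create(schema)` derives one from the read schema, which effectively assumes the file's column names match the Iceberg names. A minimal sketch of that ID-less path using the new package-private helpers (runnable from code in the same package, e.g. a test; the schema literal is an arbitrary example):

    Schema icebergSchema = new Schema(
        required(1, "id", Types.LongType.get()),
        optional(2, "name", Types.StringType.get()));

    // Simulate a file written without Iceberg metadata.
    TypeDescription fileSchema = ORCSchemaUtil.removeIds(ORCSchemaUtil.convert(icebergSchema));
    if (!ORCSchemaUtil.hasIds(fileSchema)) {
      NameMapping mapping = MappingUtil.create(icebergSchema);  // the fallback mapping
      TypeDescription withIds = ORCSchemaUtil.applyNameMapping(fileSchema, mapping);
      TypeDescription readSchema = ORCSchemaUtil.buildOrcProjection(icebergSchema, withIds);
    }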
index 6d1b127981a7..778037b8ce51 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.orc;
 
+import java.util.Deque;
 import java.util.List;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -29,6 +30,8 @@
  */
 public abstract class OrcSchemaVisitor<T> {
 
+  private final Deque<String> fieldNames = Lists.newLinkedList();
+
   public static <T> List<T> visitSchema(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
     Preconditions.checkArgument(schema.getId() == 0, "TypeDescription must be root schema.");
 
@@ -47,11 +50,37 @@ public static <T> T visit(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
         throw new UnsupportedOperationException("Cannot handle " + schema);
 
       case LIST:
-        return visitor.list(schema, visit(schema.getChildren().get(0), visitor));
+        final T elementResult;
+
+        TypeDescription element = schema.getChildren().get(0);
+        visitor.beforeElementField(element);
+        try {
+          elementResult = visit(element, visitor);
+        } finally {
+          visitor.afterElementField(element);
+        }
+        return visitor.list(schema, elementResult);
 
       case MAP:
-        return visitor.map(schema, visit(schema.getChildren().get(0), visitor),
-            visit(schema.getChildren().get(1), visitor));
+        final T keyResult;
+        final T valueResult;
+
+        TypeDescription key = schema.getChildren().get(0);
+        visitor.beforeKeyField(key);
+        try {
+          keyResult = visit(key, visitor);
+        } finally {
+          visitor.afterKeyField(key);
+        }
+
+        TypeDescription value = schema.getChildren().get(1);
+        visitor.beforeValueField(value);
+        try {
+          valueResult = visit(value, visitor);
+        } finally {
+          visitor.afterValueField(value);
+        }
+        return visitor.map(schema, keyResult, valueResult);
 
       default:
         return visitor.primitive(schema);
@@ -83,9 +112,53 @@ private static <T> T visitRecord(TypeDescription record, OrcSchemaVisitor<T> visitor) {
     return visitor.record(record, names, visitFields(fields, names, visitor));
   }
 
-  public void beforeField(String name, TypeDescription type) {}
+  public String elementName() {
+    return "_elem";
+  }
+
+  public String keyName() {
+    return "_key";
+  }
+
+  public String valueName() {
+    return "_value";
+  }
+
+  public String currentFieldName() {
+    return fieldNames.peek();
+  }
+
+  public void beforeField(String name, TypeDescription type) {
+    fieldNames.push(name);
+  }
+
+  public void afterField(String name, TypeDescription type) {
+    fieldNames.pop();
+  }
+
+  public void beforeElementField(TypeDescription element) {
+    beforeField(elementName(), element);
+  }
+
+  public void afterElementField(TypeDescription element) {
+    afterField(elementName(), element);
+  }
+
+  public void beforeKeyField(TypeDescription key) {
+    beforeField(keyName(), key);
+  }
+
+  public void afterKeyField(TypeDescription key) {
+    afterField(keyName(), key);
+  }
 
-  public void afterField(String name, TypeDescription type) {}
+  public void beforeValueField(TypeDescription value) {
+    beforeField(valueName(), value);
+  }
+
+  public void afterValueField(TypeDescription value) {
+    afterField(valueName(), value);
+  }
 
   public T record(TypeDescription record, List<String> names, List<T> fields) {
     return null;
@@ -102,4 +175,14 @@ public T map(TypeDescription map, T key, T value) {
   public T primitive(TypeDescription primitive) {
     return null;
   }
+
+  protected String[] currentPath() {
+    return Lists.newArrayList(fieldNames.descendingIterator()).toArray(new String[0]);
+  }
+
+  protected String[] path(String name) {
+    List<String> list = Lists.newArrayList(fieldNames.descendingIterator());
+    list.add(name);
+    return list.toArray(new String[0]);
+  }
 }
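With the field-name stack now maintained by the base class, any visitor can ask for the dotted path of the node it is visiting, which is what `ApplyNameMapping` uses to look up `nameMapping.find(currentPath())`. A toy visitor to illustrate the mechanics (the schema literal is an arbitrary example; list elements get the default `_elem` name):

    import org.apache.iceberg.orc.OrcSchemaVisitor;
    import org.apache.orc.TypeDescription;

    class PathPrinter extends OrcSchemaVisitor<Void> {
      @Override
      public Void primitive(TypeDescription primitive) {
        // currentPath() reflects the enclosing field names at this point of the walk
        System.out.println(String.join(".", currentPath()));
        return null;
      }

      public static void main(String[] args) {
        TypeDescription schema = TypeDescription.fromString(
            "struct<id:bigint,points:array<struct<x:double,y:double>>>");
        OrcSchemaVisitor.visit(schema, new PathPrinter());
        // prints:
        // id
        // points._elem.x
        // points._elem.y
      }
    }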
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcToIcebergVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/OrcToIcebergVisitor.java
index 6a6b895b0d47..d00060eb34e6 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcToIcebergVisitor.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcToIcebergVisitor.java
@@ -19,11 +19,9 @@
 
 package org.apache.iceberg.orc;
 
-import java.util.Deque;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
 
@@ -32,26 +30,6 @@
  */
 class OrcToIcebergVisitor extends OrcSchemaVisitor<Optional<Types.NestedField>> {
 
-  private final Deque<String> fieldNames;
-
-  OrcToIcebergVisitor() {
-    this.fieldNames = Lists.newLinkedList();
-  }
-
-  @Override
-  public void beforeField(String name, TypeDescription type) {
-    fieldNames.push(name);
-  }
-
-  @Override
-  public void afterField(String name, TypeDescription type) {
-    fieldNames.pop();
-  }
-
-  private String currentFieldName() {
-    return fieldNames.peek();
-  }
-
   @Override
   public Optional<Types.NestedField> record(TypeDescription record, List<String> names,
                                             List<Optional<Types.NestedField>> fields) {
diff --git a/orc/src/main/java/org/apache/iceberg/orc/RemoveIds.java b/orc/src/main/java/org/apache/iceberg/orc/RemoveIds.java
new file mode 100644
index 000000000000..bfa56deb54dd
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/RemoveIds.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.orc.TypeDescription;
+
+class RemoveIds extends OrcSchemaVisitor<TypeDescription> {
+
+  @Override
+  public TypeDescription record(TypeDescription record, List<String> names, List<TypeDescription> fields) {
+    Preconditions.checkArgument(names.size() == fields.size(), "All fields must have names.");
+    TypeDescription struct = TypeDescription.createStruct();
+
+    for (int i = 0; i < fields.size(); i++) {
+      struct.addField(names.get(i), fields.get(i));
+    }
+    return struct;
+  }
+
+  @Override
+  public TypeDescription list(TypeDescription array, TypeDescription element) {
+    return TypeDescription.createList(element);
+  }
+
+  @Override
+  public TypeDescription map(TypeDescription map, TypeDescription key, TypeDescription value) {
+    return TypeDescription.createMap(key, value);
+  }
+
+  @Override
+  public TypeDescription primitive(TypeDescription primitive) {
+    return removeIcebergAttributes(primitive.clone());
+  }
+
+  private static TypeDescription removeIcebergAttributes(TypeDescription orcType) {
+    orcType.removeAttribute(ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE);
+    return orcType;
+  }
+}
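For reference, the ID that `RemoveIds` strips is an ordinary ORC `TypeDescription` attribute keyed by `ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE`; the tests below use `removeIds` to simulate files written by non-Iceberg writers. A small sketch of the per-primitive behavior, using only the `TypeDescription` attribute calls already seen in this diff (same-package access to the constant assumed):

    TypeDescription longType = TypeDescription.createLong();
    longType.setAttribute(ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, "1");
    // longType.getAttributeValue(ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE) -> "1"
    longType.removeAttribute(ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE);
    // -> null afterwards; per primitive, this is all RemoveIds has to undo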
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestExpressionToSearchArgument.java b/orc/src/test/java/org/apache/iceberg/orc/TestExpressionToSearchArgument.java
index 9812668b76f6..b56b65aa268b 100644
--- a/orc/src/test/java/org/apache/iceberg/orc/TestExpressionToSearchArgument.java
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestExpressionToSearchArgument.java
@@ -33,6 +33,8 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.ql.io.sarg.PredicateLeaf.Type;
@@ -262,4 +264,132 @@ public void testEvolvedSchema() {
     actual = ExpressionToSearchArgument.convert(boundFilter, readSchema);
     Assert.assertEquals(expected.toString(), actual.toString());
   }
+
+  @Test
+  public void testOriginalSchemaNameMapping() {
+    Schema originalSchema = new Schema(
+        required(1, "int", Types.IntegerType.get()),
+        optional(2, "long", Types.LongType.get())
+    );
+
+    TypeDescription orcSchemaWithoutIds = ORCSchemaUtil.removeIds(ORCSchemaUtil.convert(originalSchema));
+    NameMapping nameMapping = MappingUtil.create(originalSchema);
+
+    TypeDescription readSchema = ORCSchemaUtil.buildOrcProjection(originalSchema,
+        ORCSchemaUtil.applyNameMapping(orcSchemaWithoutIds, nameMapping));
+
+    Expression expr = and(equal("int", 1), equal("long", 1));
+    Expression boundFilter = Binder.bind(originalSchema.asStruct(), expr, true);
+    SearchArgument expected = SearchArgumentFactory.newBuilder()
+        .equals("`int`", Type.LONG, 1L)
+        .equals("`long`", Type.LONG, 1L)
+        .build();
+
+    SearchArgument actual = ExpressionToSearchArgument.convert(boundFilter, readSchema);
+    Assert.assertEquals(expected.toString(), actual.toString());
+  }
+
+  @Test
+  public void testModifiedSimpleSchemaNameMapping() {
+    Schema originalSchema = new Schema(
+        required(1, "int", Types.IntegerType.get()),
+        optional(2, "long_to_be_dropped", Types.LongType.get())
+    );
+    Schema mappingSchema = new Schema(
+        required(1, "int", Types.IntegerType.get()),
+        optional(3, "new_float_field", Types.FloatType.get())
+    );
+    TypeDescription orcSchemaWithoutIds = ORCSchemaUtil.removeIds(ORCSchemaUtil.convert(originalSchema));
+    NameMapping nameMapping = MappingUtil.create(mappingSchema);
+
+    TypeDescription readSchema = ORCSchemaUtil.buildOrcProjection(mappingSchema,
+        ORCSchemaUtil.applyNameMapping(orcSchemaWithoutIds, nameMapping));
+
+    Expression expr = equal("int", 1);
+    Expression boundFilter = Binder.bind(mappingSchema.asStruct(), expr, true);
+    SearchArgument expected = SearchArgumentFactory.newBuilder()
+        .equals("`int`", Type.LONG, 1L)
+        .build();
+
+    SearchArgument actual = ExpressionToSearchArgument.convert(boundFilter, readSchema);
+    Assert.assertEquals(expected.toString(), actual.toString());
+
+    // for columns not present in the file, buildOrcProjection renames the field by appending `_r` plus the field ID;
+    // the renamed column is passed down to ORC, which handles missing columns by returning a TruthValue during
+    // evaluation
+    expr = equal("new_float_field", 1);
+    boundFilter = Binder.bind(mappingSchema.asStruct(), expr, true);
+    expected = SearchArgumentFactory.newBuilder()
+        .equals("`new_float_field_r3`", Type.FLOAT, 1.0)
+        .build();
+
+    actual = ExpressionToSearchArgument.convert(boundFilter, readSchema);
+    Assert.assertEquals(expected.toString(), actual.toString());
+  }
+
+  @Test
+  public void testModifiedComplexSchemaNameMapping() {
+    Schema originalSchema = new Schema(
+        optional(1, "struct", Types.StructType.of(
+            required(2, "long", Types.LongType.get())
+        )),
+        optional(3, "list", Types.ListType.ofRequired(4, Types.LongType.get())),
+        optional(5, "map", Types.MapType.ofRequired(6, 7, Types.LongType.get(), Types.LongType.get())),
+        optional(8, "listOfStruct", Types.ListType.ofRequired(9, Types.StructType.of(
+            required(10, "long", Types.LongType.get())))),
+        optional(11, "listOfPeople", Types.ListType.ofRequired(12, Types.StructType.of(
+            required(13, "name", Types.StringType.get()),
+            required(14, "birth_date", Types.DateType.get()))))
+    );
+    Schema mappingSchema = new Schema(
+        optional(1, "struct", Types.StructType.of(
+            required(2, "int", Types.LongType.get())
+        )),
+        optional(3, "list", Types.ListType.ofRequired(4, Types.LongType.get())),
+        optional(5, "newMap", Types.MapType.ofRequired(6, 7, Types.StringType.get(), Types.LongType.get())),
+        optional(8, "listOfStruct", Types.ListType.ofRequired(9, Types.StructType.of(
+            required(10, "newLong", Types.LongType.get())))),
+        optional(11, "listOfPeople", Types.ListType.ofRequired(12, Types.StructType.of(
+            required(13, "name", Types.StringType.get()),
+            required(14, "age", Types.IntegerType.get()))))
+    );
+    TypeDescription orcSchemaWithoutIds = ORCSchemaUtil.removeIds(ORCSchemaUtil.convert(originalSchema));
+    NameMapping nameMapping = MappingUtil.create(mappingSchema);
+
+    TypeDescription readSchema = ORCSchemaUtil.buildOrcProjection(mappingSchema,
+        ORCSchemaUtil.applyNameMapping(orcSchemaWithoutIds, nameMapping));
+
+    Expression expr = and(
+        and(
+            equal("struct.int", 1), and(
+                lessThanOrEqual("list.element", 5),
+                equal("newMap.key", "country")
+            ),
+            and(
+                equal("listOfStruct.newLong", 100L),
+                notEqual("listOfPeople.name", "Bob")
+            )
+        ),
+        lessThan("listOfPeople.age", 30)
+    );
+    Expression boundFilter = Binder.bind(mappingSchema.asStruct(), expr, true);
+    SearchArgument expected = SearchArgumentFactory.newBuilder()
+        .startAnd()
+        // Drops struct.long
+        .equals("`struct`.`int_r2`", Type.LONG, 1L)
+        .lessThanEquals("`list`.`_elem`", Type.LONG, 5L)
+        // Drops map
+        .equals("`newMap_r5`.`_key`", Type.STRING, "country")
+        // Drops listOfStruct.long
+        .equals("`listOfStruct`.`_elem`.`newLong_r10`", Type.LONG, 100L)
+        .startNot()
+        .equals("`listOfPeople`.`_elem`.`name`", Type.STRING, "Bob")
+        .end()
+        .lessThan("`listOfPeople`.`_elem`.`age_r14`", Type.LONG, 30L)
+        .end()
+        .build();
+
+    SearchArgument actual = ExpressionToSearchArgument.convert(boundFilter, readSchema);
+    Assert.assertEquals(expected.toString(), actual.toString());
+  }
 }
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java b/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
index 269919cf86b6..d80671666f00 100644
--- a/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
@@ -19,7 +19,14 @@
 
 package org.apache.iceberg.orc;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
 import org.junit.Test;
@@ -30,30 +37,35 @@
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class TestORCSchemaUtil {
 
+  private static final Types.StructType SUPPORTED_PRIMITIVES = Types.StructType.of(
+      optional(1, "intCol", Types.IntegerType.get()),
+      optional(3, "longCol", Types.LongType.get()),
+      optional(6, "intCol2", Types.IntegerType.get()),
+      optional(20, "intCol3", Types.IntegerType.get()),
+      required(9, "doubleCol", Types.DoubleType.get()),
+      required(10, "uuidCol", Types.UUIDType.get()),
+      optional(2, "booleanCol", Types.BooleanType.get()),
+      optional(21, "fixedCol", Types.FixedType.ofLength(4096)),
+      required(22, "binaryCol", Types.BinaryType.get()),
+      required(23, "stringCol", Types.StringType.get()),
+      required(25, "floatCol", Types.FloatType.get()),
+      optional(30, "dateCol", Types.DateType.get()),
+      required(32, "timeCol", Types.TimeType.get()),
+      required(34, "timestampCol", Types.TimestampType.withZone()),
+      required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
+      required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
+      required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // spark's maximum precision
+  );
+
   @Test
   public void testRoundtripConversionPrimitive() {
-    Schema expectedSchema = new Schema(
-        optional(1, "intCol", Types.IntegerType.get()),
-        optional(3, "longCol", Types.LongType.get()),
-        optional(6, "intCol2", Types.IntegerType.get()),
-        optional(20, "intCol3", Types.IntegerType.get()),
-        required(9, "doubleCol", Types.DoubleType.get()),
-        required(10, "uuidCol", Types.UUIDType.get()),
-        optional(2, "booleanCol", Types.BooleanType.get()),
-        optional(21, "fixedCol", Types.FixedType.ofLength(4096)),
-        required(22, "binaryCol", Types.BinaryType.get()),
-        required(23, "stringCol", Types.StringType.get()),
-        required(24, "decimalCol", Types.DecimalType.of(15, 3)),
-        required(25, "floatCol", Types.FloatType.get()),
-        optional(30, "dateCol", Types.DateType.get()),
-        required(32, "timeCol", Types.TimeType.get()),
-        required(34, "timestampCol", Types.TimestampType.withZone())
-    );
-    TypeDescription orcSchema = ORCSchemaUtil.convert(expectedSchema);
-    assertEquals(expectedSchema.asStruct(), ORCSchemaUtil.convert(orcSchema).asStruct());
+    TypeDescription orcSchema = ORCSchemaUtil.convert(new Schema(SUPPORTED_PRIMITIVES.fields()));
+    assertEquals(SUPPORTED_PRIMITIVES, ORCSchemaUtil.convert(orcSchema).asStruct());
   }
@@ -269,4 +281,202 @@ public void testSkipNonIcebergColumns() {
     );
     assertEquals("Schemas must match.", expectedSchema2.asStruct(), icebergSchema2.asStruct());
   }
+
+  @Test
+  public void testHasIds() {
+    Schema schema = new Schema(
+        optional(1, "data", Types.StructType.of(
+            optional(10, "entries", Types.MapType.ofOptional(11, 12, Types.StringType.get(), Types.DateType.get()))
+        )),
+        optional(2, "intCol", Types.IntegerType.get()),
+        optional(3, "longCol", Types.LongType.get()),
+        optional(4, "listCol", Types.ListType.ofOptional(40, Types.DoubleType.get()))
+    );
+
+    TypeDescription orcSchema = ORCSchemaUtil.removeIds(ORCSchemaUtil.convert(schema));
+    assertFalse("Should not have Ids", ORCSchemaUtil.hasIds(orcSchema));
+
+    TypeDescription map2Col = TypeDescription.createMap(TypeDescription.createString(),
+        TypeDescription.createBinary());
+    map2Col.setAttribute(ICEBERG_ID_ATTRIBUTE, "4");
+    orcSchema.addField("map2Col", map2Col);
+    assertTrue("Should have Ids after adding one type with Id", ORCSchemaUtil.hasIds(orcSchema));
+  }
+
+  @Test
+  public void testAssignIdsByNameMapping() {
+    Types.StructType structType = Types.StructType.of(
+        required(0, "id", Types.LongType.get()),
+        optional(1, "list_of_maps",
+            Types.ListType.ofOptional(2, Types.MapType.ofOptional(3, 4,
+                Types.StringType.get(),
+                SUPPORTED_PRIMITIVES))),
+        optional(5, "map_of_lists",
+            Types.MapType.ofOptional(6, 7,
+                Types.StringType.get(),
+                Types.ListType.ofOptional(8, SUPPORTED_PRIMITIVES))),
+        required(9, "list_of_lists",
+            Types.ListType.ofOptional(10, Types.ListType.ofOptional(11, SUPPORTED_PRIMITIVES))),
+        required(12, "map_of_maps",
+            Types.MapType.ofOptional(13, 14,
+                Types.StringType.get(),
+                Types.MapType.ofOptional(15, 16,
+                    Types.StringType.get(),
+                    SUPPORTED_PRIMITIVES))),
+        required(17, "list_of_struct_of_nested_types", Types.ListType.ofOptional(19, Types.StructType.of(
+            Types.NestedField.required(20, "m1", Types.MapType.ofOptional(21, 22,
+                Types.StringType.get(),
+                SUPPORTED_PRIMITIVES)),
+            Types.NestedField.optional(23, "l1", Types.ListType.ofRequired(24, SUPPORTED_PRIMITIVES)),
+            Types.NestedField.required(25, "l2", Types.ListType.ofRequired(26, SUPPORTED_PRIMITIVES)),
+            Types.NestedField.optional(27, "m2", Types.MapType.ofOptional(28, 29,
+                Types.StringType.get(),
+                SUPPORTED_PRIMITIVES))
+        )))
+    );
+
+    Schema schema = new Schema(TypeUtil.assignFreshIds(structType, new AtomicInteger(0)::incrementAndGet)
+        .asStructType().fields());
+
+    NameMapping nameMapping = MappingUtil.create(schema);
+    TypeDescription typeDescriptionWithIds = ORCSchemaUtil.convert(schema);
+    TypeDescription typeDescriptionWithIdsFromNameMapping = ORCSchemaUtil
+        .applyNameMapping(ORCSchemaUtil.removeIds(typeDescriptionWithIds), nameMapping);
+
+    assertTrue("TypeDescription schemas should be equal, including IDs",
+        equalsWithIds(typeDescriptionWithIds, typeDescriptionWithIdsFromNameMapping));
+  }
+
+  @Test
+  public void testAssignIdsByNameMappingAndProject() {
+    Types.StructType structType = Types.StructType.of(
+        required(1, "id", Types.LongType.get()),
+        optional(2, "list_of_structs",
+            Types.ListType.ofOptional(3, Types.StructType.of(
+                required(4, "entry", Types.LongType.get()),
+                required(5, "data", Types.BinaryType.get())
+            ))
+        ),
+        optional(6, "map",
+            Types.MapType.ofOptional(7, 8, Types.StringType.get(), Types.DoubleType.get())
+        ),
+        optional(12, "map_of_structs",
+            Types.MapType.ofOptional(13, 14, Types.StringType.get(), Types.StructType.of(
+                required(20, "field", Types.LongType.get())))
+        ),
+        required(30, "struct", Types.StructType.of(
+            required(31, "lat", Types.DoubleType.get()),
+            required(32, "long", Types.DoubleType.get())
+            )
+        )
+    );
+
+    TypeDescription fileSchema = ORCSchemaUtil.removeIds(
+        ORCSchemaUtil.convert(new Schema(structType.asStructType().fields())));
+
+    Schema mappingSchema = new Schema(Types.StructType.of(
+        optional(1, "new_id", Types.LongType.get()),
+        optional(2, "list_of_structs",
+            Types.ListType.ofOptional(3, Types.StructType.of(
+                required(5, "data", Types.BinaryType.get())
+            ))
+        ),
+        optional(6, "map",
+            Types.MapType.ofOptional(7, 8, Types.StringType.get(), Types.DoubleType.get())
+        ),
+        optional(30, "struct",
+            Types.StructType.of(
+                optional(31, "latitude", Types.DoubleType.get()),
+                optional(32, "longitude", Types.DoubleType.get())
+            )
+        ),
+        optional(40, "long", Types.LongType.get())
+    ).asStructType().fields());
+
+    NameMapping nameMapping = MappingUtil.create(mappingSchema);
+    TypeDescription typeDescriptionWithIdsFromNameMapping = ORCSchemaUtil
+        .applyNameMapping(fileSchema, nameMapping);
+
+    TypeDescription expected = TypeDescription.createStruct();
+    // new field
+    TypeDescription newId = TypeDescription.createLong();
+    newId.setAttribute(ICEBERG_ID_ATTRIBUTE, "1");
+    expected.addField("new_id_r1", newId);
+
+    // list_of_structs
+    TypeDescription structElem = TypeDescription.createStruct();
+    structElem.setAttribute(ICEBERG_ID_ATTRIBUTE, "3");
+    TypeDescription dataInStruct = TypeDescription.createBinary();
+    dataInStruct.setAttribute(ICEBERG_ID_ATTRIBUTE, "5");
+    structElem.addField("data", dataInStruct);
+    TypeDescription list = TypeDescription.createList(structElem);
+    list.setAttribute(ICEBERG_ID_ATTRIBUTE, "2");
+
+    // map
+    TypeDescription mapKey = TypeDescription.createString();
+    mapKey.setAttribute(ICEBERG_ID_ATTRIBUTE, "7");
+    TypeDescription mapValue = TypeDescription.createDouble();
+    mapValue.setAttribute(ICEBERG_ID_ATTRIBUTE, "8");
+    TypeDescription map = TypeDescription.createMap(mapKey, mapValue);
+    map.setAttribute(ICEBERG_ID_ATTRIBUTE, "6");
+
+    expected.addField("list_of_structs", list);
+    expected.addField("map", map);
+
+    TypeDescription struct = TypeDescription.createStruct();
+    struct.setAttribute(ICEBERG_ID_ATTRIBUTE, "30");
+    TypeDescription latitude = TypeDescription.createDouble();
+    latitude.setAttribute(ICEBERG_ID_ATTRIBUTE, "31");
+    TypeDescription longitude = TypeDescription.createDouble();
+    longitude.setAttribute(ICEBERG_ID_ATTRIBUTE, "32");
+    struct.addField("latitude_r31", latitude);
+    struct.addField("longitude_r32", longitude);
+    expected.addField("struct", struct);
+    TypeDescription longField = TypeDescription.createLong();
+    longField.setAttribute(ICEBERG_ID_ATTRIBUTE, "40");
+    expected.addField("long_r40", longField);
+
+    assertTrue("ORC schemas must have the same structure, ignoring the Iceberg ID attributes",
+        typeDescriptionWithIdsFromNameMapping.equals(fileSchema, false));
+
+    TypeDescription projectedOrcSchema = ORCSchemaUtil.buildOrcProjection(mappingSchema,
+        typeDescriptionWithIdsFromNameMapping);
+    assertTrue("Schema should be pruned by the projection",
+        equalsWithIds(expected, projectedOrcSchema));
+  }
+
+  private static boolean equalsWithIds(TypeDescription first, TypeDescription second) {
+    if (second == first) {
+      return true;
+    }
+
+    if (!first.equals(second, false)) {
+      return false;
+    }
+
+    // check the ID attribute on non-root TypeDescriptions
+    if (first.getId() > 0 && second.getId() > 0) {
+      if (first.getAttributeValue(ICEBERG_ID_ATTRIBUTE) == null ||
+          second.getAttributeValue(ICEBERG_ID_ATTRIBUTE) == null) {
+        return false;
+      }
+
+      if (!first.getAttributeValue(ICEBERG_ID_ATTRIBUTE).equals(second.getAttributeValue(ICEBERG_ID_ATTRIBUTE))) {
+        return false;
+      }
+    }
+
+    // check the children
+    List<TypeDescription> firstChildren = Optional.ofNullable(first.getChildren()).orElse(Collections.emptyList());
+    List<TypeDescription> secondChildren = Optional.ofNullable(second.getChildren()).orElse(Collections.emptyList());
+    if (firstChildren.size() != secondChildren.size()) {
+      return false;
+    }
+    for (int i = 0; i < firstChildren.size(); ++i) {
+      if (!equalsWithIds(firstChildren.get(i), secondChildren.get(i))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index eff18ca3100d..d092dc69fc02 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -105,15 +105,20 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
       iter = builder.build();
     } else if (task.file().format() == FileFormat.ORC) {
       Schema schemaWithoutConstants = TypeUtil.selectNot(expectedSchema, idToConstant.keySet());
-      iter = ORC.read(location)
+      ORC.ReadBuilder builder = ORC.read(location)
           .project(schemaWithoutConstants)
           .split(task.start(), task.length())
           .createBatchedReaderFunc(fileSchema ->
              VectorizedSparkOrcReaders.buildReader(expectedSchema, fileSchema, idToConstant))
           .recordsPerBatch(batchSize)
           .filter(task.residual())
-          .caseSensitive(caseSensitive)
-          .build();
+          .caseSensitive(caseSensitive);
+
+      if (nameMapping != null) {
+        builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+      }
+
+      iter = builder.build();
     } else {
       throw new UnsupportedOperationException(
           "Format: " + task.file().format() + " not supported for batched reads");
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index a9b0306c723c..a7fbe90ee8b5 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -174,13 +174,19 @@ private CloseableIterable<InternalRow> newOrcIterable(
       Map<Integer, ?> idToConstant) {
     Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(readSchema,
         Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-    return ORC.read(location)
+
+    ORC.ReadBuilder builder = ORC.read(location)
         .project(readSchemaWithoutConstantAndMetadataFields)
         .split(task.start(), task.length())
        .createReaderFunc(readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
        .filter(task.residual())
-        .caseSensitive(caseSensitive)
-        .build();
+        .caseSensitive(caseSensitive);
+
+    if (nameMapping != null) {
+      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+    }
+
+    return builder.build();
   }
 
   private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
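Both Spark readers take the mapping JSON from the table's name-mapping property, so enabling these reads for existing ID-less files is a metadata-only change. A sketch of the setup, assuming a `table` handle (this mirrors what the test below does; `TableProperties.DEFAULT_NAME_MAPPING` is the "schema.name-mapping.default" key):

    NameMapping mapping = MappingUtil.create(table.schema());
    table.updateProperties()
        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
        .commit();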
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestNameMappingProjection.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestNameMappingProjection.java
index 06338cfce729..c425fec39546 100644
--- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestNameMappingProjection.java
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestNameMappingProjection.java
@@ -21,6 +21,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileWriter;
@@ -29,6 +30,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
@@ -41,6 +43,11 @@
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.AfterClass;
@@ -83,6 +90,47 @@ public static void stopSpark() {
     currentSpark.stop();
   }
 
+  @Test
+  public void testOrcReaderWithNameMapping() throws IOException {
+    File orcFile = temp.newFolder();
+    TypeDescription orcSchema = TypeDescription.createStruct();
+    orcSchema.addField("id", TypeDescription.createInt());
+    orcSchema.addField("name", TypeDescription.createString());
+
+    Path dataFilePath = new Path(orcFile.toString(), "name-mapping-data.orc");
+    try (org.apache.orc.Writer writer = OrcFile.createWriter(dataFilePath,
+        OrcFile.writerOptions(new Configuration()).setSchema(orcSchema))) {
+      VectorizedRowBatch batch = orcSchema.createRowBatch();
+      byte[] aliceVal = "Alice".getBytes(StandardCharsets.UTF_8);
+      byte[] bobVal = "Bob".getBytes(StandardCharsets.UTF_8);
+
+      int rowId = batch.size++;
+      batch.cols[0].isNull[rowId] = false;
+      ((LongColumnVector) batch.cols[0]).vector[rowId] = 1;
+      batch.cols[1].isNull[rowId] = false;
+      ((BytesColumnVector) batch.cols[1]).setRef(rowId, bobVal, 0, bobVal.length);
+
+      rowId = batch.size++;
+      batch.cols[0].isNull[rowId] = false;
+      ((LongColumnVector) batch.cols[0]).vector[rowId] = 2;
+      batch.cols[1].isNull[rowId] = false;
+      ((BytesColumnVector) batch.cols[1]).setRef(rowId, aliceVal, 0, aliceVal.length);
+
+      writer.addRowBatch(batch);
+      batch.reset();
+    }
+
+    File fileWithData = new File(dataFilePath.toString());
+    DataFile orcDataFile = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withFormat("orc")
+        .withFileSizeInBytes(fileWithData.length())
+        .withPath(fileWithData.getAbsolutePath())
+        .withRecordCount(2)
+        .build();
+
+    assertNameMappingProjection(orcDataFile, "orc_table");
+  }
+
   @Test
   public void testAvroReaderWithNameMapping() throws IOException {
     File avroFile = temp.newFile();
@@ -118,6 +166,10 @@ public void testAvroReaderWithNameMapping() throws IOException {
         .withRecordCount(2)
         .build();
 
+    assertNameMappingProjection(avroDataFile, "avro_table");
+  }
+
+  private void assertNameMappingProjection(DataFile dataFile, String tableName) {
     Schema filteredSchema = new Schema(
         required(1, "name", Types.StringType.get())
     );
@@ -129,7 +181,7 @@
     );
 
     Table table = catalog.createTable(
-        org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, "avro_table"),
+        org.apache.iceberg.catalog.TableIdentifier.of(DB_NAME, tableName),
         tableSchema,
         PartitionSpec.unpartitioned());
 
@@ -137,10 +189,10 @@ public void testAvroReaderWithNameMapping() throws IOException {
         .set(DEFAULT_NAME_MAPPING, NameMappingParser.toJson(nameMapping))
         .commit();
 
-    table.newFastAppend().appendFile(avroDataFile).commit();
+    table.newFastAppend().appendFile(dataFile).commit();
 
     List<Row> actual = spark.read().format("iceberg")
-        .load(DB_NAME + ".avro_table")
+        .load(String.format("%s.%s", DB_NAME, tableName))
         .filter("name='Alice'")
         .collectAsList();