diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 85cc8d902026..afc502e20d96 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -61,6 +61,7 @@ import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.mapping.MappingUtil; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -610,11 +611,12 @@ public static class ReadBuilder { private org.apache.iceberg.Schema schema = null; private Function> createReaderFunc = null; private BiFunction> createReaderBiFunc = null; + private Function> createResolvingReaderFunc = null; @SuppressWarnings("UnnecessaryLambda") - private final Function> defaultCreateReaderFunc = + private final Function> defaultCreateReaderFunc = readSchema -> { - GenericAvroReader reader = new GenericAvroReader<>(readSchema); + GenericAvroReader reader = GenericAvroReader.create(readSchema); reader.setClassLoader(loader); return reader; }; @@ -627,15 +629,28 @@ private ReadBuilder(InputFile file) { this.file = file; } + public ReadBuilder createResolvingReader( + Function> readerFunction) { + Preconditions.checkState( + createReaderBiFunc == null && createReaderFunc == null, + "Cannot set multiple read builder functions"); + this.createResolvingReaderFunc = readerFunction; + return this; + } + public ReadBuilder createReaderFunc(Function> readerFunction) { - Preconditions.checkState(createReaderBiFunc == null, "Cannot set multiple createReaderFunc"); + Preconditions.checkState( + createReaderBiFunc == null && createResolvingReaderFunc == null, + "Cannot set multiple read builder functions"); this.createReaderFunc = readerFunction; return this; } public ReadBuilder createReaderFunc( BiFunction> readerFunction) { - Preconditions.checkState(createReaderFunc == null, "Cannot set multiple createReaderFunc"); + Preconditions.checkState( + createReaderFunc == null && createResolvingReaderFunc == null, + "Cannot set multiple read builder functions"); this.createReaderBiFunc = readerFunction; return this; } @@ -683,23 +698,34 @@ public ReadBuilder classLoader(ClassLoader classLoader) { return this; } + @SuppressWarnings("unchecked") public AvroIterable build() { Preconditions.checkNotNull(schema, "Schema is required"); - Function> readerFunc; + + if (null == nameMapping) { + this.nameMapping = MappingUtil.create(schema); + } + + DatumReader reader; if (createReaderBiFunc != null) { - readerFunc = avroSchema -> createReaderBiFunc.apply(schema, avroSchema); + reader = + new ProjectionDatumReader<>( + avroSchema -> createReaderBiFunc.apply(schema, avroSchema), schema, renames, null); } else if (createReaderFunc != null) { - readerFunc = createReaderFunc; + reader = new ProjectionDatumReader<>(createReaderFunc, schema, renames, null); + } else if (createResolvingReaderFunc != null) { + reader = (DatumReader) createResolvingReaderFunc.apply(schema); } else { - readerFunc = defaultCreateReaderFunc; + reader = (DatumReader) defaultCreateReaderFunc.apply(schema); + } + + if (reader instanceof SupportsCustomRecords) { + ((SupportsCustomRecords) reader).setClassLoader(loader); + ((SupportsCustomRecords) reader).setRenames(renames); } return new AvroIterable<>( - file, - new ProjectionDatumReader<>(readerFunc, schema, renames, nameMapping), - start, - length, - reuseContainers); + file, new NameMappingDatumReader<>(nameMapping, reader), start, length, reuseContainers); } } diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java b/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java index 49acb8010b61..3bcc6a4799d2 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java @@ -31,6 +31,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Suppliers; import org.apache.iceberg.relocated.com.google.common.collect.Maps; public class AvroIterable extends CloseableGroup implements CloseableIterable { @@ -78,7 +79,8 @@ public CloseableIterator iterator() { if (start != null) { if (reader instanceof SupportsRowPosition) { ((SupportsRowPosition) reader) - .setRowPositionSupplier(() -> AvroIO.findStartingRowPos(file::newStream, start)); + .setRowPositionSupplier( + Suppliers.memoize(() -> AvroIO.findStartingRowPos(file::newStream, start))); } fileReader = new AvroRangeIterator<>(fileReader, start, end); } else if (reader instanceof SupportsRowPosition) { diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java new file mode 100644 index 000000000000..0147dbf37d31 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.util.Deque; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +public class AvroWithPartnerVisitor { + public interface PartnerAccessors

{ + P fieldPartner(P partnerStruct, Integer fieldId, String name); + + P mapKeyPartner(P partnerMap); + + P mapValuePartner(P partnerMap); + + P listElementPartner(P partnerList); + } + + static class FieldIDAccessors implements AvroWithPartnerVisitor.PartnerAccessors { + private static final FieldIDAccessors INSTANCE = new FieldIDAccessors(); + + public static FieldIDAccessors get() { + return INSTANCE; + } + + @Override + public Type fieldPartner(Type partner, Integer fieldId, String name) { + Types.NestedField field = partner.asStructType().field(fieldId); + return field != null ? field.type() : null; + } + + @Override + public Type mapKeyPartner(Type partner) { + return partner.asMapType().keyType(); + } + + @Override + public Type mapValuePartner(Type partner) { + return partner.asMapType().valueType(); + } + + @Override + public Type listElementPartner(Type partner) { + return partner.asListType().elementType(); + } + } + + /** Used to fail on recursive types. */ + private Deque recordLevels = Lists.newLinkedList(); + + public R record(P partner, Schema record, List fieldResults) { + return null; + } + + public R union(P partner, Schema union, List optionResults) { + return null; + } + + public R array(P partner, Schema array, R elementResult) { + return null; + } + + public R arrayMap(P partner, Schema map, R keyResult, R valueResult) { + return null; + } + + public R map(P partner, Schema map, R valueResult) { + return null; + } + + public R primitive(P partner, Schema primitive) { + return null; + } + + public static R visit( + P partner, + Schema schema, + AvroWithPartnerVisitor visitor, + PartnerAccessors

accessors) { + switch (schema.getType()) { + case RECORD: + return visitRecord(partner, schema, visitor, accessors); + + case UNION: + return visitUnion(partner, schema, visitor, accessors); + + case ARRAY: + return visitArray(partner, schema, visitor, accessors); + + case MAP: + return visitor.map( + partner, + schema, + visit( + partner != null ? accessors.mapValuePartner(partner) : null, + schema.getValueType(), + visitor, + accessors)); + + default: + return visitor.primitive(partner, schema); + } + } + + private static R visitRecord( + P partnerStruct, + Schema record, + AvroWithPartnerVisitor visitor, + PartnerAccessors

accessors) { + // check to make sure this hasn't been visited before + String recordName = record.getFullName(); + Preconditions.checkState( + !visitor.recordLevels.contains(recordName), + "Cannot process recursive Avro record %s", + recordName); + visitor.recordLevels.push(recordName); + + List fields = record.getFields(); + List results = Lists.newArrayListWithExpectedSize(fields.size()); + for (int pos = 0; pos < fields.size(); pos += 1) { + Schema.Field field = fields.get(pos); + Integer fieldId = AvroSchemaUtil.fieldId(field); + + P fieldPartner = + partnerStruct != null && fieldId != null + ? accessors.fieldPartner(partnerStruct, fieldId, field.name()) + : null; + results.add(visit(fieldPartner, field.schema(), visitor, accessors)); + } + + visitor.recordLevels.pop(); + + return visitor.record(partnerStruct, record, results); + } + + private static R visitUnion( + P partner, + Schema union, + AvroWithPartnerVisitor visitor, + PartnerAccessors

accessors) { + Preconditions.checkArgument( + AvroSchemaUtil.isOptionSchema(union), "Cannot visit non-option union: %s", union); + + List types = union.getTypes(); + List options = Lists.newArrayListWithExpectedSize(types.size()); + for (Schema branch : types) { + options.add(visit(partner, branch, visitor, accessors)); + } + + return visitor.union(partner, union, options); + } + + private static R visitArray( + P partnerArray, + Schema array, + AvroWithPartnerVisitor visitor, + PartnerAccessors

accessors) { + if (array.getLogicalType() instanceof LogicalMap) { + Preconditions.checkState( + AvroSchemaUtil.isKeyValueSchema(array.getElementType()), + "Cannot visit invalid logical map type: %s", + array); + + List keyValueFields = array.getElementType().getFields(); + return visitor.arrayMap( + partnerArray, + array, + visit( + partnerArray != null ? accessors.mapKeyPartner(partnerArray) : null, + keyValueFields.get(0).schema(), + visitor, + accessors), + visit( + partnerArray != null ? accessors.mapValuePartner(partnerArray) : null, + keyValueFields.get(1).schema(), + visitor, + accessors)); + + } else { + return visitor.array( + partnerArray, + array, + visit( + partnerArray != null ? accessors.listElementPartner(partnerArray) : null, + array.getElementType(), + visitor, + accessors)); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java index 0fa2e795811b..d89489d92a28 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.function.Supplier; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; @@ -27,39 +28,68 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.common.DynClasses; -import org.apache.iceberg.data.avro.DecoderResolver; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; -public class GenericAvroReader implements DatumReader, SupportsRowPosition { +public class GenericAvroReader + implements DatumReader, SupportsRowPosition, SupportsCustomRecords { - private final Schema readSchema; + private final Types.StructType expectedType; private ClassLoader loader = Thread.currentThread().getContextClassLoader(); + private Map renames = ImmutableMap.of(); + private Map idToConstant = ImmutableMap.of(); private Schema fileSchema = null; private ValueReader reader = null; + public static GenericAvroReader create(org.apache.iceberg.Schema schema) { + return new GenericAvroReader<>(schema); + } + public static GenericAvroReader create(Schema schema) { return new GenericAvroReader<>(schema); } + GenericAvroReader(org.apache.iceberg.Schema readSchema) { + this.expectedType = readSchema.asStruct(); + } + GenericAvroReader(Schema readSchema) { - this.readSchema = readSchema; + this.expectedType = AvroSchemaUtil.convert(readSchema).asStructType(); } @SuppressWarnings("unchecked") private void initReader() { - this.reader = (ValueReader) AvroSchemaVisitor.visit(readSchema, new ReadBuilder(loader)); + this.reader = + (ValueReader) + AvroWithPartnerVisitor.visit( + expectedType, + fileSchema, + new ResolvingReadBuilder(expectedType, fileSchema.getFullName()), + AvroWithPartnerVisitor.FieldIDAccessors.get()); } @Override public void setSchema(Schema schema) { - this.fileSchema = Schema.applyAliases(schema, readSchema); + this.fileSchema = schema; initReader(); } + @Override public void setClassLoader(ClassLoader newClassLoader) { this.loader = newClassLoader; } + @Override + public void setRenames(Map renames) { + this.renames = renames; + } + @Override public void setRowPositionSupplier(Supplier posSupplier) { if (reader instanceof SupportsRowPosition) { @@ -69,62 +99,108 @@ public void setRowPositionSupplier(Supplier posSupplier) { @Override public T read(T reuse, Decoder decoder) throws IOException { - return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse); + return reader.read(decoder, reuse); } - private static class ReadBuilder extends AvroSchemaVisitor> { - private final ClassLoader loader; + private class ResolvingReadBuilder extends AvroWithPartnerVisitor> { + private final Map avroSchemas; - private ReadBuilder(ClassLoader loader) { - this.loader = loader; + private ResolvingReadBuilder(Types.StructType expectedType, String rootName) { + this.avroSchemas = AvroSchemaUtil.convertTypes(expectedType, rootName); } @Override - @SuppressWarnings("unchecked") - public ValueReader record(Schema record, List names, List> fields) { - try { - Class recordClass = - DynClasses.builder().loader(loader).impl(record.getFullName()).buildChecked(); - if (IndexedRecord.class.isAssignableFrom(recordClass)) { - return ValueReaders.record(fields, (Class) recordClass, record); + public ValueReader record(Type partner, Schema record, List> fieldResults) { + Types.StructType expected = partner != null ? partner.asStructType() : null; + Map idToPos = idToPos(expected); + + List>> readPlan = Lists.newArrayList(); + List fileFields = record.getFields(); + for (int pos = 0; pos < fileFields.size(); pos += 1) { + Schema.Field field = fileFields.get(pos); + ValueReader fieldReader = fieldResults.get(pos); + Integer fieldId = AvroSchemaUtil.fieldId(field); + Integer projectionPos = idToPos.remove(fieldId); + + Object constant = idToConstant.get(fieldId); + if (projectionPos != null && constant != null) { + readPlan.add( + Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant))); + } else { + readPlan.add(Pair.of(projectionPos, fieldReader)); + } + } + + // handle any expected columns that are not in the data file + for (Map.Entry idAndPos : idToPos.entrySet()) { + int fieldId = idAndPos.getKey(); + int pos = idAndPos.getValue(); + + Object constant = idToConstant.get(fieldId); + Types.NestedField field = expected.field(fieldId); + if (constant != null) { + readPlan.add(Pair.of(pos, ValueReaders.constant(constant))); + } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) { + readPlan.add(Pair.of(pos, ValueReaders.constant(false))); + } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) { + readPlan.add(Pair.of(pos, ValueReaders.positions())); + } else if (field.isOptional()) { + readPlan.add(Pair.of(pos, ValueReaders.constant(null))); + } else { + throw new IllegalArgumentException( + String.format("Missing required field: %s", field.name())); } + } - return ValueReaders.record(fields, record); + return recordReader(readPlan, avroSchemas.get(partner), record.getFullName()); + } - } catch (ClassNotFoundException e) { - return ValueReaders.record(fields, record); + @SuppressWarnings("unchecked") + private ValueReader recordReader( + List>> readPlan, Schema avroSchema, String recordName) { + String className = renames.getOrDefault(recordName, recordName); + if (className != null) { + try { + Class recordClass = DynClasses.builder().loader(loader).impl(className).buildChecked(); + if (IndexedRecord.class.isAssignableFrom(recordClass)) { + return ValueReaders.record( + avroSchema, (Class) recordClass, readPlan); + } + } catch (ClassNotFoundException e) { + // use a generic record reader below + } } + + return ValueReaders.record(avroSchema, readPlan); } @Override - public ValueReader union(Schema union, List> options) { + public ValueReader union(Type partner, Schema union, List> options) { return ValueReaders.union(options); } @Override - public ValueReader array(Schema array, ValueReader elementReader) { - if (array.getLogicalType() instanceof LogicalMap) { - ValueReaders.StructReader keyValueReader = (ValueReaders.StructReader) elementReader; - ValueReader keyReader = keyValueReader.reader(0); - ValueReader valueReader = keyValueReader.reader(1); - - if (keyReader == ValueReaders.utf8s()) { - return ValueReaders.arrayMap(ValueReaders.strings(), valueReader); - } - - return ValueReaders.arrayMap(keyReader, valueReader); + public ValueReader arrayMap( + Type partner, Schema map, ValueReader keyReader, ValueReader valueReader) { + if (keyReader == ValueReaders.utf8s()) { + return ValueReaders.arrayMap(ValueReaders.strings(), valueReader); } + return ValueReaders.arrayMap(keyReader, valueReader); + } + + @Override + public ValueReader array(Type partner, Schema array, ValueReader elementReader) { return ValueReaders.array(elementReader); } @Override - public ValueReader map(Schema map, ValueReader valueReader) { + public ValueReader map(Type partner, Schema map, ValueReader valueReader) { return ValueReaders.map(ValueReaders.strings(), valueReader); } @Override - public ValueReader primitive(Schema primitive) { + public ValueReader primitive(Type partner, Schema primitive) { LogicalType logicalType = primitive.getLogicalType(); if (logicalType != null) { switch (logicalType.getName()) { @@ -163,10 +239,16 @@ public ValueReader primitive(Schema primitive) { case BOOLEAN: return ValueReaders.booleans(); case INT: + if (partner != null && partner.typeId() == Type.TypeID.LONG) { + return ValueReaders.intsAsLongs(); + } return ValueReaders.ints(); case LONG: return ValueReaders.longs(); case FLOAT: + if (partner != null && partner.typeId() == Type.TypeID.DOUBLE) { + return ValueReaders.floatsAsDoubles(); + } return ValueReaders.floats(); case DOUBLE: return ValueReaders.doubles(); @@ -182,5 +264,19 @@ public ValueReader primitive(Schema primitive) { throw new IllegalArgumentException("Unsupported type: " + primitive); } } + + private Map idToPos(Types.StructType struct) { + Map idToPos = Maps.newHashMap(); + + if (struct != null) { + List fields = struct.fields(); + for (int pos = 0; pos < fields.size(); pos += 1) { + Types.NestedField field = fields.get(pos); + idToPos.put(field.fieldId(), pos); + } + } + + return idToPos; + } } } diff --git a/core/src/main/java/org/apache/iceberg/avro/NameMappingDatumReader.java b/core/src/main/java/org/apache/iceberg/avro/NameMappingDatumReader.java new file mode 100644 index 000000000000..83f2beabeedc --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/NameMappingDatumReader.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.io.IOException; +import java.util.function.Supplier; +import org.apache.avro.Schema; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.iceberg.mapping.NameMapping; + +/** + * A delegating DatumReader that applies a name mapping to a file schema to enable reading Avro + * files that were written without field IDs. + * + * @param Java class of datums produced by this reader + */ +public class NameMappingDatumReader implements DatumReader, SupportsRowPosition { + private final NameMapping nameMapping; + private final DatumReader wrapped; + + public NameMappingDatumReader(NameMapping nameMapping, DatumReader wrapped) { + this.nameMapping = nameMapping; + this.wrapped = wrapped; + } + + @Override + public void setSchema(Schema newFileSchema) { + Schema fileSchema; + if (AvroSchemaUtil.hasIds(newFileSchema)) { + fileSchema = newFileSchema; + } else { + fileSchema = AvroSchemaUtil.applyNameMapping(newFileSchema, nameMapping); + } + + wrapped.setSchema(fileSchema); + } + + @Override + public D read(D reuse, Decoder in) throws IOException { + return wrapped.read(reuse, in); + } + + @Override + public void setRowPositionSupplier(Supplier posSupplier) { + if (wrapped instanceof SupportsRowPosition) { + ((SupportsRowPosition) wrapped).setRowPositionSupplier(posSupplier); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java b/core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java new file mode 100644 index 000000000000..c05e206e1307 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/SupportsCustomRecords.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.util.Map; + +/** An interface for Avro DatumReaders to support custom record classes. */ +interface SupportsCustomRecords { + void setClassLoader(ClassLoader loader); + + void setRenames(Map renames); +} diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReader.java b/core/src/main/java/org/apache/iceberg/avro/ValueReader.java index 5470b8168f1b..d01bd74f536a 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReader.java @@ -23,4 +23,8 @@ public interface ValueReader { T read(Decoder decoder, Object reuse) throws IOException; + + default void skip(Decoder decoder) throws IOException { + read(decoder, null); + } } diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index 19789cce82fc..d530bc1854e1 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -31,6 +31,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.function.Supplier; import org.apache.avro.Schema; @@ -44,6 +45,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.UUIDUtil; public class ValueReaders { @@ -53,6 +55,14 @@ public static ValueReader nulls() { return NullReader.INSTANCE; } + public static ValueReader constant(T value) { + return new ConstantReader<>(value); + } + + public static ValueReader replaceWithConstant(ValueReader reader, T value) { + return new ReplaceWithConstantReader<>(reader, value); + } + public static ValueReader booleans() { return BooleanReader.INSTANCE; } @@ -61,6 +71,10 @@ public static ValueReader ints() { return IntegerReader.INSTANCE; } + public static ValueReader intsAsLongs() { + return IntegerAsLongReader.INSTANCE; + } + public static ValueReader longs() { return LongReader.INSTANCE; } @@ -69,6 +83,10 @@ public static ValueReader floats() { return FloatReader.INSTANCE; } + public static ValueReader floatsAsDoubles() { + return FloatAsDoubleReader.INSTANCE; + } + public static ValueReader doubles() { return DoubleReader.INSTANCE; } @@ -125,6 +143,10 @@ public static ValueReader union(List> readers) { return new UnionReader(readers); } + public static ValueReader positions() { + return new PositionReader(); + } + public static ValueReader> array(ValueReader elementReader) { return new ArrayReader<>(elementReader); } @@ -149,6 +171,16 @@ public static ValueReader record( return new IndexedRecordReader<>(readers, recordClass, recordSchema); } + public static ValueReader record( + Schema recordSchema, List>> readPlan) { + return new PlannedRecordReader(recordSchema, readPlan); + } + + public static ValueReader record( + Schema recordSchema, Class recordClass, List>> readPlan) { + return new PlannedIndexedReader<>(recordSchema, recordClass, readPlan); + } + private static class NullReader implements ValueReader { private static final NullReader INSTANCE = new NullReader(); @@ -159,6 +191,47 @@ public Object read(Decoder decoder, Object ignored) throws IOException { decoder.readNull(); return null; } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.readNull(); + } + } + + private static class ConstantReader implements ValueReader { + private final T constant; + + private ConstantReader(T constant) { + this.constant = constant; + } + + @Override + public T read(Decoder decoder, Object reuse) throws IOException { + return constant; + } + + @Override + public void skip(Decoder decoder) throws IOException {} + } + + private static class ReplaceWithConstantReader extends ConstantReader { + private final ValueReader replaced; + + private ReplaceWithConstantReader(ValueReader replaced, T constant) { + super(constant); + this.replaced = replaced; + } + + @Override + public T read(Decoder decoder, Object reuse) throws IOException { + replaced.read(decoder, reuse); + return super.read(decoder, reuse); + } + + @Override + public void skip(Decoder decoder) throws IOException { + replaced.skip(decoder); + } } private static class BooleanReader implements ValueReader { @@ -170,6 +243,11 @@ private BooleanReader() {} public Boolean read(Decoder decoder, Object ignored) throws IOException { return decoder.readBoolean(); } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.readBoolean(); + } } private static class IntegerReader implements ValueReader { @@ -181,6 +259,27 @@ private IntegerReader() {} public Integer read(Decoder decoder, Object ignored) throws IOException { return decoder.readInt(); } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.readInt(); + } + } + + private static class IntegerAsLongReader implements ValueReader { + private static final IntegerAsLongReader INSTANCE = new IntegerAsLongReader(); + + private IntegerAsLongReader() {} + + @Override + public Long read(Decoder decoder, Object ignored) throws IOException { + return (long) decoder.readInt(); + } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.readInt(); + } } private static class LongReader implements ValueReader { @@ -192,6 +291,11 @@ private LongReader() {} public Long read(Decoder decoder, Object ignored) throws IOException { return decoder.readLong(); } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.readLong(); + } } private static class FloatReader implements ValueReader { @@ -203,6 +307,27 @@ private FloatReader() {} public Float read(Decoder decoder, Object ignored) throws IOException { return decoder.readFloat(); } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.skipFixed(4); + } + } + + private static class FloatAsDoubleReader implements ValueReader { + private static final FloatAsDoubleReader INSTANCE = new FloatAsDoubleReader(); + + private FloatAsDoubleReader() {} + + @Override + public Double read(Decoder decoder, Object ignored) throws IOException { + return (double) decoder.readFloat(); + } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.skipFixed(4); + } } private static class DoubleReader implements ValueReader { @@ -214,6 +339,11 @@ private DoubleReader() {} public Double read(Decoder decoder, Object ignored) throws IOException { return decoder.readDouble(); } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.skipFixed(8); + } } private static class StringReader implements ValueReader { @@ -231,6 +361,11 @@ public String read(Decoder decoder, Object ignored) throws IOException { // byte[] bytes = new byte[length]; // decoder.readFixed(bytes, 0, length); } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.skipString(); + } } private static class Utf8Reader implements ValueReader { @@ -250,6 +385,11 @@ public Utf8 read(Decoder decoder, Object reuse) throws IOException { // byte[] bytes = new byte[length]; // decoder.readFixed(bytes, 0, length); } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.skipString(); + } } private static class UUIDReader implements ValueReader { @@ -275,6 +415,11 @@ public UUID read(Decoder decoder, Object ignored) throws IOException { return UUIDUtil.convert(buffer); } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.skipFixed(16); + } } private static class FixedReader implements ValueReader { @@ -298,6 +443,11 @@ public byte[] read(Decoder decoder, Object reuse) throws IOException { decoder.readFixed(bytes, 0, length); return bytes; } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.skipFixed(length); + } } private static class GenericFixedReader implements ValueReader { @@ -323,6 +473,11 @@ public GenericData.Fixed read(Decoder decoder, Object reuse) throws IOException decoder.readFixed(bytes, 0, length); return new GenericData.Fixed(schema, bytes); } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.skipFixed(length); + } } private static class BytesReader implements ValueReader { @@ -344,6 +499,11 @@ public byte[] read(Decoder decoder, Object reuse) throws IOException { // decoder.readFixed(bytes, 0, length); // return bytes; } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.skipBytes(); + } } private static class ByteBufferReader implements ValueReader { @@ -364,6 +524,11 @@ public ByteBuffer read(Decoder decoder, Object reuse) throws IOException { // decoder.readFixed(bytes, 0, length); // return bytes; } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.skipBytes(); + } } private static class DecimalReader implements ValueReader { @@ -381,6 +546,11 @@ public BigDecimal read(Decoder decoder, Object ignored) throws IOException { byte[] bytes = bytesReader.read(decoder, null); return new BigDecimal(new BigInteger(bytes), scale); } + + @Override + public void skip(Decoder decoder) throws IOException { + bytesReader.skip(decoder); + } } private static class UnionReader implements ValueReader { @@ -398,6 +568,12 @@ public Object read(Decoder decoder, Object reuse) throws IOException { int index = decoder.readIndex(); return readers[index].read(decoder, reuse); } + + @Override + public void skip(Decoder decoder) throws IOException { + int index = decoder.readIndex(); + readers[index].skip(decoder); + } } private static class EnumReader implements ValueReader { @@ -415,6 +591,11 @@ public String read(Decoder decoder, Object ignored) throws IOException { int index = decoder.readEnum(); return symbols[index]; } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.readEnum(); + } } private static class ArrayReader implements ValueReader> { @@ -456,6 +637,16 @@ public Collection read(Decoder decoder, Object reused) throws IOException { return resultList; } + + @Override + public void skip(Decoder decoder) throws IOException { + long itemsToSkip; + while ((itemsToSkip = decoder.skipArray()) != 0) { + for (int i = 0; i < itemsToSkip; i += 1) { + elementReader.skip(decoder); + } + } + } } private static class ArrayMapReader implements ValueReader> { @@ -509,6 +700,17 @@ public Map read(Decoder decoder, Object reuse) throws IOException { return resultMap; } + + @Override + public void skip(Decoder decoder) throws IOException { + long itemsToSkip; + while ((itemsToSkip = decoder.skipArray()) != 0) { + for (int i = 0; i < itemsToSkip; i += 1) { + keyReader.skip(decoder); + valueReader.skip(decoder); + } + } + } } private static class MapReader implements ValueReader> { @@ -562,6 +764,133 @@ public Map read(Decoder decoder, Object reuse) throws IOException { return resultMap; } + + @Override + public void skip(Decoder decoder) throws IOException { + long itemsToSkip; + while ((itemsToSkip = decoder.skipMap()) != 0) { + for (int i = 0; i < itemsToSkip; i += 1) { + keyReader.skip(decoder); + valueReader.skip(decoder); + } + } + } + } + + public abstract static class PlannedStructReader + implements ValueReader, SupportsRowPosition { + private final ValueReader[] readers; + private final Integer[] positions; + + protected PlannedStructReader(List>> readPlan) { + this.readers = readPlan.stream().map(Pair::second).toArray(ValueReader[]::new); + this.positions = readPlan.stream().map(Pair::first).toArray(Integer[]::new); + } + + @Override + public void setRowPositionSupplier(Supplier posSupplier) { + for (ValueReader reader : readers) { + if (reader instanceof SupportsRowPosition) { + ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier); + } + } + } + + protected abstract S reuseOrCreate(Object reuse); + + protected abstract Object get(S struct, int pos); + + protected abstract void set(S struct, int pos, Object value); + + @Override + public S read(Decoder decoder, Object reuse) throws IOException { + S struct = reuseOrCreate(reuse); + + for (int i = 0; i < readers.length; i += 1) { + if (positions[i] != null) { + Object reusedValue = get(struct, positions[i]); + set(struct, positions[i], readers[i].read(decoder, reusedValue)); + } else { + // if pos is null, the value is not projected + readers[i].skip(decoder); + } + } + + return struct; + } + + @Override + public void skip(Decoder decoder) throws IOException { + for (int i = 0; i < readers.length; i += 1) { + readers[i].skip(decoder); + } + } + } + + private static class PlannedRecordReader extends PlannedStructReader { + private final Schema recordSchema; + + private PlannedRecordReader(Schema recordSchema, List>> readPlan) { + super(readPlan); + this.recordSchema = recordSchema; + } + + @Override + protected GenericData.Record reuseOrCreate(Object reuse) { + if (reuse instanceof GenericData.Record) { + return (GenericData.Record) reuse; + } else { + return new GenericData.Record(recordSchema); + } + } + + @Override + protected Object get(GenericData.Record struct, int pos) { + return struct.get(pos); + } + + @Override + protected void set(GenericData.Record struct, int pos, Object value) { + struct.put(pos, value); + } + } + + private static class PlannedIndexedReader + extends PlannedStructReader { + private final Class recordClass; + private final DynConstructors.Ctor ctor; + private final Schema schema; + + PlannedIndexedReader( + Schema recordSchema, Class recordClass, List>> readPlan) { + super(readPlan); + this.recordClass = recordClass; + this.ctor = + DynConstructors.builder(IndexedRecord.class) + .hiddenImpl(recordClass, Schema.class) + .hiddenImpl(recordClass) + .build(); + this.schema = recordSchema; + } + + @Override + protected R reuseOrCreate(Object reuse) { + if (recordClass.isInstance(reuse)) { + return recordClass.cast(reuse); + } else { + return ctor.newInstance(schema); + } + } + + @Override + protected Object get(R struct, int pos) { + return struct.get(pos); + } + + @Override + protected void set(R struct, int pos, Object value) { + struct.put(pos, value); + } } public abstract static class StructReader implements ValueReader, SupportsRowPosition { @@ -577,10 +906,11 @@ protected StructReader(List> readers, Schema schema) { List fields = schema.getFields(); for (int pos = 0; pos < fields.size(); pos += 1) { Schema.Field field = fields.get(pos); - if (AvroSchemaUtil.getFieldId(field) == MetadataColumns.ROW_POSITION.fieldId()) { + if (Objects.equals(AvroSchemaUtil.fieldId(field), MetadataColumns.ROW_POSITION.fieldId())) { // track where the _pos field is located for setRowPositionSupplier this.posField = pos; - } else if (AvroSchemaUtil.getFieldId(field) == MetadataColumns.IS_DELETED.fieldId()) { + } else if (Objects.equals( + AvroSchemaUtil.fieldId(field), MetadataColumns.IS_DELETED.fieldId())) { isDeletedColumnPos = pos; } } @@ -622,19 +952,12 @@ protected StructReader( @Override public void setRowPositionSupplier(Supplier posSupplier) { if (posField >= 0) { - long startingPos = posSupplier.get(); - this.readers[posField] = new PositionReader(startingPos); - for (ValueReader reader : readers) { - if (reader instanceof SupportsRowPosition) { - ((SupportsRowPosition) reader).setRowPositionSupplier(() -> startingPos); - } - } + this.readers[posField] = new PositionReader(); + } - } else { - for (ValueReader reader : readers) { - if (reader instanceof SupportsRowPosition) { - ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier); - } + for (ValueReader reader : readers) { + if (reader instanceof SupportsRowPosition) { + ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier); } } } @@ -739,17 +1062,21 @@ protected void set(R struct, int pos, Object value) { } } - static class PositionReader implements ValueReader { - private long currentPosition; - - PositionReader(long rowPosition) { - this.currentPosition = rowPosition - 1; - } + static class PositionReader implements ValueReader, SupportsRowPosition { + private long currentPosition = 0; @Override public Long read(Decoder ignored, Object reuse) throws IOException { - currentPosition += 1; + this.currentPosition += 1; return currentPosition; } + + @Override + public void skip(Decoder ignored) throws IOException {} + + @Override + public void setRowPositionSupplier(Supplier posSupplier) { + this.currentPosition = posSupplier.get() - 1; + } } } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index f4c7ee883e49..2af098445c6f 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -80,10 +80,7 @@ public void testMapProjections() throws IOException { Record projected = writeAndRead(writeSchema, readSchema, record, nameMapping); // field id 5 comes from read schema - Assertions.assertThat(projected.getSchema().getField("location_r5")) - .as("Field missing from table mapping is renamed") - .isNotNull(); - Assertions.assertThat(projected.get("location_r5")) + Assertions.assertThat(projected.get("location")) .as("location field should not be read") .isNull(); Assertions.assertThat(projected.get("id")).isEqualTo(34L); @@ -105,10 +102,7 @@ public void testMapProjections() throws IOException { projected = writeAndRead(writeSchema, readSchema, record, nameMapping); Record projectedL1 = ((Map) projected.get("location")).get("l1"); - Assertions.assertThat(projectedL1.getSchema().getField("long_r2")) - .as("Field missing from table mapping is renamed") - .isNotNull(); - Assertions.assertThat(projectedL1.get("long_r2")) + Assertions.assertThat(projectedL1.get("long")) .as("location.value.long, should not be read") .isNull(); } @@ -187,8 +181,7 @@ public void testComplexMapKeys() throws IOException { Comparators.charSequences().compare("k2", (CharSequence) projectedKey.get("k2"))) .isEqualTo(0); Assertions.assertThat(projectedValue.get("lat")).isEqualTo(52.995143f); - Assertions.assertThat(projectedValue.getSchema().getField("long_r2")).isNotNull(); - Assertions.assertThat(projectedValue.get("long_r2")).isNull(); + Assertions.assertThat(projectedValue.get("long")).isNull(); } @Test @@ -249,10 +242,7 @@ public void testArrayProjections() throws Exception { Schema readSchema = writeSchema; Record projected = writeAndRead(writeSchema, readSchema, record, nameMapping); - Assertions.assertThat(projected.getSchema().getField("point_r22")) - .as("Field missing from table mapping is renamed") - .isNotNull(); - Assertions.assertThat(projected.get("point_r22")).as("point field is not projected").isNull(); + Assertions.assertThat(projected.get("point")).as("point is not projected").isNull(); Assertions.assertThat(projected.get("id")).isEqualTo(34L); // point array is partially projected nameMapping = @@ -269,11 +259,8 @@ public void testArrayProjections() throws Exception { projected = writeAndRead(writeSchema, readSchema, record, nameMapping); Record point = ((List) projected.get("point")).get(0); - Assertions.assertThat(point.getSchema().getField("y_r18")) - .as("Field missing from table mapping is renamed") - .isNotNull(); Assertions.assertThat(point.get("x")).as("point.x is projected").isEqualTo(1); - Assertions.assertThat(point.get("y_r18")).as("point.y is not projected").isNull(); + Assertions.assertThat(point.get("y")).as("point.y is not projected").isNull(); Assertions.assertThat(projected.get("id")).isEqualTo(34L); }