Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 39 additions & 13 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -610,11 +611,12 @@ public static class ReadBuilder {
private org.apache.iceberg.Schema schema = null;
private Function<Schema, DatumReader<?>> createReaderFunc = null;
private BiFunction<org.apache.iceberg.Schema, Schema, DatumReader<?>> createReaderBiFunc = null;
private Function<org.apache.iceberg.Schema, DatumReader<?>> createResolvingReaderFunc = null;

@SuppressWarnings("UnnecessaryLambda")
private final Function<Schema, DatumReader<?>> defaultCreateReaderFunc =
private final Function<org.apache.iceberg.Schema, DatumReader<?>> defaultCreateReaderFunc =
readSchema -> {
GenericAvroReader<?> reader = new GenericAvroReader<>(readSchema);
GenericAvroReader<?> reader = GenericAvroReader.create(readSchema);
reader.setClassLoader(loader);
return reader;
};
Expand All @@ -627,15 +629,28 @@ private ReadBuilder(InputFile file) {
this.file = file;
}

public ReadBuilder createResolvingReader(
Function<org.apache.iceberg.Schema, DatumReader<?>> readerFunction) {
Preconditions.checkState(
createReaderBiFunc == null && createReaderFunc == null,
"Cannot set multiple read builder functions");
this.createResolvingReaderFunc = readerFunction;
return this;
}

public ReadBuilder createReaderFunc(Function<Schema, DatumReader<?>> readerFunction) {
Preconditions.checkState(createReaderBiFunc == null, "Cannot set multiple createReaderFunc");
Preconditions.checkState(
createReaderBiFunc == null && createResolvingReaderFunc == null,
"Cannot set multiple read builder functions");
this.createReaderFunc = readerFunction;
return this;
}

public ReadBuilder createReaderFunc(
BiFunction<org.apache.iceberg.Schema, Schema, DatumReader<?>> readerFunction) {
Preconditions.checkState(createReaderFunc == null, "Cannot set multiple createReaderFunc");
Preconditions.checkState(
createReaderFunc == null && createResolvingReaderFunc == null,
"Cannot set multiple read builder functions");
this.createReaderBiFunc = readerFunction;
return this;
}
Expand Down Expand Up @@ -683,23 +698,34 @@ public ReadBuilder classLoader(ClassLoader classLoader) {
return this;
}

@SuppressWarnings("unchecked")
public <D> AvroIterable<D> build() {
Preconditions.checkNotNull(schema, "Schema is required");
Function<Schema, DatumReader<?>> readerFunc;

if (null == nameMapping) {
this.nameMapping = MappingUtil.create(schema);
}

DatumReader<D> reader;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I always like to make these final so you're sure that it doesn't skip through a branch.

Suggested change
DatumReader<D> reader;
final DatumReader<D> reader;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is necessary in this case. The compiler will catch if it is unset because no default was provided.

if (createReaderBiFunc != null) {
readerFunc = avroSchema -> createReaderBiFunc.apply(schema, avroSchema);
reader =
new ProjectionDatumReader<>(
avroSchema -> createReaderBiFunc.apply(schema, avroSchema), schema, renames, null);
} else if (createReaderFunc != null) {
readerFunc = createReaderFunc;
reader = new ProjectionDatumReader<>(createReaderFunc, schema, renames, null);
} else if (createResolvingReaderFunc != null) {
reader = (DatumReader<D>) createResolvingReaderFunc.apply(schema);
} else {
readerFunc = defaultCreateReaderFunc;
reader = (DatumReader<D>) defaultCreateReaderFunc.apply(schema);
}

if (reader instanceof SupportsCustomRecords) {
((SupportsCustomRecords) reader).setClassLoader(loader);
((SupportsCustomRecords) reader).setRenames(renames);
}

return new AvroIterable<>(
file,
new ProjectionDatumReader<>(readerFunc, schema, renames, nameMapping),
start,
length,
reuseContainers);
file, new NameMappingDatumReader<>(nameMapping, reader), start, length, reuseContainers);
}
}

Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/avro/AvroIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Suppliers;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class AvroIterable<D> extends CloseableGroup implements CloseableIterable<D> {
Expand Down Expand Up @@ -78,7 +79,8 @@ public CloseableIterator<D> iterator() {
if (start != null) {
if (reader instanceof SupportsRowPosition) {
((SupportsRowPosition) reader)
.setRowPositionSupplier(() -> AvroIO.findStartingRowPos(file::newStream, start));
.setRowPositionSupplier(
Suppliers.memoize(() -> AvroIO.findStartingRowPos(file::newStream, start)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the memoize? Are we reading the same file multiple times?

Copy link
Contributor Author

@rdblue rdblue Jan 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, this was being done in the StructValueReader. If the struct reader inserted a PositionReader, it would also rewrite the position supplier.

That was a lot of complication for the value reader and didn't work in all cases (for example, if two structs had position columns) so I moved the memoization here. It's simpler that way and enabled us to add position readers that are constructed in the same place as the other readers, instead of needing to keep track of the position index in a struct and inject when setRowPositionSupplier is called.

}
fileReader = new AvroRangeIterator<>(fileReader, start, end);
} else if (reader instanceof SupportsRowPosition) {
Expand Down
211 changes: 211 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.avro;

import java.util.Deque;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

/**
 * A visitor that traverses an Avro {@link Schema} in lock-step with a "partner" structure, such as
 * an Iceberg schema or type, producing a result of type {@code R} for each schema node.
 *
 * <p>Subclasses override the callback methods ({@link #record}, {@link #union}, {@link #array},
 * {@link #arrayMap}, {@link #map}, {@link #primitive}) to produce results; each callback receives
 * the partner node (possibly null when no partner exists for that position) and the results already
 * computed for child nodes. Navigation into the partner structure is delegated to a
 * {@link PartnerAccessors} implementation.
 *
 * @param <P> the partner node type (for example, an Iceberg {@code Type})
 * @param <R> the result type produced by the visitor callbacks
 */
public class AvroWithPartnerVisitor<P, R> {
  /**
   * Callbacks used to navigate the partner structure in parallel with the Avro schema.
   *
   * <p>Each method returns the partner of a child position, or null when the partner structure has
   * no counterpart (for example, a field that is not present in a projection).
   */
  public interface PartnerAccessors<P> {
    /** Returns the partner of a record field, located by field id, or null if not present. */
    P fieldPartner(P partnerStruct, Integer fieldId, String name);

    /** Returns the partner of a map's key type. */
    P mapKeyPartner(P partnerMap);

    /** Returns the partner of a map's value type. */
    P mapValuePartner(P partnerMap);

    /** Returns the partner of a list's element type. */
    P listElementPartner(P partnerList);
  }

  /** {@link PartnerAccessors} that navigates an Iceberg {@link Type} using field ids. */
  static class FieldIDAccessors implements AvroWithPartnerVisitor.PartnerAccessors<Type> {
    private static final FieldIDAccessors INSTANCE = new FieldIDAccessors();

    /** Returns the shared singleton instance; the accessor is stateless, so one suffices. */
    public static FieldIDAccessors get() {
      return INSTANCE;
    }

    @Override
    public Type fieldPartner(Type partner, Integer fieldId, String name) {
      // the field may be absent from the partner struct (e.g. a projected schema); return null
      Types.NestedField field = partner.asStructType().field(fieldId);
      return field != null ? field.type() : null;
    }

    @Override
    public Type mapKeyPartner(Type partner) {
      return partner.asMapType().keyType();
    }

    @Override
    public Type mapValuePartner(Type partner) {
      return partner.asMapType().valueType();
    }

    @Override
    public Type listElementPartner(Type partner) {
      return partner.asListType().elementType();
    }
  }

  /**
   * Stack of record full names on the current visit path, used to fail on recursive types.
   * Assigned once and only mutated by push/pop during traversal, so it is final.
   */
  private final Deque<String> recordLevels = Lists.newLinkedList();

  /** Called after a record's fields are visited; default returns null. */
  public R record(P partner, Schema record, List<R> fieldResults) {
    return null;
  }

  /** Called after an option union's branches are visited; default returns null. */
  public R union(P partner, Schema union, List<R> optionResults) {
    return null;
  }

  /** Called after an array's element is visited; default returns null. */
  public R array(P partner, Schema array, R elementResult) {
    return null;
  }

  /**
   * Called after a logical-map array (an array of key/value records carrying {@link LogicalMap})
   * is visited; default returns null.
   */
  public R arrayMap(P partner, Schema map, R keyResult, R valueResult) {
    return null;
  }

  /** Called after a native Avro map's value is visited (keys are always strings); default null. */
  public R map(P partner, Schema map, R valueResult) {
    return null;
  }

  /** Called for primitive schemas and any type not handled above; default returns null. */
  public R primitive(P partner, Schema primitive) {
    return null;
  }

  /**
   * Visits an Avro schema in lock-step with a partner structure.
   *
   * @param partner the partner node for {@code schema}, or null if there is no counterpart
   * @param schema the Avro schema to visit
   * @param visitor the visitor whose callbacks produce results
   * @param accessors used to navigate into the partner structure
   * @return the result produced by the visitor for the root schema node
   * @throws IllegalStateException if the schema contains a recursive record
   * @throws IllegalArgumentException if the schema contains a non-option union
   */
  public static <P, R> R visit(
      P partner,
      Schema schema,
      AvroWithPartnerVisitor<P, R> visitor,
      PartnerAccessors<P> accessors) {
    switch (schema.getType()) {
      case RECORD:
        return visitRecord(partner, schema, visitor, accessors);

      case UNION:
        return visitUnion(partner, schema, visitor, accessors);

      case ARRAY:
        return visitArray(partner, schema, visitor, accessors);

      case MAP:
        // Avro map keys are always strings, so only the value type has a partner to visit
        return visitor.map(
            partner,
            schema,
            visit(
                partner != null ? accessors.mapValuePartner(partner) : null,
                schema.getValueType(),
                visitor,
                accessors));

      default:
        return visitor.primitive(partner, schema);
    }
  }

  private static <P, R> R visitRecord(
      P partnerStruct,
      Schema record,
      AvroWithPartnerVisitor<P, R> visitor,
      PartnerAccessors<P> accessors) {
    // check to make sure this hasn't been visited before; recursive records cannot be processed
    String recordName = record.getFullName();
    Preconditions.checkState(
        !visitor.recordLevels.contains(recordName),
        "Cannot process recursive Avro record %s",
        recordName);
    visitor.recordLevels.push(recordName);

    List<Schema.Field> fields = record.getFields();
    List<R> results = Lists.newArrayListWithExpectedSize(fields.size());
    for (int pos = 0; pos < fields.size(); pos += 1) {
      Schema.Field field = fields.get(pos);
      Integer fieldId = AvroSchemaUtil.fieldId(field);

      // fields without an id or without a partner struct are visited with a null partner
      P fieldPartner =
          partnerStruct != null && fieldId != null
              ? accessors.fieldPartner(partnerStruct, fieldId, field.name())
              : null;
      results.add(visit(fieldPartner, field.schema(), visitor, accessors));
    }

    visitor.recordLevels.pop();

    return visitor.record(partnerStruct, record, results);
  }

  private static <P, R> R visitUnion(
      P partner,
      Schema union,
      AvroWithPartnerVisitor<P, R> visitor,
      PartnerAccessors<P> accessors) {
    // only unions of the form [null, T] (options) are supported
    Preconditions.checkArgument(
        AvroSchemaUtil.isOptionSchema(union), "Cannot visit non-option union: %s", union);

    List<Schema> types = union.getTypes();
    List<R> options = Lists.newArrayListWithExpectedSize(types.size());
    for (Schema branch : types) {
      // both branches share the same partner; the null branch carries no extra structure
      options.add(visit(partner, branch, visitor, accessors));
    }

    return visitor.union(partner, union, options);
  }

  private static <P, R> R visitArray(
      P partnerArray,
      Schema array,
      AvroWithPartnerVisitor<P, R> visitor,
      PartnerAccessors<P> accessors) {
    if (array.getLogicalType() instanceof LogicalMap) {
      // a map with non-string keys is represented as an array of key/value records
      Preconditions.checkState(
          AvroSchemaUtil.isKeyValueSchema(array.getElementType()),
          "Cannot visit invalid logical map type: %s",
          array);

      List<Schema.Field> keyValueFields = array.getElementType().getFields();
      return visitor.arrayMap(
          partnerArray,
          array,
          visit(
              partnerArray != null ? accessors.mapKeyPartner(partnerArray) : null,
              keyValueFields.get(0).schema(),
              visitor,
              accessors),
          visit(
              partnerArray != null ? accessors.mapValuePartner(partnerArray) : null,
              keyValueFields.get(1).schema(),
              visitor,
              accessors));

    } else {
      return visitor.array(
          partnerArray,
          array,
          visit(
              partnerArray != null ? accessors.listElementPartner(partnerArray) : null,
              array.getElementType(),
              visitor,
              accessors));
    }
  }
}
Loading