@@ -81,13 +81,14 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
List<Types.NestedField> expectedFields = struct.fields();
for (int i = 0; i < expectedFields.size(); i += 1) {
Types.NestedField field = expectedFields.get(i);
String sanitizedFieldName = AvroSchemaUtil.makeCompatibleName(field.name());

// detect reordering
if (i < fields.size() && !field.name().equals(fields.get(i).name())) {
if (i < fields.size() && !sanitizedFieldName.equals(fields.get(i).name())) {
Contributor:
Why is sanitization necessary if this fix works by removing columns from the partition columns that get attached via JoinedRow?

Contributor Author:
This happens before we ever reach the materialized data. The column matching at read time was completely missing the sanitization logic that is applied at write time.
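A minimal sketch of the mismatch (the column name `d-1` and the wrapper class are invented for illustration, and the import path for `AvroSchemaUtil` is assumed):

```java
import org.apache.iceberg.avro.AvroSchemaUtil;

public class SanitizationExample {
  public static void main(String[] args) {
    // Hypothetical Iceberg column name that is not a valid Avro name.
    String icebergName = "d-1";

    // At write time the Avro schema stores the sanitized form of the name.
    String avroName = AvroSchemaUtil.makeCompatibleName(icebergName);

    // Before this change, the read path compared the raw Iceberg name against
    // the Avro field name, so names that needed sanitizing never matched.
    System.out.println(icebergName.equals(avroName)); // false for such names
  }
}
```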

hasChange = true;
}

Schema.Field avroField = updateMap.get(AvroSchemaUtil.makeCompatibleName(field.name()));
Schema.Field avroField = updateMap.get(sanitizedFieldName);

if (avroField != null) {
updatedFields.add(avroField);
@@ -123,7 +124,7 @@ public Schema.Field field(Schema.Field field, Supplier<Schema> fieldResult) {
return null;
}

String expectedName = expectedField.name();
String expectedName = AvroSchemaUtil.makeCompatibleName(expectedField.name());

this.current = expectedField.type();
try {
@@ -26,6 +26,7 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -66,23 +67,29 @@ public class SparkParquetReaders {
private SparkParquetReaders() {
}

@SuppressWarnings("unchecked")
public static ParquetValueReader<InternalRow> buildReader(Schema expectedSchema,
MessageType fileSchema) {
return SparkParquetReaders.buildReader(expectedSchema, fileSchema, Collections.emptyMap());
}

@SuppressWarnings("unchecked")
public static ParquetValueReader<InternalRow> buildReader(Schema expectedSchema,
MessageType fileSchema,
Map<Integer, Object> partitionValues) {
if (ParquetSchemaUtil.hasIds(fileSchema)) {
return (ParquetValueReader<InternalRow>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new ReadBuilder(fileSchema));
new ReadBuilder(fileSchema, partitionValues));
} else {
return (ParquetValueReader<InternalRow>)
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
new FallbackReadBuilder(fileSchema));
new FallbackReadBuilder(fileSchema, partitionValues));
}
}

private static class FallbackReadBuilder extends ReadBuilder {
FallbackReadBuilder(MessageType type) {
super(type);
FallbackReadBuilder(MessageType type, Map<Integer, Object> partitionValues) {
super(type, partitionValues);
}

@Override
@@ -113,9 +120,11 @@ public ParquetValueReader<?> struct(Types.StructType ignored, GroupType struct,

private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
private final MessageType type;
private final Map<Integer, Object> partitionValues;

ReadBuilder(MessageType type) {
ReadBuilder(MessageType type, Map<Integer, Object> partitionValues) {
this.type = type;
this.partitionValues = partitionValues;
}

@Override
@@ -146,13 +155,18 @@ public ParquetValueReader<?> struct(Types.StructType expected, GroupType struct,
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
ParquetValueReader<?> reader = readersById.get(id);
if (reader != null) {
reorderedFields.add(reader);
types.add(typesById.get(id));
} else {
reorderedFields.add(ParquetValueReaders.nulls());
if (partitionValues.containsKey(id)) {
reorderedFields.add(ParquetValueReaders.constant(partitionValues.get(id)));
types.add(null);
} else {
ParquetValueReader<?> reader = readersById.get(id);
if (reader != null) {
reorderedFields.add(reader);
types.add(typesById.get(id));
} else {
reorderedFields.add(ParquetValueReaders.nulls());
types.add(null);
}
}
}

165 changes: 42 additions & 123 deletions spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -24,15 +24,14 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTask;
@@ -61,14 +60,11 @@
import org.apache.iceberg.spark.data.SparkAvroReader;
import org.apache.iceberg.spark.data.SparkOrcReader;
import org.apache.iceberg.spark.data.SparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.JoinedRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -84,7 +80,6 @@
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
@@ -396,48 +391,25 @@ private Iterator<InternalRow> open(FileScanTask task) {
// schema or rows returned by readers
Schema finalSchema = expectedSchema;
PartitionSpec spec = task.spec();
Set<Integer> idColumns = spec.identitySourceIds();

// schema needed for the projection and filtering
StructType sparkType = SparkSchemaUtil.convert(finalSchema);
Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();

Schema iterSchema;
Iterator<InternalRow> iter;

if (hasJoinedPartitionColumns) {
// schema used to read data files
Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
JoinedRow joined = new JoinedRow();

InternalRow partition = convertToRow.apply(file.partition());
joined.withRight(partition);

// create joined rows and project from the joined schema to the final schema
iterSchema = TypeUtil.join(readSchema, partitionSchema);
iter = Iterators.transform(open(task, readSchema), joined::withLeft);

} else if (hasExtraFilterColumns) {
// add projection to the final schema
iterSchema = requiredSchema;
iter = open(task, requiredSchema);
// build a map of partition values for reconstructing records
Map<Integer, Object> partitionValues = partitionMap(spec, file.partition());

if (hasExtraFilterColumns) {
return Iterators.transform(
open(task, requiredSchema, partitionValues),
APPLY_PROJECTION.bind(projection(finalSchema, requiredSchema))::invoke);
} else {
// return the base iterator
iterSchema = finalSchema;
iter = open(task, finalSchema);
return open(task, finalSchema, partitionValues);
}

// TODO: remove the projection by reporting the iterator's schema back to Spark
return Iterators.transform(iter,
APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
}

private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
private Iterator<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, Object> partitionValues) {
CloseableIterable<InternalRow> iter;
if (task.isDataTask()) {
iter = newDataIterable(task.asDataTask(), readSchema);
@@ -448,7 +420,7 @@

switch (task.file().format()) {
case PARQUET:
iter = newParquetIterable(location, task, readSchema);
iter = newParquetIterable(location, task, readSchema, partitionValues);
break;

case AVRO:
@@ -504,12 +476,14 @@ private CloseableIterable<InternalRow> newAvroIterable(InputFile location,
}

private CloseableIterable<InternalRow> newParquetIterable(InputFile location,
FileScanTask task,
Schema readSchema) {
FileScanTask task,
Schema readSchema,
Map<Integer, Object> partitionValues) {

return Parquet.read(location)
.project(readSchema)
.split(task.start(), task.length())
.createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
.createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, partitionValues))
Contributor:
Does Avro need to be updated as well?

.filter(task.residual())
.caseSensitive(caseSensitive)
.build();
@@ -533,52 +507,6 @@ private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema rea
return CloseableIterable.transform(
asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
}
}

private static class PartitionRowConverter implements Function<StructLike, InternalRow> {
private final DataType[] types;
private final int[] positions;
private final Class<?>[] javaTypes;
private final GenericInternalRow reusedRow;

PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
StructType partitionType = SparkSchemaUtil.convert(partitionSchema);
StructField[] fields = partitionType.fields();

this.types = new DataType[fields.length];
this.positions = new int[types.length];
this.javaTypes = new Class<?>[types.length];
this.reusedRow = new GenericInternalRow(types.length);

List<PartitionField> partitionFields = spec.fields();
for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) {
this.types[rowIndex] = fields[rowIndex].dataType();

int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) {
PartitionField field = spec.fields().get(specIndex);
if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) {
positions[rowIndex] = specIndex;
javaTypes[rowIndex] = spec.javaClasses()[specIndex];
break;
}
}
}
}

@Override
public InternalRow apply(StructLike tuple) {
for (int i = 0; i < types.length; i += 1) {
Object value = tuple.get(positions[i], javaTypes[i]);
if (value != null) {
reusedRow.update(i, convert(value, types[i]));
} else {
reusedRow.setNullAt(i);
}
}

return reusedRow;
}

/**
* Converts the objects into instances used by Spark's InternalRow.
@@ -588,48 +516,39 @@ public InternalRow apply(StructLike tuple) {
* @return the value converted to the representation expected by Spark's InternalRow.
*/
private static Object convert(Object value, DataType type) {
if (type instanceof StringType) {
return UTF8String.fromString(value.toString());
} else if (type instanceof BinaryType) {
return ByteBuffers.toByteArray((ByteBuffer) value);
} else if (type instanceof DecimalType) {
return Decimal.fromDecimal(value);
if (value != null) {
if (type instanceof StringType) {
return UTF8String.fromString(value.toString());
} else if (type instanceof BinaryType) {
return ByteBuffers.toByteArray((ByteBuffer) value);
} else if (type instanceof DecimalType) {
return Decimal.fromDecimal(value);
}
}
return value;
}
}

Contributor:
Nit: non-functional whitespace change.

Contributor Author:
👍

private static class StructLikeInternalRow implements StructLike {
private final DataType[] types;
private InternalRow row = null;

StructLikeInternalRow(StructType struct) {
this.types = new DataType[struct.size()];
StructField[] fields = struct.fields();
for (int i = 0; i < fields.length; i += 1) {
types[i] = fields[i].dataType();
/**
* Creates a map from field ID to Spark value for a partition tuple.
*
* @param spec a partition spec
* @param partition a partition tuple
* @return a map from field ID to Spark value
*/
private static Map<Integer, Object> partitionMap(PartitionSpec spec, StructLike partition) {
Map<Integer, Object> partitionValues = Maps.newHashMap();

List<PartitionField> fields = spec.fields();
for (int i = 0; i < fields.size(); i += 1) {
PartitionField field = fields.get(i);
if ("identity".equals(field.transform().toString())) {
partitionValues.put(field.sourceId(), convert(
partition.get(i, spec.javaClasses()[i]),
SparkSchemaUtil.convert(spec.partitionType().field(field.name()).type())));
}
}
}

public StructLikeInternalRow setRow(InternalRow row) {
this.row = row;
return this;
}

@Override
public int size() {
return types.length;
}

@Override
@SuppressWarnings("unchecked")
public <T> T get(int pos, Class<T> javaClass) {
return javaClass.cast(row.get(pos, types[pos]));
}

@Override
public <T> void set(int pos, T value) {
throw new UnsupportedOperationException("Not implemented: set");
return partitionValues;
}
}
}
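To show the idea behind the new partitionMap helper in isolation, here is a standalone sketch; the SimpleTuple class, the example schema, and the main method are invented for illustration, and the Spark-side value conversion done by the real code is elided:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;

// Standalone sketch of Reader.partitionMap(): walk the spec, keep only
// identity transforms, and key each partition value by its source field id.
public class PartitionMapSketch {

  // Minimal StructLike backed by an Object array (illustrative only).
  static class SimpleTuple implements StructLike {
    private final Object[] values;
    SimpleTuple(Object... values) { this.values = values; }
    @Override public int size() { return values.length; }
    @Override public <T> T get(int pos, Class<T> javaClass) { return javaClass.cast(values[pos]); }
    @Override public <T> void set(int pos, T value) { values[pos] = value; }
  }

  static Map<Integer, Object> partitionMap(PartitionSpec spec, StructLike partition) {
    Map<Integer, Object> partitionValues = new HashMap<>();
    List<PartitionField> fields = spec.fields();
    for (int i = 0; i < fields.size(); i += 1) {
      PartitionField field = fields.get(i);
      if ("identity".equals(field.transform().toString())) {
        // The real code additionally converts the value to Spark's internal representation.
        partitionValues.put(field.sourceId(), partition.get(i, spec.javaClasses()[i]));
      }
    }
    return partitionValues;
  }

  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "category", Types.StringType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("category").build();

    // The map is keyed by the source column id (2 for "category" here), which is
    // what SparkParquetReaders uses to substitute a constant reader for the column.
    Map<Integer, Object> values = partitionMap(spec, new SimpleTuple("books"));
    System.out.println(values); // {2=books}
  }
}
```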