Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ public TypeDescription record(TypeDescription record, List<String> names, List<T
return setId(structType, field);
}

/**
 * Applies name-mapped field IDs to an ORC union type.
 * <p>
 * Rebuilds the union from the already-visited option types and sets the ID
 * found for the current path, mirroring how record/list/map are handled.
 */
@Override
public TypeDescription union(TypeDescription union, List<TypeDescription> options) {
  Preconditions.checkArgument(!options.isEmpty(), "Union type must have options");
  MappedField field = nameMapping.find(currentPath());
  TypeDescription unionType = TypeDescription.createUnion();

  // NOTE(review): null options (unmapped children) are silently skipped, which
  // shifts the ordinals of the remaining options relative to the file's union
  // tags — confirm callers never produce a partially-mapped union.
  for (TypeDescription option : options) {
    if (option != null) {
      unionType.addUnionChild(option);
    }
  }

  return setId(unionType, field);
}

@Override
public TypeDescription list(TypeDescription array, TypeDescription element) {
Preconditions.checkArgument(element != null, "List type must have element type");
Expand Down
5 changes: 5 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/HasIds.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public Boolean record(TypeDescription record, List<String> names, List<Boolean>
return ORCSchemaUtil.icebergID(record).isPresent() || fields.stream().anyMatch(Predicate.isEqual(true));
}

/**
 * A union subtree has IDs if the union node itself carries an Iceberg ID or
 * any of its options does.
 */
@Override
public Boolean union(TypeDescription union, List<Boolean> options) {
  if (ORCSchemaUtil.icebergID(union).isPresent()) {
    return true;
  }
  for (Boolean optionHasIds : options) {
    if (Boolean.TRUE.equals(optionHasIds)) {
      return true;
    }
  }
  return false;
}

@Override
public Boolean list(TypeDescription array, Boolean element) {
return ORCSchemaUtil.icebergID(array).isPresent() || element;
Expand Down
50 changes: 38 additions & 12 deletions orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,18 +265,7 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo

switch (type.typeId()) {
case STRUCT:
orcType = TypeDescription.createStruct();
for (Types.NestedField nestedField : type.asStructType().fields()) {
// Using suffix _r to avoid potential underlying issues in ORC reader
// with reused column names between ORC and Iceberg;
// e.g. renaming column c -> d and adding new column d
String name = Optional.ofNullable(mapping.get(nestedField.fieldId()))
.map(OrcField::name)
.orElseGet(() -> nestedField.name() + "_r" + nestedField.fieldId());
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
isRequired && nestedField.isRequired(), mapping);
orcType.addField(name, childType);
}
orcType = buildOrcProjectForStructType(fieldId, type, isRequired, mapping);
break;
case LIST:
Types.ListType list = (Types.ListType) type;
Expand Down Expand Up @@ -317,6 +306,36 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo
return orcType;
}

/**
 * Builds the ORC projection for an Iceberg struct type.
 * <p>
 * When the field's mapped ORC type is a UNION, the Iceberg struct is the
 * flattened representation of that union: field 0 is the synthetic tag field
 * and field i + 1 corresponds to union option i (see the union handling in
 * the value readers), so the tag is skipped and the remaining fields are
 * projected as union children. Otherwise a regular ORC struct is built,
 * renaming fields absent from the mapping with the "_r" + fieldId suffix.
 */
private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Type type, boolean isRequired,
                                                            Map<Integer, OrcField> mapping) {
  TypeDescription orcType;
  OrcField orcField = mapping.get(fieldId); // get() already returns null when the id is absent
  // this branch means the iceberg struct schema actually corresponds to an underlying union
  if (orcField != null && orcField.type.getCategory() == TypeDescription.Category.UNION) {
    orcType = TypeDescription.createUnion();
    List<Types.NestedField> nestedFields = type.asStructType().fields();
    // skip field 0: it is the union tag field, not an option
    for (Types.NestedField nestedField : nestedFields.subList(1, nestedFields.size())) {
      TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
          isRequired && nestedField.isRequired(), mapping);
      orcType.addUnionChild(childType);
    }
  } else {
    orcType = TypeDescription.createStruct();
    for (Types.NestedField nestedField : type.asStructType().fields()) {
      // Using suffix _r to avoid potential underlying issues in ORC reader
      // with reused column names between ORC and Iceberg;
      // e.g. renaming column c -> d and adding new column d
      String name = Optional.ofNullable(mapping.get(nestedField.fieldId()))
          .map(OrcField::name)
          .orElseGet(() -> nestedField.name() + "_r" + nestedField.fieldId());
      TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
          isRequired && nestedField.isRequired(), mapping);
      orcType.addField(name, childType);
    }
  }
  return orcType;
}

private static Map<Integer, OrcField> icebergToOrcMapping(String name, TypeDescription orcType) {
Map<Integer, OrcField> icebergToOrc = Maps.newHashMap();
switch (orcType.getCategory()) {
Expand All @@ -327,6 +346,13 @@ private static Map<Integer, OrcField> icebergToOrcMapping(String name, TypeDescr
icebergToOrc.putAll(icebergToOrcMapping(childrenNames.get(i), children.get(i)));
}
break;
case UNION:
// This is part of building orc read schema in file level. orcType has union type inside it.
List<TypeDescription> options = orcType.getChildren();
for (int i = 0; i < options.size(); i++) {
icebergToOrc.putAll(icebergToOrcMapping("option" + i, options.get(i)));
}
break;
case LIST:
icebergToOrc.putAll(icebergToOrcMapping("element", orcType.getChildren().get(0)));
break;
Expand Down
28 changes: 27 additions & 1 deletion orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,17 @@ public static <T> T visit(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
return visitRecord(schema, visitor);

case UNION:
throw new UnsupportedOperationException("Cannot handle " + schema);
List<TypeDescription> types = schema.getChildren();
List<T> options = Lists.newArrayListWithExpectedSize(types.size());
for (int i = 0; i < types.size(); i++) {
visitor.beforeUnionOption(types.get(i), i);
try {
options.add(visit(types.get(i), visitor));
} finally {
visitor.afterUnionOption(types.get(i), i);
}
}
return visitor.union(schema, options);

case LIST:
final T elementResult;
Expand Down Expand Up @@ -112,6 +122,10 @@ private static <T> T visitRecord(TypeDescription record, OrcSchemaVisitor<T> vis
return visitor.record(record, names, visitFields(fields, names, visitor));
}

/**
 * Synthetic field name used for a union option while tracking the current
 * path, e.g. "field0" for the first option.
 */
public String optionName(int ordinal) {
  return String.format("field%d", ordinal);
}

public String elementName() {
return "_elem";
}
Expand All @@ -136,6 +150,14 @@ public void afterField(String name, TypeDescription type) {
fieldNames.pop();
}

// Pushes the synthetic option name onto the field-name path before visiting
// a union option, reusing the generic beforeField hook.
public void beforeUnionOption(TypeDescription option, int ordinal) {
beforeField(optionName(ordinal), option);
}

// Pops the synthetic option name from the field-name path after visiting
// a union option, reusing the generic afterField hook.
public void afterUnionOption(TypeDescription option, int ordinal) {
afterField(optionName(ordinal), option);
}

public void beforeElementField(TypeDescription element) {
beforeField(elementName(), element);
}
Expand Down Expand Up @@ -164,6 +186,10 @@ public T record(TypeDescription record, List<String> names, List<T> fields) {
return null;
}

// Called after all union options have been visited; `options` holds one
// result per option in declaration order. Default does nothing.
public T union(TypeDescription union, List<T> options) {
return null;
}

// Called after the list's element type has been visited. Default does nothing.
public T list(TypeDescription array, T element) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static <T> T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeV
return visitRecord(iType != null ? iType.asStructType() : null, schema, visitor);

case UNION:
throw new UnsupportedOperationException("Cannot handle " + schema);
return visitor.visitUnion(iType, schema, visitor);

case LIST:
Types.ListType list = iType != null ? iType.asListType() : null;
Expand Down Expand Up @@ -71,10 +71,25 @@ private static <T> T visitRecord(
return visitor.record(struct, record, names, results);
}

/**
 * Visits an ORC union together with its Iceberg representation.
 * <p>
 * The Iceberg side of a union is a struct whose field 0 is the union tag;
 * option i of the union corresponds to struct field i + 1, hence the
 * get(i + 1) offset below.
 */
protected T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor<T> visitor) {
  List<TypeDescription> types = union.getChildren();
  List<T> options = Lists.newArrayListWithCapacity(types.size());
  // guard against a null Iceberg type, consistent with the record/list/map
  // branches of visit(), which pass null child types in that case
  List<Types.NestedField> fields = type != null ? type.asStructType().fields() : null;

  for (int i = 0; i < types.size(); i += 1) {
    Type optionType = fields != null ? fields.get(i + 1).type() : null;
    options.add(visit(optionType, types.get(i), visitor));
  }

  return visitor.union(type, union, options);
}

// Called after a struct's fields have been visited; results align with names.
// Default does nothing.
public T record(Types.StructType iStruct, TypeDescription record, List<String> names, List<T> fields) {
return null;
}

// Called after a union's options have been visited; `options` holds one
// result per ORC option in declaration order. Default does nothing.
public T union(Type iUnion, TypeDescription union, List<T> options) {
return null;
}

// Called after the list's element has been visited. Default does nothing.
public T list(Types.ListType iList, TypeDescription array, T element) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public OrcValueReader<?> record(
return SparkOrcValueReaders.struct(fields, expected, idToConstant);
}

// Builds a union value reader from the per-option readers. The `expected`
// Iceberg type is not needed here: the readers already carry option types.
@Override
public OrcValueReader<?> union(Type expected, TypeDescription union, List<OrcValueReader<?>> options) {
return SparkOrcValueReaders.union(options);
}

@Override
public OrcValueReader<?> list(Types.ListType iList, TypeDescription array, OrcValueReader<?> elementReader) {
return SparkOrcValueReaders.array(elementReader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
Expand Down Expand Up @@ -70,6 +71,10 @@ static OrcValueReader<?> struct(
return new StructReader(readers, struct, idToConstant);
}

// Factory for a reader that materializes ORC union values as InternalRows
// (tag in field 0, option i in field i + 1 — see UnionReader).
static OrcValueReader<?> union(List<OrcValueReader<?>> readers) {
return new UnionReader(readers);
}

static OrcValueReader<?> array(OrcValueReader<?> elementReader) {
return new ArrayReader(elementReader);
}
Expand Down Expand Up @@ -159,6 +164,34 @@ protected void set(InternalRow struct, int pos, Object value) {
}
}

/**
 * Reads an ORC union value into a Spark {@link InternalRow} shaped as the
 * flattened struct representation: field 0 holds the union tag, field i + 1
 * holds the value of option i, and all non-selected options are null.
 */
static class UnionReader implements OrcValueReader<InternalRow> {
  // Per-option readers, indexed by the union tag. Wildcard-typed array
  // instead of a raw type to avoid unchecked usage.
  private final OrcValueReader<?>[] readers;

  private UnionReader(List<OrcValueReader<?>> readers) {
    this.readers = readers.toArray(new OrcValueReader<?>[0]);
  }

  @Override
  public InternalRow nonNullRead(ColumnVector vector, int row) {
    UnionColumnVector unionColumnVector = (UnionColumnVector) vector;

    // the tag selects which option's child vector holds this row's value
    int fieldIndex = unionColumnVector.tags[row];
    Object value = readers[fieldIndex].read(unionColumnVector.fields[fieldIndex], row);

    // row layout: [tag, option0, option1, ...]; null out every option slot,
    // then set the tag and the single selected option
    InternalRow struct = new GenericInternalRow(readers.length + 1);
    for (int i = 0; i < readers.length; i += 1) {
      struct.setNullAt(i + 1);
    }
    struct.update(0, fieldIndex);
    struct.update(fieldIndex + 1, value);

    return struct;
  }
}

private static class StringReader implements OrcValueReader<UTF8String> {
private static final StringReader INSTANCE = new StringReader();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg.spark.data.vectorized;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
Expand All @@ -35,8 +36,10 @@
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ColumnVector;
Expand Down Expand Up @@ -93,6 +96,11 @@ public Converter record(Types.StructType iStruct, TypeDescription record, List<S
return new StructConverter(iStruct, fields, idToConstant);
}

// Builds a vectorized union converter; `iType` is the flattened struct
// representation of the union (tag field + one field per option).
@Override
public Converter union(Type iType, TypeDescription union, List<Converter> options) {
return new UnionConverter(iType, options);
}

@Override
public Converter list(Types.ListType iList, TypeDescription array, Converter element) {
return new ArrayConverter(iList, element);
Expand Down Expand Up @@ -424,4 +432,38 @@ public ColumnVector getChild(int ordinal) {
};
}
}

// Converts an ORC UnionColumnVector into a Spark struct column shaped as
// [tag, option0, option1, ...]: child 0 is an int column built from the
// union tags, child i + 1 is option i converted by its own converter.
private static class UnionConverter implements Converter {
// flattened struct representation of the union: field 0 is the tag,
// field i + 1 corresponds to union option i
private final Types.StructType structType;
private final List<Converter> optionConverters;

private UnionConverter(Type type, List<Converter> optionConverters) {
this.structType = type.asStructType();
this.optionConverters = optionConverters;
}

@Override
public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector vector, int batchSize,
long batchOffsetInFile) {
UnionColumnVector unionColumnVector = (UnionColumnVector) vector;
List<Types.NestedField> fields = structType.fields();
List<ColumnVector> fieldVectors = Lists.newArrayListWithExpectedSize(fields.size());

// widen the int tags to a LongColumnVector so the standard int reader can
// expose them as the struct's first (tag) column
LongColumnVector longColumnVector = new LongColumnVector();
longColumnVector.vector = Arrays.stream(unionColumnVector.tags).asLongStream().toArray();

fieldVectors.add(new PrimitiveOrcColumnVector(Types.IntegerType.get(), batchSize, longColumnVector,
OrcValueReaders.ints(), batchOffsetInFile));
// fields.size() - 1 options: field 0 is the tag, so option i maps to field i + 1
for (int i = 0; i < fields.size() - 1; i += 1) {
fieldVectors.add(optionConverters.get(i).convert(unionColumnVector.fields[i], batchSize, batchOffsetInFile));
}

// expose the tag + option vectors as children of a struct column
return new BaseOrcColumnVector(structType, batchSize, vector) {
@Override
public ColumnVector getChild(int ordinal) {
return fieldVectors.get(ordinal);
}
};
}
}
}
Loading