Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/ApplyNameMapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ public TypeDescription record(TypeDescription record, List<String> names, List<T
return setId(structType, field);
}

/**
 * Rebuilds an ORC union type, attaching the Iceberg field id found in the
 * name mapping for the current path.
 *
 * @param union the original union type being visited
 * @param options the visited option types, in union-tag order
 * @return a new union type carrying the mapped field id, if one exists
 */
@Override
public TypeDescription union(TypeDescription union, List<TypeDescription> options) {
  Preconditions.checkArgument(!options.isEmpty(), "Union type must have options");
  MappedField field = nameMapping.find(currentPath());
  TypeDescription unionType = TypeDescription.createUnion();

  // NOTE(review): null options are silently skipped, which would shift the remaining
  // options relative to the file's union tags — confirm the visitor never yields null here.
  for (TypeDescription option : options) {
    if (option != null) {
      unionType.addUnionChild(option);
    }
  }

  return setId(unionType, field);
}

@Override
public TypeDescription list(TypeDescription array, TypeDescription element) {
Preconditions.checkArgument(element != null, "List type must have element type");
Expand Down
5 changes: 5 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/HasIds.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public Boolean record(TypeDescription record, List<String> names, List<Boolean>
return ORCSchemaUtil.icebergID(record).isPresent() || fields.stream().anyMatch(Predicate.isEqual(true));
}

/**
 * Returns true if this union, or any of its options, carries an Iceberg field id.
 */
@Override
public Boolean union(TypeDescription union, List<Boolean> options) {
  if (ORCSchemaUtil.icebergID(union).isPresent()) {
    return true;
  }
  return options.contains(Boolean.TRUE);
}

@Override
public Boolean list(TypeDescription array, Boolean element) {
return ORCSchemaUtil.icebergID(array).isPresent() || element;
Expand Down
50 changes: 35 additions & 15 deletions orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;


/**
* Utilities for mapping Iceberg to ORC schemas.
*/
Expand Down Expand Up @@ -265,21 +266,7 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo

switch (type.typeId()) {
case STRUCT:
orcType = TypeDescription.createStruct();
for (Types.NestedField nestedField : type.asStructType().fields()) {
// Using suffix _r to avoid potential underlying issues in ORC reader
// with reused column names between ORC and Iceberg;
// e.g. renaming column c -> d and adding new column d
if (mapping.get(nestedField.fieldId()) == null && nestedField.hasDefaultValue()) {
continue;
}
String name = Optional.ofNullable(mapping.get(nestedField.fieldId()))
.map(OrcField::name)
.orElseGet(() -> nestedField.name() + "_r" + nestedField.fieldId());
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
isRequired && nestedField.isRequired(), mapping);
orcType.addField(name, childType);
}
orcType = buildOrcProjectForStructType(fieldId, type, isRequired, mapping);
break;
case LIST:
Types.ListType list = (Types.ListType) type;
Expand Down Expand Up @@ -320,6 +307,32 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo
return orcType;
}

/**
 * Builds the ORC projection for an Iceberg struct type.
 *
 * <p>If the file schema maps this field id to an ORC union, the file's union type is
 * projected unchanged (Iceberg has no union type of its own); otherwise a struct is
 * built field by field from the Iceberg schema.
 *
 * @param fieldId the Iceberg field id of the struct
 * @param type the Iceberg struct type
 * @param isRequired whether this struct and its ancestors are required
 * @param mapping Iceberg field id to ORC field mapping from the file schema
 * @return the projected ORC type
 */
private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Type type, boolean isRequired,
                                                            Map<Integer, OrcField> mapping) {
  OrcField orcField = mapping.get(fieldId);  // get() already returns null when absent
  if (orcField != null && orcField.type.getCategory() == TypeDescription.Category.UNION) {
    // Pass the file's union type through as-is.
    return orcField.type;
  }

  TypeDescription orcType = TypeDescription.createStruct();
  for (Types.NestedField nestedField : type.asStructType().fields()) {
    // Fields absent from the file that have a default value are skipped entirely.
    if (mapping.get(nestedField.fieldId()) == null && nestedField.hasDefaultValue()) {
      continue;
    }
    // Using suffix _r to avoid potential underlying issues in ORC reader
    // with reused column names between ORC and Iceberg;
    // e.g. renaming column c -> d and adding new column d
    String name = Optional.ofNullable(mapping.get(nestedField.fieldId()))
        .map(OrcField::name)
        .orElseGet(() -> nestedField.name() + "_r" + nestedField.fieldId());
    TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
        isRequired && nestedField.isRequired(), mapping);
    orcType.addField(name, childType);
  }
  return orcType;
}

private static Map<Integer, OrcField> icebergToOrcMapping(String name, TypeDescription orcType) {
Map<Integer, OrcField> icebergToOrc = Maps.newHashMap();
switch (orcType.getCategory()) {
Expand All @@ -330,6 +343,13 @@ private static Map<Integer, OrcField> icebergToOrcMapping(String name, TypeDescr
icebergToOrc.putAll(icebergToOrcMapping(childrenNames.get(i), children.get(i)));
}
break;
case UNION:
        // This is part of building the ORC read schema at the file level; orcType contains a union type.
List<TypeDescription> options = orcType.getChildren();
for (int i = 0; i < options.size(); i++) {
icebergToOrc.putAll(icebergToOrcMapping("option" + i, options.get(i)));
}
break;
case LIST:
icebergToOrc.putAll(icebergToOrcMapping("element", orcType.getChildren().get(0)));
break;
Expand Down
29 changes: 28 additions & 1 deletion orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,18 @@ public static <T> T visit(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
return visitRecord(schema, visitor);

case UNION:
throw new UnsupportedOperationException("Cannot handle " + schema);
List<TypeDescription> types = schema.getChildren();
List<T> options = Lists.newArrayListWithExpectedSize(types.size());
for (TypeDescription type : types) {
visitor.beforeUnionOption(type);
try {
options.add(visit(type, visitor));
} finally {
visitor.afterUnionOption(type);
}
}

return visitor.union(schema, options);

case LIST:
final T elementResult;
Expand Down Expand Up @@ -112,6 +123,10 @@ private static <T> T visitRecord(TypeDescription record, OrcSchemaVisitor<T> vis
return visitor.record(record, names, visitFields(fields, names, visitor));
}

/**
 * Returns the synthetic field name pushed onto the path for each union option.
 */
public String optionName() {
  return "_option";
}

public String elementName() {
return "_elem";
}
Expand All @@ -136,6 +151,14 @@ public void afterField(String name, TypeDescription type) {
fieldNames.pop();
}

/**
 * Called before visiting a union option; pushes the option name onto the field path.
 */
public void beforeUnionOption(TypeDescription option) {
  beforeField(optionName(), option);
}

/**
 * Called after visiting a union option; pops the option name from the field path.
 */
public void afterUnionOption(TypeDescription option) {
  afterField(optionName(), option);
}

public void beforeElementField(TypeDescription element) {
beforeField(elementName(), element);
}
Expand Down Expand Up @@ -164,6 +187,10 @@ public T record(TypeDescription record, List<String> names, List<T> fields) {
return null;
}

/**
 * Visitor callback for ORC union types; the default implementation returns null.
 *
 * @param union the ORC union type
 * @param options results of visiting each union option, in tag order
 */
public T union(TypeDescription union, List<T> options) {
  return null;
}

public T list(TypeDescription array, T element) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static <T> T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeV
return visitor.visitRecord(iType != null ? iType.asStructType() : null, schema, visitor);

case UNION:
throw new UnsupportedOperationException("Cannot handle " + schema);
return visitUnion(iType, schema, visitor);

case LIST:
Types.ListType list = iType != null ? iType.asListType() : null;
Expand Down Expand Up @@ -71,10 +71,25 @@ protected T visitRecord(
return visitor.record(struct, record, names, results);
}

/**
 * Visits an ORC union type. The corresponding Iceberg type, when present, is expected
 * to be a struct whose fields line up positionally with the union's options.
 *
 * @param type the Iceberg type mapped to this union, or null when there is no mapping
 * @param union the ORC union type
 * @param visitor the visitor applied to each option
 * @return the visitor's result for the union
 */
private static <T> T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor<T> visitor) {
  List<TypeDescription> types = union.getChildren();
  List<T> options = Lists.newArrayListWithCapacity(types.size());

  // Guard against a missing Iceberg mapping, consistent with the other cases in visit()
  // which pass null through rather than dereferencing iType.
  Types.StructType struct = type != null ? type.asStructType() : null;
  for (int i = 0; i < types.size(); i += 1) {
    Type optionType = struct != null ? struct.fields().get(i).type() : null;
    options.add(visit(optionType, types.get(i), visitor));
  }

  return visitor.union(type, union, options);
}

public T record(Types.StructType iStruct, TypeDescription record, List<String> names, List<T> fields) {
return null;
}

/**
 * Visitor callback for ORC union types paired with the corresponding Iceberg type;
 * the default implementation returns null.
 *
 * @param iUnion the Iceberg type mapped to the union, may be null
 * @param union the ORC union type
 * @param options results of visiting each union option, in tag order
 */
public T union(Type iUnion, TypeDescription union, List<T> options) {
  return null;
}

public T list(Types.ListType iList, TypeDescription array, T element) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public OrcValueReader<?> record(
return SparkOrcValueReaders.struct(fields, expected, getIdToConstant());
}

@Override
public OrcValueReader<?> union(Type expected, TypeDescription union, List<OrcValueReader<?>> options) {
  // Delegates to a UnionReader that picks the option reader matching each row's union tag.
  return SparkOrcValueReaders.union(options);
}

@Override
public OrcValueReader<?> list(Types.ListType iList, TypeDescription array, OrcValueReader<?> elementReader) {
return SparkOrcValueReaders.array(elementReader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
Expand Down Expand Up @@ -70,6 +71,10 @@ static OrcValueReader<?> struct(
return new StructReader(readers, struct, idToConstant);
}

/**
 * Creates a reader for ORC union values, with one option reader per union branch.
 */
static OrcValueReader<?> union(List<OrcValueReader<?>> readers) {
  return new UnionReader(readers);
}

static OrcValueReader<?> array(OrcValueReader<?> elementReader) {
return new ArrayReader(elementReader);
}
Expand Down Expand Up @@ -159,6 +164,33 @@ protected void set(InternalRow struct, int pos, Object value) {
}
}

/**
 * Reads an ORC union value as a Spark struct with one field per union option.
 *
 * <p>For each row, only the field selected by the union tag is populated; every other
 * field of the returned row is null.
 */
static class UnionReader implements OrcValueReader<InternalRow> {
  // Parameterized with a wildcard instead of a raw type; copied to an array to avoid
  // per-row list indirection.
  private final OrcValueReader<?>[] readers;

  private UnionReader(List<OrcValueReader<?>> readers) {
    this.readers = readers.toArray(new OrcValueReader<?>[0]);
  }

  @Override
  public InternalRow nonNullRead(ColumnVector vector, int row) {
    UnionColumnVector unionColumnVector = (UnionColumnVector) vector;

    // The tag identifies which option vector holds this row's value.
    int fieldIndex = unionColumnVector.tags[row];
    Object value = readers[fieldIndex].read(unionColumnVector.fields[fieldIndex], row);

    InternalRow struct = new GenericInternalRow(readers.length);
    for (int i = 0; i < readers.length; i += 1) {
      struct.setNullAt(i);
    }
    struct.update(fieldIndex, value);

    return struct;
  }
}

private static class StringReader implements OrcValueReader<UTF8String> {
private static final StringReader INSTANCE = new StringReader();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ColumnVector;
Expand Down Expand Up @@ -93,6 +94,11 @@ public Converter record(Types.StructType iStruct, TypeDescription record, List<S
return new StructConverter(iStruct, fields, getIdToConstant());
}

@Override
public Converter union(Type iType, TypeDescription union, List<Converter> options) {
  // Union vectors are converted option-by-option and exposed as a struct-like column.
  return new UnionConverter(iType, options);
}

@Override
public Converter list(Types.ListType iList, TypeDescription array, Converter element) {
return new ArrayConverter(iList, element);
Expand Down Expand Up @@ -422,4 +428,35 @@ public ColumnVector getChild(int ordinal) {
};
}
}

/**
 * Converts an ORC {@link UnionColumnVector} into a Spark {@code ColumnVector}.
 *
 * <p>The Iceberg side models the union as a struct whose fields correspond, by position,
 * to the union's options; each option vector is converted with its own converter and
 * exposed as a child of the returned column.
 */
private static class UnionConverter implements Converter {
  private final Types.StructType structType;
  private final List<Converter> optionConverters;

  private UnionConverter(Type type, List<Converter> optionConverters) {
    this.structType = type.asStructType();
    this.optionConverters = optionConverters;
  }

  @Override
  public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector vector, int batchSize,
      long batchOffsetInFile) {
    UnionColumnVector unionColumnVector = (UnionColumnVector) vector;
    List<Types.NestedField> fields = structType.fields();
    // NOTE(review): these asserts only fire with -ea enabled; consider Preconditions
    // if these invariants must hold in production.
    assert fields.size() == unionColumnVector.fields.length;
    assert fields.size() == optionConverters.size();

    // Convert every option vector eagerly; getChild below returns them by ordinal.
    List<ColumnVector> fieldVectors = Lists.newArrayListWithExpectedSize(fields.size());
    for (int i = 0; i < fields.size(); i += 1) {
      fieldVectors.add(optionConverters.get(i).convert(unionColumnVector.fields[i], batchSize, batchOffsetInFile));
    }

    return new BaseOrcColumnVector(structType, batchSize, vector) {
      @Override
      public ColumnVector getChild(int ordinal) {
        return fieldVectors.get(ordinal);
      }
    };
  }
}
}
Loading