Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 96 additions & 25 deletions orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,36 +263,36 @@ public static TypeDescription buildOrcProjection(Schema schema,
private static TypeDescription buildOrcProjection(Integer fieldId, Type type, boolean isRequired,
Map<Integer, OrcField> mapping) {
final TypeDescription orcType;
final OrcField orcField = mapping.getOrDefault(fieldId, null);

switch (type.typeId()) {
case STRUCT:
orcType = buildOrcProjectForStructType(fieldId, type, isRequired, mapping);
break;
case LIST:
Types.ListType list = (Types.ListType) type;
TypeDescription elementType = buildOrcProjection(list.elementId(), list.elementType(),
isRequired && list.isElementRequired(), mapping);
orcType = TypeDescription.createList(elementType);
orcType = buildOrcProjectionForListType((Types.ListType) type, isRequired, mapping, orcField);
break;
case MAP:
Types.MapType map = (Types.MapType) type;
TypeDescription keyType = buildOrcProjection(map.keyId(), map.keyType(), isRequired, mapping);
TypeDescription valueType = buildOrcProjection(map.valueId(), map.valueType(),
isRequired && map.isValueRequired(), mapping);
orcType = TypeDescription.createMap(keyType, valueType);
orcType = buildOrcProjectionForMapType((Types.MapType) type, isRequired, mapping, orcField);
break;
default:
if (mapping.containsKey(fieldId)) {
TypeDescription originalType = mapping.get(fieldId).type();
Optional<TypeDescription> promotedType = getPromotedType(type, originalType);

if (promotedType.isPresent()) {
orcType = promotedType.get();
} else {
Preconditions.checkArgument(isSameType(originalType, type),
"Can not promote %s type to %s",
originalType.getCategory(), type.typeId().name());
if (originalType != null && originalType.getCategory().equals(TypeDescription.Category.UNION)) {
Preconditions.checkState(originalType.getChildren().size() == 1,
"Expect single type union for orc schema.");
orcType = originalType.clone();
} else {
Optional<TypeDescription> promotedType = getPromotedType(type, originalType);

if (promotedType.isPresent()) {
orcType = promotedType.get();
} else {
Preconditions.checkArgument(isSameType(originalType, type),
"Can not promote %s type to %s",
originalType.getCategory(), type.typeId().name());
orcType = originalType.clone();
}
}
} else {
if (isRequired) {
Expand All @@ -307,19 +307,58 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo
return orcType;
}

/**
 * Builds the ORC projection type for an Iceberg map type.
 *
 * <p>The key and value types are projected recursively. If the supplied ORC field is a UNION,
 * it must have exactly one child, and the projected map is returned wrapped in a new
 * single-child union; otherwise the projected map is returned directly.
 *
 * @param type the Iceberg map type to project
 * @param isRequired whether the enclosing field is required; combined with the map's own
 *     value-required flag when projecting the value type
 * @param mapping Iceberg field id to ORC field mapping, passed through to the recursive calls
 * @param orcField the ORC field resolved by the caller for this map, or {@code null} if none
 * @return the projected ORC map type, wrapped in a union when {@code orcField} is a union
 * @throws IllegalStateException if {@code orcField} is a union with a child count other than one
 */
private static TypeDescription buildOrcProjectionForMapType(Types.MapType type, boolean isRequired,
    Map<Integer, OrcField> mapping, OrcField orcField) {
  final boolean unionBacked =
      orcField != null && orcField.type.getCategory().equals(TypeDescription.Category.UNION);

  // Validate the union shape before any recursive projection happens.
  if (unionBacked) {
    Preconditions.checkState(orcField.type.getChildren().size() == 1,
        "Expect single type union for orc schema.");
  }

  // The key is projected first, then the value.
  final TypeDescription projectedKey =
      buildOrcProjection(type.keyId(), type.keyType(), isRequired, mapping);
  final TypeDescription projectedValue =
      buildOrcProjection(type.valueId(), type.valueType(), isRequired && type.isValueRequired(), mapping);
  final TypeDescription projectedMap = TypeDescription.createMap(projectedKey, projectedValue);

  if (!unionBacked) {
    return projectedMap;
  }

  // Re-wrap the map in a single-child union.
  final TypeDescription singleTypeUnion = TypeDescription.createUnion();
  singleTypeUnion.addUnionChild(projectedMap);
  return singleTypeUnion;
}

/**
 * Builds the ORC projection type for an Iceberg list type.
 *
 * <p>The element type is projected recursively. If the supplied ORC field is a UNION, it must
 * have exactly one child, and the projected list is returned wrapped in a new single-child
 * union; otherwise the projected list is returned directly.
 *
 * @param type the Iceberg list type to project
 * @param isRequired whether the enclosing field is required; combined with the list's own
 *     element-required flag when projecting the element type
 * @param mapping Iceberg field id to ORC field mapping, passed through to the recursive call
 * @param orcField the ORC field resolved by the caller for this list, or {@code null} if none
 * @return the projected ORC list type, wrapped in a union when {@code orcField} is a union
 * @throws IllegalStateException if {@code orcField} is a union with a child count other than one
 */
private static TypeDescription buildOrcProjectionForListType(Types.ListType type, boolean isRequired,
    Map<Integer, OrcField> mapping, OrcField orcField) {
  final boolean unionBacked =
      orcField != null && orcField.type.getCategory().equals(TypeDescription.Category.UNION);

  // Validate the union shape before any recursive projection happens.
  if (unionBacked) {
    Preconditions.checkState(orcField.type.getChildren().size() == 1,
        "Expect single type union for orc schema.");
  }

  final TypeDescription projectedElement = buildOrcProjection(
      type.elementId(), type.elementType(), isRequired && type.isElementRequired(), mapping);
  final TypeDescription projectedList = TypeDescription.createList(projectedElement);

  if (!unionBacked) {
    return projectedList;
  }

  // Re-wrap the list in a single-child union.
  final TypeDescription singleTypeUnion = TypeDescription.createUnion();
  singleTypeUnion.addUnionChild(projectedList);
  return singleTypeUnion;
}

private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Type type, boolean isRequired,
Map<Integer, OrcField> mapping) {
TypeDescription orcType;
OrcField orcField = mapping.getOrDefault(fieldId, null);
// this branch means the iceberg struct schema actually correspond to an underlying union

if (orcField != null && orcField.type.getCategory().equals(TypeDescription.Category.UNION)) {
orcType = TypeDescription.createUnion();
List<Types.NestedField> nestedFields = type.asStructType().fields();
for (Types.NestedField nestedField : nestedFields.subList(1, nestedFields.size())) {
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
isRequired && nestedField.isRequired(), mapping);
orcType.addUnionChild(childType);
}
// this branch means the iceberg struct schema actually correspond to an underlying union
orcType = getOrcSchemaForUnionType(type, isRequired, mapping, orcField);
} else {
orcType = TypeDescription.createStruct();
for (Types.NestedField nestedField : type.asStructType().fields()) {
Expand All @@ -340,6 +379,38 @@ private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Typ
return orcType;
}

private static TypeDescription getOrcSchemaForUnionType(Type type, boolean isRequired, Map<Integer, OrcField> mapping,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe rename this method from getOrcSchemaForUnionType to getOrcUnionTypeFromIcebergStructType?

OrcField orcField) {
TypeDescription orcType;
if (orcField.type.getChildren().size() == 1) { // single type union
orcType = TypeDescription.createUnion();

TypeDescription childOrcStructType = TypeDescription.createStruct();
for (Types.NestedField nestedField : type.asStructType().fields()) {
if (mapping.get(nestedField.fieldId()) == null && nestedField.hasDefaultValue()) {
continue;
}
String name = Optional.ofNullable(mapping.get(nestedField.fieldId()))
.map(OrcField::name)
.orElseGet(() -> nestedField.name());
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
isRequired && nestedField.isRequired(), mapping);
childOrcStructType.addField(name, childType);
}

orcType.addUnionChild(childOrcStructType);
} else { // complex union
orcType = TypeDescription.createUnion();
List<Types.NestedField> nestedFields = type.asStructType().fields();
for (Types.NestedField nestedField : nestedFields.subList(1, nestedFields.size())) {
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
isRequired && nestedField.isRequired(), mapping);
orcType.addUnionChild(childType);
}
}
return orcType;
}

private static Map<Integer, OrcField> icebergToOrcMapping(String name, TypeDescription orcType) {
Map<Integer, OrcField> icebergToOrc = Maps.newHashMap();
switch (orcType.getCategory()) {
Expand Down
18 changes: 12 additions & 6 deletions orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,20 @@ public static <T> T visit(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
case UNION:
List<TypeDescription> types = schema.getChildren();
List<T> options = Lists.newArrayListWithExpectedSize(types.size());
for (int i = 0; i < types.size(); i++) {
visitor.beforeUnionOption(types.get(i), i);
try {
options.add(visit(types.get(i), visitor));
} finally {
visitor.afterUnionOption(types.get(i), i);

if (types.size() == 1) {
options.add(visit(types.get(0), visitor));
} else {
for (int i = 0; i < types.size(); i++) {
visitor.beforeUnionOption(types.get(i), i);
try {
options.add(visit(types.get(i), visitor));
} finally {
visitor.afterUnionOption(types.get(i), i);
}
}
}

return visitor.union(schema, options);

case LIST:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,12 @@ protected T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisito
List<TypeDescription> types = union.getChildren();
List<T> options = Lists.newArrayListWithCapacity(types.size());

for (int i = 0; i < types.size(); i += 1) {
options.add(visit(type.asStructType().fields().get(i + 1).type(), types.get(i), visitor));
if (types.size() == 1) { // single type union
options.add(visit(type, types.get(0), visitor));
} else { // complex union
for (int i = 0; i < types.size(); i += 1) {
options.add(visit(type.asStructType().fields().get(i + 1).type(), types.get(i), visitor));
}
}

return visitor.union(type, union, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ protected void set(InternalRow struct, int pos, Object value) {
}
}

static class UnionReader implements OrcValueReader<InternalRow> {
static class UnionReader implements OrcValueReader<Object> {
private final OrcValueReader[] readers;

private UnionReader(List<OrcValueReader<?>> readers) {
Expand All @@ -175,20 +175,23 @@ private UnionReader(List<OrcValueReader<?>> readers) {
}

@Override
public InternalRow nonNullRead(ColumnVector vector, int row) {
InternalRow struct = new GenericInternalRow(readers.length + 1);
public Object nonNullRead(ColumnVector vector, int row) {
UnionColumnVector unionColumnVector = (UnionColumnVector) vector;

int fieldIndex = unionColumnVector.tags[row];
Object value = this.readers[fieldIndex].read(unionColumnVector.fields[fieldIndex], row);

for (int i = 0; i < readers.length; i += 1) {
struct.setNullAt(i + 1);
if (readers.length == 1) {
return value;
} else {
InternalRow struct = new GenericInternalRow(readers.length + 1);
for (int i = 0; i < readers.length; i += 1) {
struct.setNullAt(i + 1);
}
struct.update(0, fieldIndex);
struct.update(fieldIndex + 1, value);

return struct;
}
struct.update(0, fieldIndex);
struct.update(fieldIndex + 1, value);

return struct;
}
}

Expand Down
Loading