Skip to content
254 changes: 146 additions & 108 deletions orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@

package org.apache.iceberg.orc;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -67,8 +65,8 @@ public TypeDescription type() {
}
}

private static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
private static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";

/**
* The name of the ORC {@link TypeDescription} attribute indicating the Iceberg type corresponding to an
Expand Down Expand Up @@ -202,10 +200,11 @@ private static TypeDescription convert(Integer fieldId, Type type, boolean isReq

/**
* Convert an ORC schema to an Iceberg schema. This method handles the convertion from the original
* Iceberg column mapping IDs if present in the ORC column attributes, otherwise, ORC column IDs
* will be assigned following ORCs pre-order ID assignment.
* Iceberg column mapping IDs if present in the ORC column attributes, otherwise, ORC columns with no
* Iceberg IDs will be ignored and skipped in the conversion.
*
* @return the Iceberg schema
* @throws IllegalArgumentException if ORC schema has no columns with Iceberg ID attributes
*/
public static Schema convert(TypeDescription orcSchema) {
List<TypeDescription> children = orcSchema.getChildren();
Expand All @@ -214,10 +213,13 @@ public static Schema convert(TypeDescription orcSchema) {
"Error in ORC file, children fields and names do not match.");

List<Types.NestedField> icebergFields = Lists.newArrayListWithExpectedSize(children.size());
AtomicInteger lastColumnId = new AtomicInteger(getMaxIcebergId(orcSchema));
for (int i = 0; i < children.size(); i++) {
icebergFields.add(convertOrcToIceberg(children.get(i), childrenNames.get(i),
lastColumnId::incrementAndGet));
OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitor(icebergToOrcMapping("root", orcSchema));
for (TypeDescription child : orcSchema.getChildren()) {
OrcToIcebergVisitor.visit(child, schemaConverter).ifPresent(icebergFields::add);
}

if (icebergFields.size() == 0) {
throw new IllegalArgumentException("ORC schema has no Iceberg mappings");
}

return new Schema(icebergFields);
Expand Down Expand Up @@ -403,114 +405,150 @@ private static Types.NestedField getIcebergType(int icebergID, String name, Type
Types.NestedField.optional(icebergID, name, type);
}

private static Types.NestedField convertOrcToIceberg(TypeDescription orcType, String name,
TypeUtil.NextID nextID) {
private static class OrcToIcebergVisitor extends OrcSchemaVisitor<Optional<Types.NestedField>> {

final int icebergID = icebergID(orcType).orElseGet(nextID::get);
final boolean isRequired = isRequired(orcType);
private final Map<Integer, OrcField> icebergToOrcMapping;

switch (orcType.getCategory()) {
case BOOLEAN:
return getIcebergType(icebergID, name, Types.BooleanType.get(), isRequired);
case BYTE:
case SHORT:
case INT:
return getIcebergType(icebergID, name, Types.IntegerType.get(), isRequired);
case LONG:
String longAttributeValue = orcType.getAttributeValue(ICEBERG_LONG_TYPE_ATTRIBUTE);
LongType longType = longAttributeValue == null ? LongType.LONG : LongType.valueOf(longAttributeValue);
switch (longType) {
case TIME:
return getIcebergType(icebergID, name, Types.TimeType.get(), isRequired);
case LONG:
return getIcebergType(icebergID, name, Types.LongType.get(), isRequired);
default:
throw new IllegalStateException("Invalid Long type found in ORC type attribute");
}
case FLOAT:
return getIcebergType(icebergID, name, Types.FloatType.get(), isRequired);
case DOUBLE:
return getIcebergType(icebergID, name, Types.DoubleType.get(), isRequired);
case STRING:
case CHAR:
case VARCHAR:
return getIcebergType(icebergID, name, Types.StringType.get(), isRequired);
case BINARY:
String binaryAttributeValue = orcType.getAttributeValue(ICEBERG_BINARY_TYPE_ATTRIBUTE);
BinaryType binaryType = binaryAttributeValue == null ? BinaryType.BINARY :
BinaryType.valueOf(binaryAttributeValue);
switch (binaryType) {
case UUID:
return getIcebergType(icebergID, name, Types.UUIDType.get(), isRequired);
case FIXED:
int fixedLength = Integer.parseInt(orcType.getAttributeValue(ICEBERG_FIELD_LENGTH));
return getIcebergType(icebergID, name, Types.FixedType.ofLength(fixedLength), isRequired);
case BINARY:
return getIcebergType(icebergID, name, Types.BinaryType.get(), isRequired);
default:
throw new IllegalStateException("Invalid Binary type found in ORC type attribute");
}
case DATE:
return getIcebergType(icebergID, name, Types.DateType.get(), isRequired);
case TIMESTAMP:
return getIcebergType(icebergID, name, Types.TimestampType.withoutZone(), isRequired);
case TIMESTAMP_INSTANT:
return getIcebergType(icebergID, name, Types.TimestampType.withZone(), isRequired);
case DECIMAL:
return getIcebergType(icebergID, name,
Types.DecimalType.of(orcType.getPrecision(), orcType.getScale()),
isRequired);
case STRUCT: {
List<String> fieldNames = orcType.getFieldNames();
List<TypeDescription> fieldTypes = orcType.getChildren();
List<Types.NestedField> fields = new ArrayList<>(fieldNames.size());
for (int c = 0; c < fieldNames.size(); ++c) {
String childName = fieldNames.get(c);
TypeDescription type = fieldTypes.get(c);
Types.NestedField field = convertOrcToIceberg(type, childName, nextID);
fields.add(field);
}
OrcToIcebergVisitor(Map<Integer, OrcField> icebergToOrcMapping) {
this.icebergToOrcMapping = icebergToOrcMapping;
}

return getIcebergType(icebergID, name, Types.StructType.of(fields), isRequired);
}
case LIST: {
TypeDescription elementType = orcType.getChildren().get(0);
Types.NestedField element = convertOrcToIceberg(elementType, "element", nextID);

Types.ListType listTypeWithElem = isRequired(elementType) ?
Types.ListType.ofRequired(element.fieldId(), element.type()) :
Types.ListType.ofOptional(element.fieldId(), element.type());
return isRequired ?
Types.NestedField.required(icebergID, name, listTypeWithElem) :
Types.NestedField.optional(icebergID, name, listTypeWithElem);
@Override
public Optional<Types.NestedField> record(TypeDescription record, List<String> names,
List<Optional<Types.NestedField>> fields) {
boolean isRequired = isRequired(record);
Optional<Integer> icebergIdOpt = icebergID(record);
if (!icebergIdOpt.isPresent() || fields.size() == 0) {
return Optional.empty();
}
case MAP: {
TypeDescription keyType = orcType.getChildren().get(0);
Types.NestedField key = convertOrcToIceberg(keyType, "key", nextID);
TypeDescription valueType = orcType.getChildren().get(1);
Types.NestedField value = convertOrcToIceberg(valueType, "value", nextID);

Types.MapType mapTypeWithKV = isRequired(valueType) ?
Types.MapType.ofRequired(key.fieldId(), value.fieldId(), key.type(), value.type()) :
Types.MapType.ofOptional(key.fieldId(), value.fieldId(), key.type(), value.type());
Types.StructType structType = Types.StructType.of(
fields.stream().filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList()));
return Optional.of(getIcebergType(icebergIdOpt.get(), icebergToOrcMapping.get(icebergIdOpt.get()).name(),
structType, isRequired));
}

@Override
public Optional<Types.NestedField> list(TypeDescription array,
Optional<Types.NestedField> element) {
boolean isRequired = isRequired(array);
Optional<Integer> icebergIdOpt = icebergID(array);

return getIcebergType(icebergID, name, mapTypeWithKV, isRequired);
if (!icebergIdOpt.isPresent() || !element.isPresent()) {
return Optional.empty();
}
default:
// We don't have an answer for union types.
throw new IllegalArgumentException("Can't handle " + orcType);

Types.NestedField foundElement = element.get();
Types.ListType listTypeWithElem = isRequired(array.getChildren().get(0)) ?
Types.ListType.ofRequired(foundElement.fieldId(), foundElement.type()) :
Types.ListType.ofOptional(foundElement.fieldId(), foundElement.type());
return Optional.of(getIcebergType(icebergIdOpt.get(),
icebergToOrcMapping.get(icebergIdOpt.get()).name(), listTypeWithElem, isRequired));
}
}

private static int getMaxIcebergId(TypeDescription originalOrcSchema) {
int maxId = icebergID(originalOrcSchema).orElse(0);
final List<TypeDescription> children = Optional.ofNullable(originalOrcSchema.getChildren())
.orElse(Collections.emptyList());
for (TypeDescription child : children) {
maxId = Math.max(maxId, getMaxIcebergId(child));
@Override
public Optional<Types.NestedField> map(TypeDescription map, Optional<Types.NestedField> key,
Optional<Types.NestedField> value) {
boolean isRequired = isRequired(map);
Optional<Integer> icebergIdOpt = icebergID(map);

if (!icebergIdOpt.isPresent() || !key.isPresent() || !value.isPresent()) {
return Optional.empty();
}

Types.NestedField foundKey = key.get();
Types.NestedField foundValue = value.get();
Types.MapType mapTypeWithKV = isRequired(map.getChildren().get(1)) ?
Types.MapType.ofRequired(foundKey.fieldId(), foundValue.fieldId(), foundKey.type(), foundValue.type()) :
Types.MapType.ofOptional(foundKey.fieldId(), foundValue.fieldId(), foundKey.type(), foundValue.type());

return Optional.of(getIcebergType(icebergIdOpt.get(), icebergToOrcMapping.get(icebergIdOpt.get()).name(),
mapTypeWithKV, isRequired));
}

return maxId;
@Override
public Optional<Types.NestedField> primitive(TypeDescription primitive) {
boolean isRequired = isRequired(primitive);
Optional<Integer> icebergIdOpt = icebergID(primitive);

if (!icebergIdOpt.isPresent()) {
return Optional.empty();
}

final Types.NestedField foundField;
int icebergID = icebergIdOpt.get();
String name = icebergToOrcMapping.get(icebergID).name();
switch (primitive.getCategory()) {
case BOOLEAN:
foundField = getIcebergType(icebergID, name, Types.BooleanType.get(), isRequired);
break;
case BYTE:
case SHORT:
case INT:
foundField = getIcebergType(icebergID, name, Types.IntegerType.get(), isRequired);
break;
case LONG:
String longAttributeValue = primitive.getAttributeValue(ICEBERG_LONG_TYPE_ATTRIBUTE);
LongType longType = longAttributeValue == null ? LongType.LONG : LongType.valueOf(longAttributeValue);
switch (longType) {
case TIME:
foundField = getIcebergType(icebergID, name, Types.TimeType.get(), isRequired);
break;
case LONG:
foundField = getIcebergType(icebergID, name, Types.LongType.get(), isRequired);
break;
default:
throw new IllegalStateException("Invalid Long type found in ORC type attribute");
}
break;
case FLOAT:
foundField = getIcebergType(icebergID, name, Types.FloatType.get(), isRequired);
break;
case DOUBLE:
foundField = getIcebergType(icebergID, name, Types.DoubleType.get(), isRequired);
break;
case STRING:
case CHAR:
case VARCHAR:
foundField = getIcebergType(icebergID, name, Types.StringType.get(), isRequired);
break;
case BINARY:
String binaryAttributeValue = primitive.getAttributeValue(ICEBERG_BINARY_TYPE_ATTRIBUTE);
BinaryType binaryType = binaryAttributeValue == null ? BinaryType.BINARY :
BinaryType.valueOf(binaryAttributeValue);
switch (binaryType) {
case UUID:
foundField = getIcebergType(icebergID, name, Types.UUIDType.get(), isRequired);
break;
case FIXED:
int fixedLength = Integer.parseInt(primitive.getAttributeValue(ICEBERG_FIELD_LENGTH));
foundField = getIcebergType(icebergID, name, Types.FixedType.ofLength(fixedLength), isRequired);
break;
case BINARY:
foundField = getIcebergType(icebergID, name, Types.BinaryType.get(), isRequired);
break;
default:
throw new IllegalStateException("Invalid Binary type found in ORC type attribute");
}
break;
case DATE:
foundField = getIcebergType(icebergID, name, Types.DateType.get(), isRequired);
break;
case TIMESTAMP:
foundField = getIcebergType(icebergID, name, Types.TimestampType.withoutZone(), isRequired);
break;
case TIMESTAMP_INSTANT:
foundField = getIcebergType(icebergID, name, Types.TimestampType.withZone(), isRequired);
break;
case DECIMAL:
foundField = getIcebergType(icebergID, name,
Types.DecimalType.of(primitive.getPrecision(), primitive.getScale()), isRequired);
break;
default:
throw new IllegalArgumentException("Can't handle " + primitive);
}
return Optional.of(foundField);
}
}

/**
Expand Down
83 changes: 83 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.orc;

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.orc.TypeDescription;

/**
* Generic visitor of an ORC Schema.
*/
public abstract class OrcSchemaVisitor<T> {

public static <T> T visit(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
switch (schema.getCategory()) {
case STRUCT:
return visitRecord(schema, visitor);

case UNION:
throw new UnsupportedOperationException("Cannot handle " + schema);

case LIST:
return visitor.list(
schema, visit(schema.getChildren().get(0), visitor));

case MAP:
return visitor.map(
schema,
visit(schema.getChildren().get(0), visitor),
visit(schema.getChildren().get(1), visitor));

default:
return visitor.primitive(schema);
}
}

private static <T> T visitRecord(TypeDescription record, OrcSchemaVisitor<T> visitor) {
List<TypeDescription> fields = record.getChildren();
List<String> names = record.getFieldNames();
List<T> results = Lists.newArrayListWithExpectedSize(fields.size());
List<String> includedNames = Lists.newArrayListWithExpectedSize(names.size());
for (int i = 0; i < fields.size(); ++i) {
TypeDescription field = fields.get(i);
String name = names.get(i);
results.add(visit(field, visitor));
includedNames.add(name);
}
return visitor.record(record, includedNames, results);
}

public T record(TypeDescription record, List<String> names, List<T> fields) {
return null;
}

public T list(TypeDescription array, T element) {
return null;
}

public T map(TypeDescription map, T key, T value) {
return null;
}

public T primitive(TypeDescription primitive) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static <T> T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeV
Types.ListType list = iType != null ? iType.asListType() : null;
return visitor.list(
list, schema,
visit(list.elementType(), schema.getChildren().get(0), visitor));
visit(list != null ? list.elementType() : null, schema.getChildren().get(0), visitor));

case MAP:
Types.MapType map = iType != null ? iType.asMapType() : null;
Expand Down
Loading