Skip to content
152 changes: 17 additions & 135 deletions orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@

package org.apache.iceberg.orc;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
Expand Down Expand Up @@ -67,8 +64,8 @@ public TypeDescription type() {
}
}

private static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
private static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";

/**
* The name of the ORC {@link TypeDescription} attribute indicating the Iceberg type corresponding to an
Expand All @@ -80,7 +77,7 @@ public TypeDescription type() {
* ORC long type. The values for this attribute are denoted in {@code LongType}.
*/
public static final String ICEBERG_LONG_TYPE_ATTRIBUTE = "iceberg.long-type";
private static final String ICEBERG_FIELD_LENGTH = "iceberg.length";
static final String ICEBERG_FIELD_LENGTH = "iceberg.length";

private static final ImmutableMap<Type.TypeID, TypeDescription.Category> TYPE_MAPPING =
ImmutableMap.<Type.TypeID, TypeDescription.Category>builder()
Expand Down Expand Up @@ -202,25 +199,27 @@ private static TypeDescription convert(Integer fieldId, Type type, boolean isReq

/**
* Convert an ORC schema to an Iceberg schema. This method handles the conversion from the original
* Iceberg column mapping IDs if present in the ORC column attributes, otherwise, ORC column IDs
* will be assigned following ORCs pre-order ID assignment.
* Iceberg column mapping IDs if present in the ORC column attributes, otherwise, ORC columns with no
* Iceberg IDs will be ignored and skipped in the conversion.
*
* @return the Iceberg schema
* @throws IllegalArgumentException if ORC schema has no columns with Iceberg ID attributes
*/
public static Schema convert(TypeDescription orcSchema) {
List<TypeDescription> children = orcSchema.getChildren();
List<String> childrenNames = orcSchema.getFieldNames();
Preconditions.checkState(children.size() == childrenNames.size(),
"Error in ORC file, children fields and names do not match.");

List<Types.NestedField> icebergFields = Lists.newArrayListWithExpectedSize(children.size());
AtomicInteger lastColumnId = new AtomicInteger(getMaxIcebergId(orcSchema));
for (int i = 0; i < children.size(); i++) {
icebergFields.add(convertOrcToIceberg(children.get(i), childrenNames.get(i),
lastColumnId::incrementAndGet));
OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitor();
List<Types.NestedField> fields = OrcToIcebergVisitor.visitSchema(orcSchema, schemaConverter).stream()
.filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());

if (fields.size() == 0) {
throw new IllegalArgumentException("ORC schema does not contain Iceberg IDs");
}

return new Schema(icebergFields);
return new Schema(fields);
}

/**
Expand Down Expand Up @@ -388,129 +387,12 @@ static int fieldId(TypeDescription orcType) {
return Integer.parseInt(idStr);
}

private static boolean isRequired(TypeDescription orcType) {
static boolean isOptional(TypeDescription orcType) {
String isRequiredStr = orcType.getAttributeValue(ICEBERG_REQUIRED_ATTRIBUTE);
if (isRequiredStr != null) {
return Boolean.parseBoolean(isRequiredStr);
}
return false;
}

private static Types.NestedField getIcebergType(int icebergID, String name, Type type,
boolean isRequired) {
return isRequired ?
Types.NestedField.required(icebergID, name, type) :
Types.NestedField.optional(icebergID, name, type);
}

private static Types.NestedField convertOrcToIceberg(TypeDescription orcType, String name,
TypeUtil.NextID nextID) {

final int icebergID = icebergID(orcType).orElseGet(nextID::get);
final boolean isRequired = isRequired(orcType);

switch (orcType.getCategory()) {
case BOOLEAN:
return getIcebergType(icebergID, name, Types.BooleanType.get(), isRequired);
case BYTE:
case SHORT:
case INT:
return getIcebergType(icebergID, name, Types.IntegerType.get(), isRequired);
case LONG:
String longAttributeValue = orcType.getAttributeValue(ICEBERG_LONG_TYPE_ATTRIBUTE);
LongType longType = longAttributeValue == null ? LongType.LONG : LongType.valueOf(longAttributeValue);
switch (longType) {
case TIME:
return getIcebergType(icebergID, name, Types.TimeType.get(), isRequired);
case LONG:
return getIcebergType(icebergID, name, Types.LongType.get(), isRequired);
default:
throw new IllegalStateException("Invalid Long type found in ORC type attribute");
}
case FLOAT:
return getIcebergType(icebergID, name, Types.FloatType.get(), isRequired);
case DOUBLE:
return getIcebergType(icebergID, name, Types.DoubleType.get(), isRequired);
case STRING:
case CHAR:
case VARCHAR:
return getIcebergType(icebergID, name, Types.StringType.get(), isRequired);
case BINARY:
String binaryAttributeValue = orcType.getAttributeValue(ICEBERG_BINARY_TYPE_ATTRIBUTE);
BinaryType binaryType = binaryAttributeValue == null ? BinaryType.BINARY :
BinaryType.valueOf(binaryAttributeValue);
switch (binaryType) {
case UUID:
return getIcebergType(icebergID, name, Types.UUIDType.get(), isRequired);
case FIXED:
int fixedLength = Integer.parseInt(orcType.getAttributeValue(ICEBERG_FIELD_LENGTH));
return getIcebergType(icebergID, name, Types.FixedType.ofLength(fixedLength), isRequired);
case BINARY:
return getIcebergType(icebergID, name, Types.BinaryType.get(), isRequired);
default:
throw new IllegalStateException("Invalid Binary type found in ORC type attribute");
}
case DATE:
return getIcebergType(icebergID, name, Types.DateType.get(), isRequired);
case TIMESTAMP:
return getIcebergType(icebergID, name, Types.TimestampType.withoutZone(), isRequired);
case TIMESTAMP_INSTANT:
return getIcebergType(icebergID, name, Types.TimestampType.withZone(), isRequired);
case DECIMAL:
return getIcebergType(icebergID, name,
Types.DecimalType.of(orcType.getPrecision(), orcType.getScale()),
isRequired);
case STRUCT: {
List<String> fieldNames = orcType.getFieldNames();
List<TypeDescription> fieldTypes = orcType.getChildren();
List<Types.NestedField> fields = new ArrayList<>(fieldNames.size());
for (int c = 0; c < fieldNames.size(); ++c) {
String childName = fieldNames.get(c);
TypeDescription type = fieldTypes.get(c);
Types.NestedField field = convertOrcToIceberg(type, childName, nextID);
fields.add(field);
}

return getIcebergType(icebergID, name, Types.StructType.of(fields), isRequired);
}
case LIST: {
TypeDescription elementType = orcType.getChildren().get(0);
Types.NestedField element = convertOrcToIceberg(elementType, "element", nextID);

Types.ListType listTypeWithElem = isRequired(elementType) ?
Types.ListType.ofRequired(element.fieldId(), element.type()) :
Types.ListType.ofOptional(element.fieldId(), element.type());
return isRequired ?
Types.NestedField.required(icebergID, name, listTypeWithElem) :
Types.NestedField.optional(icebergID, name, listTypeWithElem);
}
case MAP: {
TypeDescription keyType = orcType.getChildren().get(0);
Types.NestedField key = convertOrcToIceberg(keyType, "key", nextID);
TypeDescription valueType = orcType.getChildren().get(1);
Types.NestedField value = convertOrcToIceberg(valueType, "value", nextID);

Types.MapType mapTypeWithKV = isRequired(valueType) ?
Types.MapType.ofRequired(key.fieldId(), value.fieldId(), key.type(), value.type()) :
Types.MapType.ofOptional(key.fieldId(), value.fieldId(), key.type(), value.type());

return getIcebergType(icebergID, name, mapTypeWithKV, isRequired);
}
default:
// We don't have an answer for union types.
throw new IllegalArgumentException("Can't handle " + orcType);
return !Boolean.parseBoolean(isRequiredStr);
}
}

private static int getMaxIcebergId(TypeDescription originalOrcSchema) {
int maxId = icebergID(originalOrcSchema).orElse(0);
final List<TypeDescription> children = Optional.ofNullable(originalOrcSchema.getChildren())
.orElse(Collections.emptyList());
for (TypeDescription child : children) {
maxId = Math.max(maxId, getMaxIcebergId(child));
}

return maxId;
return true;
}

/**
Expand Down
105 changes: 105 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/OrcSchemaVisitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.orc;

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.orc.TypeDescription;

/**
 * Generic visitor of an ORC Schema.
 * <p>
 * The static {@code visit*} methods walk a {@link TypeDescription} tree depth-first and pass the results
 * computed for child types to the matching callback of the supplied visitor. Every callback has a default
 * implementation that either does nothing or returns {@code null}, so subclasses override only what they need.
 *
 * @param <T> the type of result produced for each visited ORC type
 */
public abstract class OrcSchemaVisitor<T> {

  /**
   * Visits the direct children of a root ORC schema.
   *
   * @param schema the root ORC schema; its ORC column id must be 0
   * @param visitor the visitor whose callbacks are invoked during the traversal
   * @return one result per top-level field, in field order
   * @throws IllegalArgumentException if {@code schema} does not have column id 0
   */
  public static <T> List<T> visitSchema(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
    boolean isRoot = schema.getId() == 0;
    Preconditions.checkArgument(isRoot, "TypeDescription must be root schema.");

    List<TypeDescription> topLevelTypes = schema.getChildren();
    List<String> topLevelNames = schema.getFieldNames();
    return visitFields(topLevelTypes, topLevelNames, visitor);
  }

  /**
   * Visits a single ORC type, recursing into its children before invoking the callback for the type itself.
   * <p>
   * The {@code beforeField}/{@code afterField} hooks are only invoked around struct fields; list elements
   * and map keys/values are visited without them.
   *
   * @param schema the ORC type to visit
   * @param visitor the visitor whose callbacks are invoked during the traversal
   * @return the value returned by the callback matching the category of {@code schema}
   * @throws UnsupportedOperationException if a union type is encountered
   */
  public static <T> T visit(TypeDescription schema, OrcSchemaVisitor<T> visitor) {
    switch (schema.getCategory()) {
      case UNION:
        throw new UnsupportedOperationException("Cannot handle " + schema);

      case STRUCT: {
        List<TypeDescription> childTypes = schema.getChildren();
        List<String> childNames = schema.getFieldNames();
        List<T> childResults = visitFields(childTypes, childNames, visitor);
        return visitor.record(schema, childNames, childResults);
      }

      case MAP: {
        // the key is visited before the value
        T keyResult = visit(schema.getChildren().get(0), visitor);
        T valueResult = visit(schema.getChildren().get(1), visitor);
        return visitor.map(schema, keyResult, valueResult);
      }

      case LIST: {
        T elementResult = visit(schema.getChildren().get(0), visitor);
        return visitor.list(schema, elementResult);
      }

      default:
        // every remaining category is handed to the primitive callback
        return visitor.primitive(schema);
    }
  }

  /**
   * Visits each named field in order and collects the per-field results.
   */
  private static <T> List<T> visitFields(List<TypeDescription> fields, List<String> names,
                                         OrcSchemaVisitor<T> visitor) {
    Preconditions.checkArgument(fields.size() == names.size(), "Not all fields have names in ORC struct");

    int fieldCount = fields.size();
    List<T> collected = Lists.newArrayListWithExpectedSize(fieldCount);
    for (int pos = 0; pos < fieldCount; pos += 1) {
      collected.add(visitField(names.get(pos), fields.get(pos), visitor));
    }
    return collected;
  }

  /**
   * Visits one struct field, bracketing the visit with the before/after hooks. The after hook runs even
   * when visiting the field throws.
   */
  private static <T> T visitField(String name, TypeDescription field, OrcSchemaVisitor<T> visitor) {
    visitor.beforeField(name, field);
    try {
      return visit(field, visitor);
    } finally {
      visitor.afterField(name, field);
    }
  }

  /**
   * Hook invoked immediately before a struct field is visited. Does nothing by default.
   */
  public void beforeField(String name, TypeDescription type) {}

  /**
   * Hook invoked after a struct field has been visited, whether or not the visit succeeded. Does nothing
   * by default.
   */
  public void afterField(String name, TypeDescription type) {}

  /**
   * Callback for a struct type, invoked after all of its fields have been visited.
   *
   * @param record the struct type
   * @param names the field names of the struct
   * @param fields the results produced for each field, in the same order as {@code names}
   * @return {@code null} unless overridden
   */
  public T record(TypeDescription record, List<String> names, List<T> fields) {
    return null;
  }

  /**
   * Callback for a list type, invoked after its element type has been visited.
   *
   * @param array the list type
   * @param element the result produced for the element type
   * @return {@code null} unless overridden
   */
  public T list(TypeDescription array, T element) {
    return null;
  }

  /**
   * Callback for a map type, invoked after its key and value types have been visited.
   *
   * @param map the map type
   * @param key the result produced for the key type
   * @param value the result produced for the value type
   * @return {@code null} unless overridden
   */
  public T map(TypeDescription map, T key, T value) {
    return null;
  }

  /**
   * Callback for any type that is not a struct, list, map or union.
   *
   * @param primitive the type being visited
   * @return {@code null} unless overridden
   */
  public T primitive(TypeDescription primitive) {
    return null;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static <T> T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeV
Types.ListType list = iType != null ? iType.asListType() : null;
return visitor.list(
list, schema,
visit(list.elementType(), schema.getChildren().get(0), visitor));
visit(list != null ? list.elementType() : null, schema.getChildren().get(0), visitor));

case MAP:
Types.MapType map = iType != null ? iType.asMapType() : null;
Expand Down
Loading