Skip to content
17 changes: 8 additions & 9 deletions api/src/main/java/org/apache/iceberg/types/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.iceberg.types;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -457,7 +456,7 @@ private static void validateDefaultValue(Object defaultValue, Type type) {
}
switch (type.typeId()) {
case STRUCT:
Preconditions.checkArgument(Map.class.isInstance(defaultValue),
Preconditions.checkArgument(defaultValue instanceof Map,
"defaultValue should be a Map from fields names to values, for StructType");
Map<String, Object> defaultStruct = (Map<String, Object>) defaultValue;
if (defaultStruct.isEmpty()) {
Expand All @@ -470,17 +469,17 @@ private static void validateDefaultValue(Object defaultValue, Type type) {
break;

case LIST:
Preconditions.checkArgument(defaultValue instanceof ArrayList,
"defaultValue should be an ArrayList of Objects, for ListType");
List<Object> defaultArrayList = (ArrayList<Object>) defaultValue;
if (defaultArrayList.size() == 0) {
Preconditions.checkArgument(defaultValue instanceof List,
"defaultValue should be an List of Objects, for ListType");
List<Object> defaultList = (List<Object>) defaultValue;
if (defaultList.size() == 0) {
return;
}
defaultArrayList.forEach(dv -> NestedField.validateDefaultValue(dv, type.asListType().elementField.type));
defaultList.forEach(dv -> NestedField.validateDefaultValue(dv, type.asListType().elementField.type));
break;

case MAP:
Preconditions.checkArgument(Map.class.isInstance(defaultValue),
Preconditions.checkArgument(defaultValue instanceof Map,
"defaultValue should be an instance of Map for MapType");
Map<Object, Object> defaultMap = (Map<Object, Object>) defaultValue;
if (defaultMap.isEmpty()) {
Expand All @@ -494,7 +493,7 @@ private static void validateDefaultValue(Object defaultValue, Type type) {

case FIXED:
case BINARY:
Preconditions.checkArgument(byte[].class.isInstance(defaultValue),
Preconditions.checkArgument(defaultValue instanceof byte[],
"defaultValue should be an instance of byte[] for TypeId.%s, but defaultValue.class = %s",
type.typeId().name(), defaultValue.getClass().getCanonicalName());
break;
Expand Down
5 changes: 4 additions & 1 deletion orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo
// Using suffix _r to avoid potential underlying issues in ORC reader
// with reused column names between ORC and Iceberg;
// e.g. renaming column c -> d and adding new column d
if (mapping.get(nestedField.fieldId()) == null && nestedField.hasDefaultValue()) {
continue;
}
String name = Optional.ofNullable(mapping.get(nestedField.fieldId()))
.map(OrcField::name)
.orElseGet(() -> nestedField.name() + "_r" + nestedField.fieldId());
Expand Down Expand Up @@ -387,7 +390,7 @@ static Optional<Integer> icebergID(TypeDescription orcType) {
.map(Integer::parseInt);
}

static int fieldId(TypeDescription orcType) {
public static int fieldId(TypeDescription orcType) {
String idStr = orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE);
Preconditions.checkNotNull(idStr, "Missing expected '%s' property", ICEBERG_ID_ATTRIBUTE);
return Integer.parseInt(idStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static <T> T visit(
public static <T> T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeVisitor<T> visitor) {
switch (schema.getCategory()) {
case STRUCT:
return visitRecord(iType != null ? iType.asStructType() : null, schema, visitor);
return visitor.visitRecord(iType != null ? iType.asStructType() : null, schema, visitor);

case UNION:
throw new UnsupportedOperationException("Cannot handle " + schema);
Expand All @@ -58,7 +58,7 @@ public static <T> T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeV
}
}

private static <T> T visitRecord(
protected T visitRecord(
Types.StructType struct, TypeDescription record, OrcSchemaWithTypeVisitor<T> visitor) {
List<TypeDescription> fields = record.getChildren();
List<String> names = record.getFieldNames();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.source.BaseDataReader;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;

public abstract class OrcSchemaWithTypeVisitorSpark<T> extends OrcSchemaWithTypeVisitor<T> {

private final Map<Integer, Object> idToConstant;

public Map<Integer, Object> getIdToConstant() {
return idToConstant;
}

protected OrcSchemaWithTypeVisitorSpark(Map<Integer, ?> idToConstant) {
this.idToConstant = new HashMap<>();
this.idToConstant.putAll(idToConstant);
}

@Override
protected T visitRecord(
Types.StructType struct, TypeDescription record, OrcSchemaWithTypeVisitor<T> visitor) {
Preconditions.checkState(
icebergFiledIdsContainOrcFieldIdsInOrder(struct, record),
"Iceberg schema and ORC schema doesn't align, please call ORCSchemaUtil.buildOrcProjection" +
"to get an aligned ORC schema first!"
);
List<Types.NestedField> iFields = struct.fields();
List<TypeDescription> fields = record.getChildren();
List<String> names = record.getFieldNames();
List<T> results = Lists.newArrayListWithExpectedSize(fields.size());

for (int i = 0, j = 0; i < iFields.size(); i++) {
Types.NestedField iField = iFields.get(i);
TypeDescription field = j < fields.size() ? fields.get(j) : null;
if (field == null || (iField.fieldId() != ORCSchemaUtil.fieldId(field))) {
// there are 3 cases where we need to use idToConstant for an iField
// 1. The field is MetadataColumns.ROW_POSITION, we build a RowPositionReader
// 2. The field is a partition column, we build a ConstantReader
// 3. The field should be read using the default value, where we build a ConstantReader
// Here we should only need to update idToConstant when it's the 3rd case,
// because the first 2 cases have been handled by logic in PartitionUtil.constantsMap
if (!iField.equals(MetadataColumns.ROW_POSITION) &&
!idToConstant.containsKey(iField.fieldId())) {
idToConstant.put(iField.fieldId(), BaseDataReader.convertConstant(iField.type(), iField.getDefaultValue()));
}
} else {
results.add(visit(iField.type(), field, visitor));
j++;
}
}
return visitor.record(struct, record, names, results);
}

private static boolean icebergFiledIdsContainOrcFieldIdsInOrder(Types.StructType struct, TypeDescription record) {
List<Integer> icebergIDList = struct.fields().stream().map(Types.NestedField::fieldId).collect(Collectors.toList());
List<Integer> orcIDList = record.getChildren().stream().map(ORCSchemaUtil::fieldId).collect(Collectors.toList());

return containsInOrder(icebergIDList, orcIDList);
}

/**
* Checks whether the first list contains all the integers
* in the same order as regarding to the second list, the first
* list can contain extra integers that the second list doesn't,
* but the ones that exist in the second list should occur in the
* same relative order in the first list.
*
* @param list1 the first list
* @param list2 the second list
* @return the condition check result
*/
private static boolean containsInOrder(List<Integer> list1, List<Integer> list2) {
if (list1.size() < list2.size()) {
return false;
}

for (int i = 0, j = 0; j < list2.size(); j++) {
if (i >= list1.size()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hoist this check to the for loop condition?

return false;
}
while (!list1.get(i).equals(list2.get(j))) {
i++;
if (i >= list1.size()) {
return false;
}
}
i++;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iceberg.orc.OrcValueReader;
import org.apache.iceberg.orc.OrcValueReaders;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.OrcSchemaWithTypeVisitorSpark;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;
Expand Down Expand Up @@ -62,17 +63,16 @@ public void setBatchContext(long batchOffsetInFile) {
reader.setBatchContext(batchOffsetInFile);
}

private static class ReadBuilder extends OrcSchemaWithTypeVisitor<OrcValueReader<?>> {
private final Map<Integer, ?> idToConstant;
public static class ReadBuilder extends OrcSchemaWithTypeVisitorSpark<OrcValueReader<?>> {

private ReadBuilder(Map<Integer, ?> idToConstant) {
this.idToConstant = idToConstant;
super(idToConstant);
}

@Override
public OrcValueReader<?> record(
Types.StructType expected, TypeDescription record, List<String> names, List<OrcValueReader<?>> fields) {
return SparkOrcValueReaders.struct(fields, expected, idToConstant);
return SparkOrcValueReaders.struct(fields, expected, getIdToConstant());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.data.vectorized;

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

/**
 * A {@link ConstantColumnVector} whose per-row values are served from a fixed in-memory
 * array: each typed getter for row {@code rowId} casts and returns
 * {@code constantArray[rowId]}. Used to back columns whose values are known up front
 * (e.g. constants/defaults) without materializing real ORC column data.
 */
public class ConstantArrayColumnVector extends ConstantColumnVector {

  // One pre-computed value per row; getters index this directly by rowId.
  private final Object[] constantArray;

  public ConstantArrayColumnVector(DataType type, int batchSize, Object[] constantArray) {
    // The array is also passed to the parent; presumably the parent uses it as the
    // per-column constant while this class adds per-row access — TODO confirm.
    super(type, batchSize, constantArray);
    this.constantArray = constantArray;
  }

  @Override
  public boolean getBoolean(int rowId) {
    return (boolean) constantArray[rowId];
  }

  @Override
  public byte getByte(int rowId) {
    return (byte) constantArray[rowId];
  }

  @Override
  public short getShort(int rowId) {
    return (short) constantArray[rowId];
  }

  @Override
  public int getInt(int rowId) {
    return (int) constantArray[rowId];
  }

  @Override
  public long getLong(int rowId) {
    return (long) constantArray[rowId];
  }

  @Override
  public float getFloat(int rowId) {
    return (float) constantArray[rowId];
  }

  @Override
  public double getDouble(int rowId) {
    return (double) constantArray[rowId];
  }

  // NOTE(review): returns null rather than a per-row array value — verify callers never
  // request arrays from this vector (Spark treats a null ColumnarArray as a null entry).
  @Override
  public ColumnarArray getArray(int rowId) {
    return null;
  }

  // NOTE(review): same concern as getArray — map-typed access is effectively unsupported.
  @Override
  public ColumnarMap getMap(int ordinal) {
    return null;
  }

  @Override
  public Decimal getDecimal(int rowId, int precision, int scale) {
    return (Decimal) constantArray[rowId];
  }

  @Override
  public UTF8String getUTF8String(int rowId) {
    return (UTF8String) constantArray[rowId];
  }

  @Override
  public byte[] getBinary(int rowId) {
    return (byte[]) constantArray[rowId];
  }

  // NOTE(review): no child vectors — struct-typed access is unsupported; confirm the
  // parent never exposes this for nested types.
  @Override
  protected ColumnVector getChild(int ordinal) {
    return null;
  }
}
Loading