@@ -101,6 +101,11 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
updatedFields.add(avroField);

} else {
// TODO: once API support for default values is added, the condition
// (field.isRequired() && field.initialDefaultValue() != null)
// should also pass this validation; such a field need not be added to the Avro file
// read schema, since reading the default value is handled at the Iceberg level by
// the constants map (see the sketch after this hunk)
Preconditions.checkArgument(
field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()),
"Missing required field: %s",
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.avro;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;

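/**
 * Converts Iceberg single values (for example, partition constants or future field
 * default values) into the equivalent Avro generic representations.
 */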
public class SingleValueToAvroValue {

private SingleValueToAvroValue() {}

@SuppressWarnings("unchecked")
public static Object convert(Type type, Object value) {
switch (type.typeId()) {
case STRUCT:
Types.StructType structType = type.asNestedType().asStructType();
StructLike structValue = (StructLike) value;
GenericData.Record rec = new GenericData.Record(AvroSchemaUtil.convert(type));

for (int i = 0; i < structValue.size(); i += 1) {
Type fieldType = structType.fields().get(i).type();
rec.put(i, convert(fieldType, structValue.get(i, fieldType.typeId().javaClass())));
}

return rec;

case LIST:
Types.ListType listType = type.asNestedType().asListType();
Type elementType = listType.elementType();
List<Object> listValue = (List<Object>) value;

List<Object> list = Lists.newArrayListWithExpectedSize(listValue.size());
for (Object o : listValue) {
list.add(convert(elementType, o));
}

return list;

case MAP:
Types.MapType mapType = type.asNestedType().asMapType();
Map<Object, Object> mapValue = (Map<Object, Object>) value;

Map<Object, Object> map = Maps.newLinkedHashMap();
for (Map.Entry<Object, Object> entry : mapValue.entrySet()) {
map.put(
convert(mapType.keyType(), entry.getKey()),
convert(mapType.valueType(), entry.getValue()));
}

return map;

default:
return convertPrimitive(type.asPrimitiveType(), value);
}
}

private static Object convertPrimitive(Type.PrimitiveType primitive, Object value) {
// Fix up primitives for which Avro needs a different representation than
// Iceberg's internal Java value.
switch (primitive.typeId()) {
case FIXED:
return new GenericData.Fixed(
AvroSchemaUtil.convert(primitive), ByteBuffers.toByteArray((ByteBuffer) value));
default:
return value;
}
}
}
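A hypothetical usage sketch (type and bytes invented for illustration): converting an Iceberg fixed-type value into the Avro generic value the read path expects.

Types.FixedType fixedType = Types.FixedType.ofLength(4);
ByteBuffer buffer = ByteBuffer.wrap(new byte[] {0, 1, 2, 3});
GenericData.Fixed avroFixed =
    (GenericData.Fixed) SingleValueToAvroValue.convert(fixedType, buffer);
// avroFixed wraps the same four bytes, tagged with the Avro fixed(4) schema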
7 changes: 7 additions & 0 deletions data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -95,6 +95,13 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProject
InputFile input = io.newInputFile(task.file().path().toString());
Map<Integer, ?> partition =
PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant);
// TODO: construct a default-value map from the table schema and merge it with the
// partition map to consolidate a single id-to-constant map; ideally `partition`
// would then be renamed to something like `idToConstant`. See the sketch just below.
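// A minimal sketch (an assumption, not part of this change), using a hypothetical
// defaultValueMap(Schema) helper to extract field defaults from the table schema.
// The partition map is copied first because it may be immutable and Map.putAll
// returns void:
//
//   Map<Integer, Object> idToConstant = Maps.newHashMap(partition);
//   idToConstant.putAll(defaultValueMap(tableSchema));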

switch (task.file().format()) {
case AVRO:
@@ -49,6 +49,7 @@
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Type;
@@ -59,6 +60,8 @@
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
@@ -69,7 +72,7 @@
*
* @param <T> is the Java class returned by this reader whose objects contain one or more rows.
*/
abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
public abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);

private final Table table;
@@ -193,15 +196,19 @@ private EncryptedInputFile toEncryptedInputFile(ContentFile<?> file) {
}

protected Map<Integer, ?> constantsMap(ContentScanTask<?> task, Schema readSchema) {
// TODO: construct a default-value map from readSchema and merge it with the
// partition constants map so that a single id-to-constant map covers both;
// see the sketch after this method.
if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
  StructType partitionType = Partitioning.partitionType(table);
  return PartitionUtil.constantsMap(task, partitionType, BaseReader::convertConstant);
} else {
  return PartitionUtil.constantsMap(task, BaseReader::convertConstant);
}
}
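// A minimal sketch (an assumption, not part of this change) of the merge described
// in the TODO above, using a hypothetical defaultValuesMap(Schema) helper. The
// result of PartitionUtil.constantsMap is copied into a mutable map first, since it
// may be immutable and Map.putAll returns void:
//
//   Map<Integer, Object> idToConstant =
//       Maps.newHashMap(PartitionUtil.constantsMap(task, BaseReader::convertConstant));
//   idToConstant.putAll(defaultValuesMap(readSchema));
//   return idToConstant;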

protected static Object convertConstant(Type type, Object value) {
public static Object convertConstant(Type type, Object value) {
if (value == null) {
return null;
}
@@ -224,6 +231,21 @@ protected static Object convertConstant(Type type, Object value) {
return ByteBuffers.toByteArray((ByteBuffer) value);
case BINARY:
return ByteBuffers.toByteArray((ByteBuffer) value);
case UUID:
return UTF8String.fromString(value.toString());
case LIST:
return new GenericArrayData(
((List<?>) value)
.stream().map(e -> convertConstant(type.asListType().elementType(), e)).toArray());
case MAP:
List<Object> keyList = Lists.newArrayList();
List<Object> valueList = Lists.newArrayList();
for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
keyList.add(convertConstant(type.asMapType().keyType(), entry.getKey()));
valueList.add(convertConstant(type.asMapType().valueType(), entry.getValue()));
}
return new ArrayBasedMapData(
new GenericArrayData(keyList.toArray()), new GenericArrayData(valueList.toArray()));
case STRUCT:
StructType structType = (StructType) type;

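A hypothetical illustration of the new collection handling (types and values invented), now callable from outside the class since this change widens convertConstant to public: a constant map is converted element by element into Spark's internal representation.

Types.MapType mapType =
    Types.MapType.ofRequired(1, 2, Types.StringType.get(), Types.IntegerType.get());
Object constant = BaseReader.convertConstant(mapType, ImmutableMap.of("a", 1));
// constant is an ArrayBasedMapData; keys are converted to UTF8String, values stay Integer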
@@ -701,7 +701,6 @@ private static void assertEquals(String context, DataType type, Object expected,
.as("Actual should be an InternalRow: " + context)
.isInstanceOf(InternalRow.class);
assertEquals(context, (StructType) type, (InternalRow) expected, (InternalRow) actual);

} else if (type instanceof ArrayType) {
Assertions.assertThat(expected)
.as("Expected should be an ArrayData: " + context)
@@ -710,7 +709,6 @@ private static void assertEquals(String context, DataType type, Object expected,
.as("Actual should be an ArrayData: " + context)
.isInstanceOf(ArrayData.class);
assertEquals(context, (ArrayType) type, (ArrayData) expected, (ArrayData) actual);

} else if (type instanceof MapType) {
Assertions.assertThat(expected)
.as("Expected should be a MapData: " + context)
@@ -719,7 +717,6 @@ private static void assertEquals(String context, DataType type, Object expected,
.as("Actual should be a MapData: " + context)
.isInstanceOf(MapData.class);
assertEquals(context, (MapType) type, (MapData) expected, (MapData) actual);

} else if (type instanceof BinaryType) {
assertEqualBytes(context, (byte[]) expected, (byte[]) actual);
} else {
@@ -808,4 +805,20 @@ public static Set<DeleteFile> deleteFiles(Table table) {

return deleteFiles;
}

public static Object convertJavaPrimitiveToAvroPrimitive(
Map<Type, org.apache.avro.Schema> typeToSchema, Type.PrimitiveType primitive, Object value) {
// For primitives where Avro needs a different type than Spark, fix them here.
switch (primitive.typeId()) {
case FIXED:
return new GenericData.Fixed(typeToSchema.get(primitive), (byte[]) value);
case BINARY:
return ByteBuffer.wrap((byte[]) value);
case UUID:
return UUID.nameUUIDFromBytes((byte[]) value);
default:
return value;
}
}
}
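A hypothetical call site for the new helper (map construction invented for illustration), pairing it with AvroSchemaUtil to build the type-to-schema lookup:

Types.FixedType fixedType = Types.FixedType.ofLength(4);
Map<Type, org.apache.avro.Schema> typeToSchema =
    ImmutableMap.of(fixedType, AvroSchemaUtil.convert(fixedType));
Object avroValue =
    convertJavaPrimitiveToAvroPrimitive(typeToSchema, fixedType, new byte[] {0, 1, 2, 3});
// avroValue is a GenericData.Fixed wrapping the four bytes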