diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index 929fb5f141..1b1be01f41 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -135,7 +135,7 @@ public static boolean isOptionSchema(Schema schema) { return false; } - static Schema toOption(Schema schema) { + public static Schema toOption(Schema schema) { if (schema.getType() == UNION) { Preconditions.checkArgument(isOptionSchema(schema), "Union schemas are not supported: %s", schema); @@ -145,7 +145,7 @@ static Schema toOption(Schema schema) { } } - static Schema fromOption(Schema schema) { + public static Schema fromOption(Schema schema) { Preconditions.checkArgument(schema.getType() == UNION, "Expected union schema but was passed: %s", schema); Preconditions.checkArgument(schema.getTypes().size() == 2, @@ -322,7 +322,7 @@ private static int toInt(Object value) { throw new UnsupportedOperationException("Cannot coerce value to int: " + value); } - static Schema copyRecord(Schema record, List newFields, String newName) { + public static Schema copyRecord(Schema record, List newFields, String newName) { Schema copy; if (newName != null) { copy = Schema.createRecord(newName, record.getDoc(), null, record.isError(), newFields); @@ -342,7 +342,7 @@ static Schema copyRecord(Schema record, List newFields, String new return copy; } - static Schema.Field copyField(Schema.Field field, Schema newSchema, String newName) { + public static Schema.Field copyField(Schema.Field field, Schema newSchema, String newName) { Schema.Field copy = new Schema.Field(newName, newSchema, field.doc(), field.defaultVal(), field.order()); diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java index ff0b50a1d2..f3cbe1a848 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveMetadataPreservingTableOperations.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -39,6 +40,8 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -53,6 +56,8 @@ * updated. 
*/ public class HiveMetadataPreservingTableOperations extends HiveTableOperations { + private static final Logger LOG = LoggerFactory.getLogger(HiveMetadataPreservingTableOperations.class); + private final HiveClientPool metaClients; private final String database; private final String tableName; @@ -71,6 +76,20 @@ protected HiveMetadataPreservingTableOperations(Configuration conf, HiveClientPo this.tableName = table; } + private static void logTable(Table table) { + String columns = ""; + try { + columns = table.getSd().getCols().stream().map(column -> column.getName() + " " + column.getType()) + .collect(Collectors.joining("\n")); + } catch (Throwable throwable) { + LOG.debug("Encountered {} while fetching columns for {}.{}", throwable.getMessage(), + table.getDbName(), table.getTableName(), throwable); + return; + } + LOG.debug("Table: {}.{}", table.getDbName(), table.getTableName()); + LOG.debug("Columns: \n{}", columns); + } + @Override protected void doRefresh() { String metadataLocation = null; @@ -132,6 +151,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName)); if (tableExists) { tbl = metaClients.run(client -> client.getTable(database, tableName)); + LOG.debug("Following table has been fetched from metastore:"); + logTable(tbl); } else { final long currentTimeMillis = System.currentTimeMillis(); tbl = new Table(tableName, @@ -166,6 +187,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { EnvironmentContext envContext = new EnvironmentContext( ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE) ); + LOG.debug("Updating the metadata location of the following table:"); + logTable(tbl); + LOG.debug("Metadata Location: {}", tbl.getParameters().get(METADATA_LOCATION_PROP)); ALTER_TABLE.invoke(client, database, tableName, tbl, envContext); return null; }); diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveSchemaWithPartnerVisitor.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveSchemaWithPartnerVisitor.java new file mode 100644 index 0000000000..dd2adcf271 --- /dev/null +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveSchemaWithPartnerVisitor.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.hive.legacy; + +import java.util.List; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * A Hive {@link TypeInfo} visitor with an accompanying partner schema + * + * This visitor traverses the Hive {@link TypeInfo} tree contiguously accessing the schema tree for the partner schema + * using {@link PartnerAccessor}. When visiting each type in the Hive tree, the implementation is also presented + * with the corresponding type from the partner schema, or else a {@code null} if no match was found. Matching + * behavior can be controlled by implementing the methods in {@link PartnerAccessor} + * + * @param
<P>
type of partner schema + * @param type of the field representation in the partner schema + * @param type of the resultant schema generated by the visitor + * @param type of the field representation in the resultant schema + */ +@SuppressWarnings("ClassTypeParameterName") +public abstract class HiveSchemaWithPartnerVisitor { + + /** + * Methods to access types in the partner schema corresponding to types in the Hive schema being traversed + * + * @param
<P>
type of partner schema + * @param type of the field representation in the partner schema + */ + public interface PartnerAccessor { + + FP fieldPartner(P partnerStruct, String fieldName); + + P fieldType(FP partnerField); + + P mapKeyPartner(P partnerMap); + + P mapValuePartner(P partnerMap); + + P listElementPartner(P partnerList); + } + + @SuppressWarnings("MethodTypeParameterName") + public static R visit(TypeInfo typeInfo, P partner, HiveSchemaWithPartnerVisitor visitor, + PartnerAccessor accessor) { + switch (typeInfo.getCategory()) { + case STRUCT: + StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo; + List names = structTypeInfo.getAllStructFieldNames(); + List results = Lists.newArrayListWithExpectedSize(names.size()); + for (String name : names) { + TypeInfo fieldTypeInfo = structTypeInfo.getStructFieldTypeInfo(name); + FP fieldPartner = partner != null ? accessor.fieldPartner(partner, name) : null; + P fieldPartnerType = fieldPartner != null ? accessor.fieldType(fieldPartner) : null; + R result = visit(fieldTypeInfo, fieldPartnerType, visitor, accessor); + results.add(visitor.field(name, fieldTypeInfo, fieldPartner, result)); + } + return visitor.struct(structTypeInfo, partner, results); + + case LIST: + ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo; + TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo(); + P elementPartner = partner != null ? accessor.listElementPartner(partner) : null; + R elementResult = visit(elementTypeInfo, elementPartner, visitor, accessor); + return visitor.list(listTypeInfo, partner, elementResult); + + case MAP: + MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo; + P keyPartner = partner != null ? accessor.mapKeyPartner(partner) : null; + R keyResult = visit(mapTypeInfo.getMapKeyTypeInfo(), keyPartner, visitor, accessor); + P valuePartner = partner != null ? 
accessor.mapValuePartner(partner) : null; + R valueResult = visit(mapTypeInfo.getMapValueTypeInfo(), valuePartner, visitor, accessor); + return visitor.map(mapTypeInfo, partner, keyResult, valueResult); + + case PRIMITIVE: + return visitor.primitive((PrimitiveTypeInfo) typeInfo, partner); + + case UNION: + throw new UnsupportedOperationException("Union data type not supported: " + typeInfo); + + default: + throw new UnsupportedOperationException(typeInfo + " not supported"); + } + } + + public R struct(StructTypeInfo struct, P partner, List fieldResults) { + return null; + } + + public FR field(String name, TypeInfo field, FP partner, R fieldResult) { + return null; + } + + public R list(ListTypeInfo list, P partner, R elementResult) { + return null; + } + + public R map(MapTypeInfo map, P partner, R keyResult, R valueResult) { + return null; + } + + public R primitive(PrimitiveTypeInfo primitive, P partner) { + return null; + } +} diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java index f31d91a290..31a16441d4 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; @@ -35,6 +36,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.avro.AvroSchemaVisitor; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Conversions; @@ -54,12 +56,25 @@ private LegacyHiveTableUtils() { static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) { Map props = getTableProperties(table); String schemaStr = props.get("avro.schema.literal"); + org.apache.avro.Schema avroSchema = schemaStr != null ? 
new org.apache.avro.Schema.Parser().parse(schemaStr) : null; Schema schema; - if (schemaStr != null) { - schema = AvroSchemaUtil.toIceberg(new org.apache.avro.Schema.Parser().parse(schemaStr)); + if (avroSchema != null) { + String serde = table.getSd().getSerdeInfo().getSerializationLib(); + org.apache.avro.Schema finalAvroSchema; + if (serde.equals("org.apache.hadoop.hive.serde2.avro.AvroSerDe") || + HasDuplicateLowercaseColumnNames.visit(avroSchema)) { + // Case 1: If serde == AVRO, early escape; Hive column info is not reliable and can be empty for these tables + // Hive itself uses avro.schema.literal as source of truth for these tables, so this should be fine + // Case 2: If avro.schema.literal has duplicate column names when lowercased, that means we cannot do reliable + // matching with Hive schema as multiple Avro fields can map to the same Hive field + finalAvroSchema = avroSchema; + } else { + finalAvroSchema = MergeHiveSchemaWithAvro.visit(structTypeInfoFromCols(table.getSd().getCols()), avroSchema); + } + schema = AvroSchemaUtil.toIceberg(finalAvroSchema); } else { // TODO: Do we need to support column and column.types properties for ORC tables? - LOG.warn("Table {}.{} does not have an avro.schema.literal set; using Hive schema instead. " + + LOG.info("Table {}.{} does not have an avro.schema.literal set; using Hive schema instead. " + "The schema will not have case sensitivity and nullability information", table.getDbName(), table.getTableName()); Type icebergType = HiveTypeUtil.convert(structTypeInfoFromCols(table.getSd().getCols())); @@ -74,7 +89,7 @@ static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) { return new Schema(fields); } - static TypeInfo structTypeInfoFromCols(List cols) { + static StructTypeInfo structTypeInfoFromCols(List cols) { Preconditions.checkArgument(cols != null && cols.size() > 0, "No Hive schema present"); List fieldNames = cols .stream() @@ -84,7 +99,7 @@ static TypeInfo structTypeInfoFromCols(List cols) { .stream() .map(f -> TypeInfoUtils.getTypeInfoFromTypeString(f.getType())) .collect(Collectors.toList()); - return TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypeInfos); + return (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypeInfos); } private static Schema partitionSchema(List partitionKeys, Schema dataSchema) { @@ -169,4 +184,39 @@ private static FileFormat serdeToFileFormat(String serde) { throw new IllegalArgumentException("Unrecognized serde: " + serde); } } + + private static class HasDuplicateLowercaseColumnNames extends AvroSchemaVisitor { + private static final HasDuplicateLowercaseColumnNames INSTANCE = new HasDuplicateLowercaseColumnNames(); + + private static boolean visit(org.apache.avro.Schema schema) { + return AvroSchemaVisitor.visit(schema, INSTANCE); + } + + @Override + public Boolean record(org.apache.avro.Schema record, List names, List fieldResults) { + return fieldResults.stream().anyMatch(x -> x) || + names.stream().collect(Collectors.groupingBy(String::toLowerCase)) + .values().stream().anyMatch(x -> x.size() > 1); + } + + @Override + public Boolean union(org.apache.avro.Schema union, List optionResults) { + return optionResults.stream().anyMatch(x -> x); + } + + @Override + public Boolean array(org.apache.avro.Schema array, Boolean elementResult) { + return elementResult; + } + + @Override + public Boolean map(org.apache.avro.Schema map, Boolean valueResult) { + return valueResult; + } + + @Override + public Boolean primitive(org.apache.avro.Schema primitive) { + return 
false; + } + } } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/MergeHiveSchemaWithAvro.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/MergeHiveSchemaWithAvro.java new file mode 100644 index 0000000000..55018c7d77 --- /dev/null +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/MergeHiveSchemaWithAvro.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.hive.legacy; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.avro.JsonProperties; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + + +/** + * A {@link HiveSchemaWithPartnerVisitor} which augments a Hive schema with extra metadata from a partner Avro schema + * and generates a resultant "merged" Avro schema + * + * 1. Fields are matched between Hive and Avro schemas using a case insensitive search by field name + * 2. Copies field names, nullability, default value, field props from the Avro schema + * 3. Copies field type from the Hive schema. + * TODO: We should also handle some cases of type promotion where the types in Avro are potentially more correct + * e.g.BINARY in Hive -> FIXED in Avro, STRING in Hive -> ENUM in Avro, etc + * 4. Retains fields found only in the Hive schema; Ignores fields found only in the Avro schema + * 5. Fields found only in Hive schema are represented as optional fields in the resultant Avro schema + * 6. 
For fields found only in Hive schema, field names are sanitized to make them compatible with Avro identifier spec + */ +class MergeHiveSchemaWithAvro extends HiveSchemaWithPartnerVisitor { + + static Schema visit(StructTypeInfo typeInfo, Schema schema) { + return HiveSchemaWithPartnerVisitor.visit(typeInfo, schema, new MergeHiveSchemaWithAvro(), + AvroPartnerAccessor.INSTANCE); + } + + private final AtomicInteger recordCounter = new AtomicInteger(0); + + @Override + public Schema struct(StructTypeInfo struct, Schema partner, List fieldResults) { + boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner); + Schema result; + if (partner == null || extractIfOption(partner).getType() != Schema.Type.RECORD) { + // if there was no matching Avro struct, return a struct with new record/namespace + int recordNum = recordCounter.incrementAndGet(); + result = Schema.createRecord("record" + recordNum, null, "namespace" + recordNum, false, fieldResults); + } else { + result = AvroSchemaUtil.copyRecord(extractIfOption(partner), fieldResults, null); + } + return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result; + } + + @Override + public Schema.Field field(String name, TypeInfo field, Schema.Field partner, Schema fieldResult) { + // No need to infer `shouldResultBeOptional`. We expect other visitor methods to return optional schemas + // in their field results if required + if (partner == null) { + // if there was no matching Avro field, use name form the Hive schema and set a null default + return new Schema.Field( + AvroSchemaUtil.makeCompatibleName(name), fieldResult, null, Schema.Field.NULL_DEFAULT_VALUE); + } else { + // TODO: How to ensure that field default value is compatible with new field type generated from Hive? + // Copy field type from the visitor result, copy everything else from the partner + // Avro requires the default value to match the first type in the option, reorder option if required + Schema reordered = reorderOptionIfRequired(fieldResult, partner.defaultVal()); + return AvroSchemaUtil.copyField(partner, reordered, partner.name()); + } + } + + /** + * Reorders an option schema so that the type of the provided default value is the first type in the option schema + * + * e.g. If the schema is (NULL, INT) and the default value is 1, the returned schema is (INT, NULL) + * If the schema is not an option schema or if there is no default value, schema is returned as-is + */ + private Schema reorderOptionIfRequired(Schema schema, Object defaultValue) { + if (AvroSchemaUtil.isOptionSchema(schema) && defaultValue != null) { + boolean isNullFirstOption = schema.getTypes().get(0).getType() == Schema.Type.NULL; + if (isNullFirstOption && defaultValue.equals(JsonProperties.NULL_VALUE)) { + return schema; + } else { + return Schema.createUnion(schema.getTypes().get(1), schema.getTypes().get(0)); + } + } else { + return schema; + } + } + + @Override + public Schema list(ListTypeInfo list, Schema partner, Schema elementResult) { + // if there was no matching Avro list, or if matching Avro list was an option, return an optional list + boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner); + Schema result = Schema.createArray(elementResult); + return shouldResultBeOptional ? 
AvroSchemaUtil.toOption(result) : result; + } + + @Override + public Schema map(MapTypeInfo map, Schema partner, Schema keyResult, Schema valueResult) { + Preconditions.checkArgument(extractIfOption(keyResult).getType() == Schema.Type.STRING, + "Map keys should always be non-nullable strings. Found: %s", keyResult); + // if there was no matching Avro map, or if matching Avro map was an option, return an optional map + boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner); + Schema result = Schema.createMap(valueResult); + return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result; + } + + @Override + public Schema primitive(PrimitiveTypeInfo primitive, Schema partner) { + boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner); + Schema hivePrimitive = hivePrimitiveToAvro(primitive); + // if there was no matching Avro primitive, use the Hive primitive + Schema result = partner == null ? hivePrimitive : checkCompatibilityAndPromote(hivePrimitive, partner); + return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result; + } + + private Schema checkCompatibilityAndPromote(Schema schema, Schema partner) { + // TODO: Check if schema is compatible with partner + // Also do type promotion if required, schema = string & partner = enum, schema = bytes & partner = fixed, etc + return schema; + } + + /** + * A {@link PartnerAccessor} which matches the requested field from a partner Avro struct by case insensitive + * field name match + */ + private static class AvroPartnerAccessor implements PartnerAccessor { + private static final AvroPartnerAccessor INSTANCE = new AvroPartnerAccessor(); + + private static final Schema MAP_KEY = Schema.create(Schema.Type.STRING); + + @Override + public Schema.Field fieldPartner(Schema partner, String fieldName) { + Schema schema = extractIfOption(partner); + return (schema.getType() == Schema.Type.RECORD) ? findCaseInsensitive(schema, fieldName) : null; + } + + @Override + public Schema fieldType(Schema.Field partnerField) { + return partnerField.schema(); + } + + @Override + public Schema mapKeyPartner(Schema partner) { + Schema schema = extractIfOption(partner); + return (schema.getType() == Schema.Type.MAP) ? MAP_KEY : null; + } + + @Override + public Schema mapValuePartner(Schema partner) { + Schema schema = extractIfOption(partner); + return (schema.getType() == Schema.Type.MAP) ? schema.getValueType() : null; + } + + @Override + public Schema listElementPartner(Schema partner) { + Schema schema = extractIfOption(partner); + return (schema.getType() == Schema.Type.ARRAY) ? schema.getElementType() : null; + } + + private Schema.Field findCaseInsensitive(Schema struct, String fieldName) { + Preconditions.checkArgument(struct.getType() == Schema.Type.RECORD); + // TODO: Optimize? 
This will be called for every struct field, we will run the for loop for every struct field + for (Schema.Field field : struct.getFields()) { + if (field.name().equalsIgnoreCase(fieldName)) { + return field; + } + } + return null; + } + } + + private static Schema extractIfOption(Schema schema) { + if (AvroSchemaUtil.isOptionSchema(schema)) { + return AvroSchemaUtil.fromOption(schema); + } else { + return schema; + } + } + + // Additional numeric type, similar to other logical type names in AvroSerde + private static final String SHORT_TYPE_NAME = "short"; + private static final String BYTE_TYPE_NAME = "byte"; + + // TODO: This should be refactored into a visitor if we ever require conversion of complex types + public Schema hivePrimitiveToAvro(PrimitiveTypeInfo primitive) { + switch (primitive.getPrimitiveCategory()) { + case INT: + case BYTE: + case SHORT: + return Schema.create(Schema.Type.INT); + + case LONG: + return Schema.create(Schema.Type.LONG); + + case FLOAT: + return Schema.create(Schema.Type.FLOAT); + + case DOUBLE: + return Schema.create(Schema.Type.DOUBLE); + + case BOOLEAN: + return Schema.create(Schema.Type.BOOLEAN); + + case CHAR: + case STRING: + case VARCHAR: + return Schema.create(Schema.Type.STRING); + + case BINARY: + return Schema.create(Schema.Type.BYTES); + + case VOID: + return Schema.create(Schema.Type.NULL); + + case DATE: + return LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); + + case TIMESTAMP: + Schema schema = Schema.create(Schema.Type.LONG); + schema.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false); + return LogicalTypes.timestampMillis().addToSchema(schema); + + case DECIMAL: + DecimalTypeInfo dti = (DecimalTypeInfo) primitive; + return LogicalTypes.decimal(dti.getPrecision(), dti.getScale()).addToSchema(Schema.create(Schema.Type.BYTES)); + + default: + throw new UnsupportedOperationException(primitive + " is not supported."); + } + } +} diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestMergeHiveSchemaWithAvro.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestMergeHiveSchemaWithAvro.java new file mode 100644 index 0000000000..7ad76dae7a --- /dev/null +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestMergeHiveSchemaWithAvro.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.hive.legacy; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Arrays; +import java.util.Map; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.util.internal.JacksonUtils; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + + +public class TestMergeHiveSchemaWithAvro { + + @Test + public void shouldUseFieldNamesFromAvro() { + String hive = "struct>"; + Schema avro = struct("r1", + optional("fA", Schema.Type.INT), + optional("fB", struct("r2", + optional("gA", Schema.Type.INT) + )) + ); + + assertSchema(avro, merge(hive, avro)); + } + + @Test + public void shouldUseNullabilityFromAvro() { + String hive = "struct>"; + Schema avro = struct("r1", + required("fA", Schema.Type.INT), + required("fB", struct("r2", + required("gA", Schema.Type.INT) + )) + ); + + assertSchema(avro, merge(hive, avro)); + } + + @Test + public void shouldUseTypesFromHive() { + String hive = "struct,fb:array,fc:map,fd:string>"; + Schema avro = struct("r1", + required("fA", Schema.Type.INT), + required("fB", Schema.Type.INT), + required("fC", Schema.Type.INT), + required("fD", Schema.Type.INT) + ); + + Schema expected = struct("r1", + required("fA", struct("record1", null, "namespace1", + optional("ga", Schema.Type.INT) + )), + required("fB", array(nullable(Schema.Type.INT))), + required("fC", map(nullable(Schema.Type.INT))), + required("fD", Schema.Type.STRING) + ); + + assertSchema(expected, merge(hive, avro)); + } + + @Test + public void shouldIgnoreExtraFieldsFromAvro() { + String hive = "struct>"; + Schema avro = struct("r1", + required("fA", Schema.Type.INT), + required("fB", struct("r2", + required("gA", Schema.Type.INT), + required("gB", Schema.Type.INT) + )), + required("fC", Schema.Type.INT) + ); + + Schema expected = struct("r1", + required("fA", Schema.Type.INT), + required("fB", struct("r2", + required("gA", Schema.Type.INT) + )) + ); + + assertSchema(expected, merge(hive, avro)); + } + + @Test + public void shouldRetainExtraFieldsFromHive() { + String hive = "struct,fc:int,fd:struct>"; + Schema avro = struct("r1", + required("fA", Schema.Type.INT), + required("fB", struct("r2", + required("gA", Schema.Type.INT) + )) + ); + + Schema expected = struct("r1", + required("fA", Schema.Type.INT), + required("fB", struct("r2", + required("gA", Schema.Type.INT), + // Nested field missing in Avro + optional("gb", Schema.Type.INT) + )), + // Top level field missing in Avro + optional("fc", Schema.Type.INT), + // Top level struct missing in Avro + optional("fd", struct("record1", null, "namespace1", + optional("ha", Schema.Type.INT) + )) + ); + + assertSchema(expected, merge(hive, avro)); + } + + @Test + public void shouldRetainDocStringsFromAvro() { + String hive = "struct>"; + Schema avro = struct("r1", "doc-r1", "n1", + required("fA", Schema.Type.INT, "doc-fA", null, null), + required("fB", struct("r2", "doc-r2", "n2", + required("gA", 
Schema.Type.INT, "doc-gA", null, null) + ), "doc-fB", null, null) + ); + + assertSchema(avro, merge(hive, avro)); + } + + @Test + public void shouldRetainDefaultValuesFromAvro() { + String hive = "struct>"; + Schema avro = struct("r1", + required("fA", Schema.Type.INT, null, 1, null), + required("fB", struct("r2", + required("gA", Schema.Type.INT, null, 2, null) + ), null, fromJson("{\"gA\": 3}"), null) + ); + + assertSchema(avro, merge(hive, avro)); + } + + @Test + public void shouldRetainFieldPropsFromAvro() { + String hive = "struct>"; + Schema avro = struct("r1", + required("fA", Schema.Type.INT, null, null, ImmutableMap.of("pfA", "vfA")), + required("fB", struct("r2", + required("gA", Schema.Type.INT, null, null, ImmutableMap.of("pfB", "vfB")) + ), null, null, ImmutableMap.of("pgA", "vgA")) + ); + + assertSchema(avro, merge(hive, avro)); + } + + @Test + public void shouldHandleLists() { + String hive = "struct,fb:array,fc:array>,fd:array>"; + Schema avro = struct("r1", + required("fA", array(Schema.Type.INT)), + optional("fB", array(Schema.Type.INT)), + required("fC", array(struct("r2", + required("gA", Schema.Type.INT) + ))) + ); + + Schema expected = struct("r1", + required("fA", array(Schema.Type.INT)), + optional("fB", array(Schema.Type.INT)), + required("fC", array(struct("r2", + required("gA", Schema.Type.INT) + ))), + // Array element type is also nullable because it is generated from Hive + optional("fd", array(nullable(Schema.Type.INT))) + ); + + assertSchema(expected, merge(hive, avro)); + } + + @Test + public void shouldHandleMaps() { + String hive = "struct,fb:map,fc:map>,fd:map>"; + Schema avro = struct("r1", + required("fA", map(Schema.Type.INT)), + optional("fB", map(Schema.Type.INT)), + required("fC", map(struct("r2", + required("gA", Schema.Type.INT) + ))) + ); + + Schema expected = struct("r1", + required("fA", map(Schema.Type.INT)), + optional("fB", map(Schema.Type.INT)), + required("fC", map(struct("r2", + required("gA", Schema.Type.INT) + ))), + // Map value type is also nullable because it is generated from Hive + optional("fd", map(nullable(Schema.Type.INT))) + ); + + assertSchema(expected, merge(hive, avro)); + } + + @Test + public void shouldSanitizeIncompatibleFieldNames() { + StructTypeInfo typeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo( + Lists.newArrayList("a.b.c", "$#@%!"), + Lists.newArrayList(TypeInfoFactory.intTypeInfo, TypeInfoFactory.intTypeInfo) + ); + Schema avro = struct("r1"); + + Schema expected = struct("r1", + optional("a_x2Eb_x2Ec", Schema.Type.INT), + optional("_x24_x23_x40_x25_x21", Schema.Type.INT) + ); + assertSchema(expected, merge(typeInfo, avro)); + } + + @Test + public void shouldReorderOptionalSchemaToMatchDefaultValue() { + String hive = "struct>"; + Schema avro = struct("r1", + field("fA", Schema.createUnion(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.NULL)), + null, 1, null), + field("fB", Schema.createUnion( + struct("r2", required("gA", Schema.Type.INT, null, 2, null)), + Schema.create(Schema.Type.NULL) + ), null, fromJson("{\"gA\": 3}"), null) + ); + + assertSchema(avro, merge(hive, avro)); + } + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void shouldFailForMapsWithNonStringKey() { + String hive = "struct>"; + Schema avro = struct("r1"); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Map keys should always be non-nullable strings"); + assertSchema(avro, merge(hive, avro)); + } + + @Test + public void shouldRecoverLogicalType() 
{ + String hive = "struct"; + Schema avro = struct("r1", + optional("fa", Schema.Type.INT), + optional("fb", Schema.Type.LONG), + optional("fc", Schema.Type.BYTES)); + Schema merged = merge(hive, avro); + + Schema expectedTimestampSchema = Schema.create(Schema.Type.LONG); + expectedTimestampSchema.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false); + Schema expected = struct("r1", + optional("fa", LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))), + optional("fb", LogicalTypes.timestampMillis().addToSchema(expectedTimestampSchema)), + optional("fc", LogicalTypes.decimal(4, 2).addToSchema(Schema.create(Schema.Type.BYTES)))); + + assertSchema(expected, merged); + Assert.assertEquals("date", + AvroSchemaUtil.fromOption(merged.getField("fa").schema()).getLogicalType().getName()); + // This last line should not throw any exception. + AvroSchemaUtil.toIceberg(merged); + } + + // TODO: tests to retain schema props + // TODO: tests for explicit type compatibility check between hive and avro primitives, once we implement it + // TODO: tests for error case => default value in Avro does not match with type from hive + + /** Test Helpers */ + + private void assertSchema(Schema expected, Schema actual) { + Assert.assertEquals(expected, actual); + Assert.assertEquals(expected.toString(true), actual.toString(true)); + } + + private Schema merge(StructTypeInfo typeInfo, Schema avro) { + return MergeHiveSchemaWithAvro.visit(typeInfo, avro); + } + + private Schema merge(String hive, Schema avro) { + StructTypeInfo typeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(hive); + return merge(typeInfo, avro); + } + + private Schema struct(String name, String doc, String namespace, Schema.Field... fields) { + return Schema.createRecord(name, doc, namespace, false, Arrays.asList(fields)); + } + + private Schema struct(String name, Schema.Field... 
fields) { + return struct(name, null, "n" + name, fields); + } + + private Schema array(Schema element) { + return Schema.createArray(element); + } + + private Schema array(Schema.Type elementType) { + return array(Schema.create(elementType)); + } + + private Schema map(Schema value) { + return Schema.createMap(value); + } + + private Schema map(Schema.Type valueType) { + return map(Schema.create(valueType)); + } + + private Schema.Field nullable(Schema.Field field) { + Preconditions.checkArgument(!AvroSchemaUtil.isOptionSchema(field.schema())); + return field(field.name(), nullable(field.schema()), field.doc(), + Schema.Field.NULL_DEFAULT_VALUE, field.getObjectProps()); + } + + private Schema nullable(Schema schema) { + return AvroSchemaUtil.toOption(schema); + } + + private Schema nullable(Schema.Type type) { + return nullable(Schema.create(type)); + } + + private Schema.Field field(String name, Schema schema, String doc, Object defaultValue, + Map props) { + Schema.Field field = new Schema.Field(name, schema, doc, defaultValue); + if (props != null) { + props.forEach(field::addProp); + } + return field; + } + + private Schema.Field required(String name, Schema schema, String doc, Object defaultValue, + Map props) { + return field(name, schema, doc, defaultValue, props); + } + + private Schema.Field required(String name, Schema schema) { + return required(name, schema, null, null, null); + } + + private Schema.Field required(String name, Schema.Type type, String doc, Object defaultValue, + Map props) { + return required(name, Schema.create(type), doc, defaultValue, props); + } + + private Schema.Field required(String name, Schema.Type type) { + return required(name, type, null, null, null); + } + + private Schema.Field optional(String name, Schema schema, String doc) { + return nullable(field(name, schema, doc, null, null)); + } + + private Schema.Field optional(String name, Schema schema) { + return optional(name, schema, null); + } + + private Schema.Field optional(String name, Schema.Type type, String doc) { + return optional(name, Schema.create(type), doc); + } + + private Schema.Field optional(String name, Schema.Type type) { + return optional(name, type, null); + } + + private Object fromJson(String json) { + ObjectMapper mapper = new ObjectMapper(); + try { + return JacksonUtils.toObject(mapper.readTree(json)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java b/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java index 653c27313e..c6984e2fe8 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java +++ b/spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java @@ -202,11 +202,6 @@ public Type primitive(Type.PrimitiveType primitive) { "Cannot project decimal with incompatible precision: %s < %s", requestedDecimal.precision(), decimal.precision()); break; - case TIMESTAMP: - Types.TimestampType timestamp = (Types.TimestampType) primitive; - Preconditions.checkArgument(timestamp.shouldAdjustToUTC(), - "Cannot project timestamp (without time zone) as timestamptz (with time zone)"); - break; default: } diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java index e0242e062b..d696bea75b 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java +++ 
b/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java @@ -47,4 +47,6 @@ private SparkReadOptions() { // Overrides the table's read.parquet.vectorization.batch-size public static final String VECTORIZATION_BATCH_SIZE = "batch-size"; + + public static final String READ_TIMESTAMP_WITHOUT_ZONE = "read-timestamp-without-zone"; } diff --git a/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java index b9e9dfade7..6a8be60eb0 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java +++ b/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java @@ -104,12 +104,7 @@ public DataType primitive(Type.PrimitiveType primitive) { throw new UnsupportedOperationException( "Spark does not support time fields"); case TIMESTAMP: - Types.TimestampType timestamp = (Types.TimestampType) primitive; - if (timestamp.shouldAdjustToUTC()) { - return TimestampType$.MODULE$; - } - throw new UnsupportedOperationException( - "Spark does not support timestamp without time zone fields"); + return TimestampType$.MODULE$; case STRING: return StringType$.MODULE$; case UUID: diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java index 8a45aabf5f..32a04886eb 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java @@ -104,6 +104,7 @@ public OrcValueReader primitive(Type.PrimitiveType iPrimitive, TypeDescriptio return OrcValueReaders.floats(); case DOUBLE: return OrcValueReaders.doubles(); + case TIMESTAMP: case TIMESTAMP_INSTANT: return SparkOrcValueReaders.timestampTzs(); case DECIMAL: diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java index 160e4dbd77..81fc631ae7 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java @@ -127,6 +127,7 @@ public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primit case DOUBLE: primitiveValueReader = OrcValueReaders.doubles(); break; + case TIMESTAMP: case TIMESTAMP_INSTANT: primitiveValueReader = SparkOrcValueReaders.timestampTzs(); break; diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java index 0c4598a209..3112c9304b 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java @@ -24,6 +24,7 @@ import java.sql.Timestamp; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; @@ -122,13 +123,19 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual) Assert.assertEquals("ISO-8601 date should be equal", expected.toString(), actual.toString()); break; case TIMESTAMP: - Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime); Assert.assertTrue("Should be a Timestamp", actual instanceof Timestamp); Timestamp ts = (Timestamp) actual; // milliseconds from nanos has already been 
added by getTime OffsetDateTime actualTs = EPOCH.plusNanos( (ts.getTime() * 1_000_000) + (ts.getNanos() % 1_000_000)); - Assert.assertEquals("Timestamp should be equal", expected, actualTs); + Types.TimestampType timestampType = (Types.TimestampType) type; + if (timestampType.shouldAdjustToUTC()) { + Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime); + Assert.assertEquals("Timestamp should be equal", expected, actualTs); + } else { + Assert.assertTrue("Should expect an LocalDateTime", expected instanceof LocalDateTime); + Assert.assertEquals("Timestamp should be equal", expected, actualTs.toLocalDateTime()); + } break; case STRING: Assert.assertTrue("Should be a String", actual instanceof String); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java new file mode 100644 index 0000000000..30daa9b2c3 --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import java.io.File; +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Locale; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.data.GenericsHelpers; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.Files.localOutput; + +@RunWith(Parameterized.class) +public abstract class TestTimestampWithoutZone { + private static final Configuration CONF = new Configuration(); + private static final HadoopTables TABLES = new HadoopTables(CONF); + + private static final Schema SCHEMA = new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "ts", Types.TimestampType.withoutZone()), + Types.NestedField.optional(3, "data", Types.StringType.get()) + ); + + private static SparkSession spark = null; + + @BeforeClass + public static void startSpark() { + TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate(); + } + + @AfterClass + public static void stopSpark() { + SparkSession currentSpark = TestTimestampWithoutZone.spark; + TestTimestampWithoutZone.spark = null; + currentSpark.stop(); + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private final String format; + private final boolean vectorized; + + @Parameterized.Parameters(name = "format = {0}, vectorized = {1}") + public static Object[][] parameters() { + return new Object[][] { + { "parquet", false }, + { "parquet", true }, + { "avro", false }, + { "orc", false }, + { "orc", true } + }; + } + + public TestTimestampWithoutZone(String format, boolean vectorized) { + this.format = format; + this.vectorized = vectorized; + } + + private File parent = null; + private File unpartitioned = null; + private List records = null; + + @Before + public void writeUnpartitionedTable() throws IOException { + this.parent = temp.newFolder("TestTimestampWithoutZone"); + this.unpartitioned = new File(parent, "unpartitioned"); + File dataFolder = new File(unpartitioned, "data"); + Assert.assertTrue("Mkdir should succeed", dataFolder.mkdirs()); + + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), unpartitioned.toString()); + Schema tableSchema = table.schema(); // use the table schema because ids are reassigned + + FileFormat fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); + + File testFile = new File(dataFolder, 
fileFormat.addExtension(UUID.randomUUID().toString())); + + // create records using the table's schema + this.records = testRecords(tableSchema); + + try (FileAppender writer = new GenericAppenderFactory(tableSchema).newAppender( + localOutput(testFile), fileFormat)) { + writer.addAll(records); + } + + DataFile file = DataFiles.builder(PartitionSpec.unpartitioned()) + .withRecordCount(records.size()) + .withFileSizeInBytes(testFile.length()) + .withPath(testFile.toString()) + .build(); + + table.newAppend().appendFile(file).commit(); + } + + @Test + public void testUnpartitionedTimestampWithoutZone() { + assertEqualsSafe(SCHEMA.asStruct(), records, read(unpartitioned.toString(), vectorized)); + } + + @Test + public void testUnpartitionedTimestampWithoutZoneProjection() { + Schema projection = SCHEMA.select("id", "ts"); + assertEqualsSafe(projection.asStruct(), + records.stream().map(r -> projectFlat(projection, r)).collect(Collectors.toList()), + read(unpartitioned.toString(), vectorized, "id", "ts")); + } + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void testUnpartitionedTimestampWithoutZoneError() { + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Spark does not support timestamp without time zone fields"); + + spark.read().format("iceberg") + .option("vectorization-enabled", String.valueOf(vectorized)) + .option("read-timestamp-without-zone", "false") + .load(unpartitioned.toString()) + .collectAsList(); + } + + private static Record projectFlat(Schema projection, Record record) { + Record result = GenericRecord.create(projection); + List fields = projection.asStruct().fields(); + for (int i = 0; i < fields.size(); i += 1) { + Types.NestedField field = fields.get(i); + result.set(i, record.getField(field.name())); + } + return result; + } + + public static void assertEqualsSafe(Types.StructType struct, + List expected, List actual) { + Assert.assertEquals("Number of results should match expected", expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i += 1) { + GenericsHelpers.assertEqualsSafe(struct, expected.get(i), actual.get(i)); + } + } + + private List testRecords(Schema schema) { + return Lists.newArrayList( + record(schema, 0L, parseToLocal("2017-12-22T09:20:44.294658"), "junction"), + record(schema, 1L, parseToLocal("2017-12-22T07:15:34.582910"), "alligator"), + record(schema, 2L, parseToLocal("2017-12-22T06:02:09.243857"), "forrest"), + record(schema, 3L, parseToLocal("2017-12-22T03:10:11.134509"), "clapping"), + record(schema, 4L, parseToLocal("2017-12-22T00:34:00.184671"), "brush"), + record(schema, 5L, parseToLocal("2017-12-21T22:20:08.935889"), "trap"), + record(schema, 6L, parseToLocal("2017-12-21T21:55:30.589712"), "element"), + record(schema, 7L, parseToLocal("2017-12-21T17:31:14.532797"), "limited"), + record(schema, 8L, parseToLocal("2017-12-21T15:21:51.237521"), "global"), + record(schema, 9L, parseToLocal("2017-12-21T15:02:15.230570"), "goldfish") + ); + } + + private static List read(String table, boolean vectorized) { + return read(table, vectorized, "*"); + } + + private static List read(String table, boolean vectorized, String select0, String... 
selectN) { + Dataset dataset = spark.read().format("iceberg") + .option("vectorization-enabled", String.valueOf(vectorized)) + .option("read-timestamp-without-zone", "true") + .load(table) + .select(select0, selectN); + return dataset.collectAsList(); + } + + private static LocalDateTime parseToLocal(String timestamp) { + return LocalDateTime.parse(timestamp); + } + + private static Record record(Schema schema, Object... values) { + Record rec = GenericRecord.create(schema); + for (int i = 0; i < values.length; i += 1) { + rec.set(i, values[i]); + } + return rec; + } +} diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java index 18e34189a0..5d90559486 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -54,6 +54,9 @@ import org.apache.iceberg.spark.SparkFilters; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.broadcast.Broadcast; @@ -101,6 +104,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus private final boolean localityPreferred; private final boolean batchReadsEnabled; private final int batchSize; + private final boolean readTimestampWithoutZone; // lazy variables private Schema schema = null; @@ -174,6 +178,16 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus this.batchSize = options.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE).map(Integer::parseInt).orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT)); + // Allow reading timestamp without time zone as timestamp with time zone. Generally, this is not safe as timestamp + // without time zone is supposed to represent wall clock time semantics, i.e. no matter the reader/writer timezone + // 3PM should always be read as 3PM, but timestamp with time zone represents instant semantics, i.e the timestamp + // is adjusted so that the corresponding time in the reader timezone is displayed. However, at LinkedIn, all readers + // and writers are in the UTC timezone as our production machines are set to UTC. So, timestamp with/without time + // zone is the same. 
+ // When set to false (default), we throw an exception at runtime + // "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields + this.readTimestampWithoutZone = options.get(SparkReadOptions.READ_TIMESTAMP_WITHOUT_ZONE) + .map(Boolean::parseBoolean).orElse(false); } private Schema lazySchema() { @@ -197,6 +211,8 @@ private Expression filterExpression() { private StructType lazyType() { if (type == null) { + Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(lazySchema()), + "Spark does not support timestamp without time zone fields"); this.type = SparkSchemaUtil.convert(lazySchema()); } return type; @@ -365,12 +381,20 @@ public boolean enableBatchRead() { boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes); + boolean hasTimestampWithoutZone = hasTimestampWithoutZone(lazySchema()); + this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && ((allOrcFileScanTasks && hasNoRowFilters) || - (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); + (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives && !hasTimestampWithoutZone)); } return readUsingBatch; } + private static boolean hasTimestampWithoutZone(Schema schema) { + return TypeUtil.find(schema, t -> + t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC() + ) != null; + } + private static void mergeIcebergHadoopConfs( Configuration baseConf, Map options) { options.keySet().stream() diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java new file mode 100644 index 0000000000..84c7d1aeb7 --- /dev/null +++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +public class TestTimestampWithoutZone24 extends TestTimestampWithoutZone { + public TestTimestampWithoutZone24(String format, boolean vectorized) { + super(format, vectorized); + } +} diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java index b529faac64..df5e10dbf9 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java @@ -40,8 +40,13 @@ import org.apache.iceberg.hive.legacy.LegacyHiveTable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.orc.OrcRowFilterUtils; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.broadcast.Broadcast; @@ -71,6 +76,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { private final Broadcast encryptionManager; private final boolean batchReadsEnabled; private final int batchSize; + private final boolean readTimestampWithoutZone; // lazy variables private StructType readSchema = null; @@ -87,6 +93,15 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics { this.localityPreferred = Spark3Util.isLocalityEnabled(io.value(), table.location(), options); this.batchReadsEnabled = Spark3Util.isVectorizationEnabled(table.properties(), options); this.batchSize = Spark3Util.batchSize(table.properties(), options); + // Allow reading timestamp without time zone as timestamp with time zone. Generally, this is not safe as timestamp + // without time zone is supposed to represent wall clock time semantics, i.e. no matter the reader/writer timezone + // 3PM should always be read as 3PM, but timestamp with time zone represents instant semantics, i.e the timestamp + // is adjusted so that the corresponding time in the reader timezone is displayed. However, at LinkedIn, all readers + // and writers are in the UTC timezone as our production machines are set to UTC. So, timestamp with/without time + // zone is the same. 
+ // When set to false (default), we throw an exception at runtime + // "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields + this.readTimestampWithoutZone = options.getBoolean(SparkReadOptions.READ_TIMESTAMP_WITHOUT_ZONE, false); } protected Table table() { @@ -115,6 +130,8 @@ public Batch toBatch() { @Override public StructType readSchema() { if (readSchema == null) { + Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(expectedSchema), + "Spark does not support timestamp without time zone fields"); this.readSchema = SparkSchemaUtil.convert(expectedSchema); } return readSchema; @@ -165,12 +182,20 @@ public PartitionReaderFactory createReaderFactory() { boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes); + boolean hasTimestampWithoutZone = hasTimestampWithoutZone(expectedSchema); + boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && ((allOrcFileScanTasks && hasNoRowFilters) || - (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives)); + (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives && !hasTimestampWithoutZone)); return new ReaderFactory(readUsingBatch ? batchSize : 0); } + private static boolean hasTimestampWithoutZone(Schema schema) { + return TypeUtil.find(schema, t -> + t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC() + ) != null; + } + @Override public Statistics estimateStatistics() { if (table instanceof LegacyHiveTable) { diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java new file mode 100644 index 0000000000..4216aec027 --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone3.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +public class TestTimestampWithoutZone3 extends TestTimestampWithoutZone { + public TestTimestampWithoutZone3(String format, boolean vectorized) { + super(format, vectorized); + } +}
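The AvroSchemaUtil changes above only widen visibility: toOption, fromOption, copyRecord and copyField become public so the new hive.legacy visitors can reuse them. A minimal sketch of how the option helpers behave, assuming a standalone example class name:

import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;

public class OptionSchemaExample {
  public static void main(String[] args) {
    // toOption wraps a schema into a union(null, int); fromOption unwraps it again.
    Schema option = AvroSchemaUtil.toOption(Schema.create(Schema.Type.INT));
    System.out.println(AvroSchemaUtil.isOptionSchema(option));  // true
    System.out.println(AvroSchemaUtil.fromOption(option));      // "int"
  }
}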
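MergeHiveSchemaWithAvro is package-private, so the following sketch would have to live in org.apache.iceberg.hive.legacy; the example Hive type string, Avro record, and class name are assumptions made for illustration. It shows the documented merge rules: field types come from Hive, while names, nullability, and defaults come from Avro, with Hive-only fields kept as optional fields.

package org.apache.iceberg.hive.legacy;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class MergeHiveSchemaWithAvroExample {
  public static void main(String[] args) {
    // Hive supplies the field types; matching against Avro is case-insensitive.
    StructTypeInfo hive = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(
        "struct<id:bigint,payload:string>");

    Schema avro = SchemaBuilder.record("r1").fields()
        .optionalLong("Id")   // matches Hive's "id"; Avro name and nullability are retained
        .endRecord();         // "payload" exists only in Hive and becomes an optional string

    Schema merged = MergeHiveSchemaWithAvro.visit(hive, avro);
    System.out.println(merged.toString(true));
  }
}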
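On the Spark side, the new read-timestamp-without-zone option (SparkReadOptions.READ_TIMESTAMP_WITHOUT_ZONE) is what the tests above exercise. A minimal usage sketch, assuming a hypothetical table location:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadTimestampWithoutZoneExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();

    // With the option at its default (false), deriving the read schema of a table that
    // contains a timestamp-without-zone column fails with
    // "Spark does not support timestamp without time zone fields".
    // Setting it to true maps such columns to Spark's TimestampType; vectorized Parquet
    // reads are disabled for these schemas.
    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option("read-timestamp-without-zone", "true")
        .load("/tmp/warehouse/db/events");  // hypothetical path
    df.show();
    spark.stop();
  }
}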
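Both Reader and SparkBatchScan add the same private hasTimestampWithoutZone helper; a sketch of that predicate standalone, using a schema similar to the test schema (the wrapper class name is illustrative):

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class TimestampWithoutZoneCheck {
  // True if any column in the schema is a timestamp without time zone
  // (shouldAdjustToUTC() == false), matching the check added in this change.
  static boolean hasTimestampWithoutZone(Schema schema) {
    return TypeUtil.find(schema, t ->
        t.typeId() == Type.TypeID.TIMESTAMP && !((Types.TimestampType) t).shouldAdjustToUTC()) != null;
  }

  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "ts", Types.TimestampType.withoutZone()));
    System.out.println(hasTimestampWithoutZone(schema));  // true
  }
}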