diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 4b3c6a349857a..d825080bf737e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -36,10 +36,12 @@
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.sanitizeName;
@@ -437,4 +439,21 @@ public void testSanitizeName() {
     assertEquals("abcdef___", sanitizeName("abcdef_."));
     assertEquals("__ab__cd__", sanitizeName("1ab*cd?"));
   }
+
+  @Test
+  public void testGenerateProjectionSchema() {
+    Schema originalSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA));
+
+    Schema schema1 = HoodieAvroUtils.generateProjectionSchema(originalSchema, Arrays.asList("_row_key", "timestamp"));
+    assertEquals(2, schema1.getFields().size());
+    List<String> fieldNames1 = schema1.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+    assertTrue(fieldNames1.contains("_row_key"));
+    assertTrue(fieldNames1.contains("timestamp"));
+
+    assertEquals("Field fake_field not found in log schema. Query cannot proceed! Derived Schema Fields: "
+        + "[non_pii_col, _hoodie_commit_time, _row_key, _hoodie_partition_path, _hoodie_record_key, pii_col,"
+        + " _hoodie_commit_seqno, _hoodie_file_name, timestamp]",
+        assertThrows(HoodieException.class, () ->
+            HoodieAvroUtils.generateProjectionSchema(originalSchema, Arrays.asList("_row_key", "timestamp", "fake_field"))).getMessage());
+  }
 }
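
For context, a minimal usage sketch of HoodieAvroUtils.generateProjectionSchema, the helper the new test covers (not part of the patch; the inline schema, class name, and field names are illustrative stand-ins for EXAMPLE_SCHEMA):

    import java.util.Arrays;
    import org.apache.avro.Schema;
    import org.apache.hudi.avro.HoodieAvroUtils;

    public class ProjectionSketch {
      public static void main(String[] args) {
        // A toy record schema standing in for EXAMPLE_SCHEMA.
        Schema original = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"_row_key\",\"type\":\"string\"},"
                + "{\"name\":\"timestamp\",\"type\":\"long\"},"
                + "{\"name\":\"pii_col\",\"type\":\"string\"}]}");
        // Keeps only the requested fields; throws HoodieException when a
        // requested field is missing, with the message asserted in the test above.
        Schema projected = HoodieAvroUtils.generateProjectionSchema(
            original, Arrays.asList("_row_key", "timestamp"));
        System.out.println(projected.getFields().size()); // 2
      }
    }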
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java
index 1f08fcde97022..8971220d1330b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java
@@ -18,13 +18,19 @@
 
 package org.apache.hudi.hadoop;
 
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +41,8 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.hadoop.hive.serde.serdeConstants.TIMESTAMP_TYPE_NAME;
+
 /**
  * Utility functions copied from Hive ColumnProjectionUtils.java.
  * Needed to copy as we see NoSuchMethod errors when directly using these APIs with/without Spark.
@@ -112,22 +120,49 @@ public static List<Pair<String, String>> getIOColumnNameAndTypes(Configuration conf) {
   /**
    * If schema contains timestamp columns, this method is used for compatibility when there is no timestamp fields.
    *
-   * <p>We expect 3 cases to use parquet-avro reader {@link org.apache.hudi.hadoop.avro.HoodieAvroParquetReader} to read timestamp column:
-   *
-   * <ul>
-   *   <li>Read columns contain timestamp type;</li>
-   *   <li>Empty original columns;</li>
-   *   <li>Empty read columns but existing original columns contain timestamp type.</li>
-   * </ul>
+   * <p>We expect to use parquet-avro reader {@link org.apache.hudi.hadoop.avro.HoodieAvroParquetReader} to read
+   * timestamp column when read columns contain timestamp type.
    */
   public static boolean supportTimestamp(Configuration conf) {
     List<String> readCols = Arrays.asList(getReadColumnNames(conf));
     if (readCols.isEmpty()) {
-      return getIOColumnTypes(conf).contains("timestamp");
+      return false;
+    }
+
+    String colTypes = conf.get(IOConstants.COLUMNS_TYPES, "");
+    if (colTypes == null || colTypes.isEmpty()) {
+      return false;
     }
+
+    ArrayList<TypeInfo> types = TypeInfoUtils.getTypeInfosFromTypeString(colTypes);
     List<String> names = getIOColumns(conf);
-    List<String> types = getIOColumnTypes(conf);
-    return types.isEmpty() || IntStream.range(0, names.size()).filter(i -> readCols.contains(names.get(i)))
-        .anyMatch(i -> types.get(i).equals("timestamp"));
+    return IntStream.range(0, names.size()).filter(i -> readCols.contains(names.get(i)))
+        .anyMatch(i -> typeContainsTimestamp(types.get(i)));
+  }
+
+  public static boolean typeContainsTimestamp(TypeInfo type) {
+    Category category = type.getCategory();
+
+    switch (category) {
+      case PRIMITIVE:
+        return type.getTypeName().equals(TIMESTAMP_TYPE_NAME);
+      case LIST:
+        ListTypeInfo listTypeInfo = (ListTypeInfo) type;
+        return typeContainsTimestamp(listTypeInfo.getListElementTypeInfo());
+      case MAP:
+        MapTypeInfo mapTypeInfo = (MapTypeInfo) type;
+        return typeContainsTimestamp(mapTypeInfo.getMapKeyTypeInfo())
+            || typeContainsTimestamp(mapTypeInfo.getMapValueTypeInfo());
+      case STRUCT:
+        StructTypeInfo structTypeInfo = (StructTypeInfo) type;
+        return structTypeInfo.getAllStructFieldTypeInfos().stream()
+            .anyMatch(HoodieColumnProjectionUtils::typeContainsTimestamp);
+      case UNION:
+        UnionTypeInfo unionTypeInfo = (UnionTypeInfo) type;
+        return unionTypeInfo.getAllUnionObjectTypeInfos().stream()
+            .anyMatch(HoodieColumnProjectionUtils::typeContainsTimestamp);
+      default:
+        return false;
+    }
   }
-}
\ No newline at end of file
+}
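
The rewrite above replaces a whole-string comparison against "timestamp" with a walk over parsed TypeInfo objects, so timestamps nested inside array/map/struct/uniontype columns are now detected. A sketch of how the method is driven from the Hive conf keys (not part of the patch; the column names are hypothetical and the standard Hive keys are assumed):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.IOConstants;
    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
    import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;

    public class TimestampSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Project only "event"; its struct wraps a timestamp.
        conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "event");
        conf.set(IOConstants.COLUMNS, "id,event");
        conf.set(IOConstants.COLUMNS_TYPES, "string:struct<ts:timestamp,msg:string>");
        // The old code compared each type string to "timestamp" and missed this;
        // typeContainsTimestamp recurses into the struct and finds it.
        System.out.println(HoodieColumnProjectionUtils.supportTimestamp(conf)); // true
      }
    }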
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java
index 603359c3efd0b..c31041ddc76b0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.hadoop.avro;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -26,8 +30,6 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -40,7 +42,6 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
 
 import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
@@ -50,24 +51,20 @@ public class HoodieAvroParquetReader extends RecordReader<Void, ArrayWritable> {
   private Schema baseSchema;
 
   public HoodieAvroParquetReader(InputSplit inputSplit,
                                  Configuration conf) throws IOException {
+    // get base schema
+    ParquetMetadata fileFooter =
+        ParquetFileReader.readFooter(conf, ((ParquetInputSplit) inputSplit).getPath(), ParquetMetadataConverter.NO_FILTER);
+    MessageType messageType = fileFooter.getFileMetaData().getSchema();
+    baseSchema = new AvroSchemaConverter(conf).convert(messageType);
+
     // if exists read columns, we need to filter columns.
     List<String> readColNames = Arrays.asList(HoodieColumnProjectionUtils.getReadColumnNames(conf));
     if (!readColNames.isEmpty()) {
-      // get base schema
-      ParquetMetadata fileFooter =
-          ParquetFileReader.readFooter(conf, ((ParquetInputSplit) inputSplit).getPath(), ParquetMetadataConverter.NO_FILTER);
-      MessageType messageType = fileFooter.getFileMetaData().getSchema();
-      baseSchema = new AvroSchemaConverter(conf).convert(messageType);
-      // filter schema for reading
-      final Schema filterSchema = Schema.createRecord(baseSchema.getName(),
-          baseSchema.getDoc(), baseSchema.getNamespace(), baseSchema.isError(),
-          baseSchema.getFields().stream()
-              .filter(f -> readColNames.contains(f.name()))
-              .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()))
-              .collect(Collectors.toList()));
+      Schema filterSchema = HoodieAvroUtils.generateProjectionSchema(baseSchema, readColNames);
       AvroReadSupport.setAvroReadSchema(conf, filterSchema);
       AvroReadSupport.setRequestedProjection(conf, filterSchema);
     }
+
     parquetRecordReader = new ParquetRecordReader<>(new AvroReadSupport<>(), getFilter(conf));
   }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java
new file mode 100644
index 0000000000000..e7467acef5d0a
--- /dev/null
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieColumnProjectionUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieColumnProjectionUtils {
+
+  @Test
+  void testTypeContainsTimestamp() {
+    String col1 = "timestamp";
+    TypeInfo typeInfo1 = TypeInfoUtils.getTypeInfosFromTypeString(col1).get(0);
+    assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo1));
+
+    String col2 = "string";
+    TypeInfo typeInfo2 = TypeInfoUtils.getTypeInfosFromTypeString(col2).get(0);
+    assertFalse(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo2));
+
+    String col3 = "array<timestamp>";
+    TypeInfo typeInfo3 = TypeInfoUtils.getTypeInfosFromTypeString(col3).get(0);
+    assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo3));
+
+    String col4 = "array<string>";
+    TypeInfo typeInfo4 = TypeInfoUtils.getTypeInfosFromTypeString(col4).get(0);
+    assertFalse(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo4));
+
+    String col5 = "map<string,timestamp>";
+    TypeInfo typeInfo5 = TypeInfoUtils.getTypeInfosFromTypeString(col5).get(0);
+    assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo5));
+
+    String col6 = "map<string,string>";
+    TypeInfo typeInfo6 = TypeInfoUtils.getTypeInfosFromTypeString(col6).get(0);
+    assertFalse(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo6));
+
+    String col7 = "struct<f1:string,f2:timestamp>";
+    TypeInfo typeInfo7 = TypeInfoUtils.getTypeInfosFromTypeString(col7).get(0);
+    assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo7));
+
+    String col8 = "struct<f1:string,f2:bigint>";
+    TypeInfo typeInfo8 = TypeInfoUtils.getTypeInfosFromTypeString(col8).get(0);
+    assertFalse(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo8));
+
+    String col9 = "uniontype<string,timestamp>";
+    TypeInfo typeInfo9 = TypeInfoUtils.getTypeInfosFromTypeString(col9).get(0);
+    assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo9));
+
+    String col10 = "uniontype<string,int>";
+    TypeInfo typeInfo10 = TypeInfoUtils.getTypeInfosFromTypeString(col10).get(0);
+    assertFalse(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo10));
+
+    String col11 = "uniontype<string,struct<f1:array<timestamp>>>";
+    TypeInfo typeInfo11 = TypeInfoUtils.getTypeInfosFromTypeString(col11).get(0);
+    assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo11));
+  }
+}
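
One behavioral change worth noting, sketched below (not part of the patch; "id"/"ts" are hypothetical column names and the standard Hive conf keys are assumed): with an empty read-column list, supportTimestamp previously fell back to scanning the original column types and could return true; it now always returns false, matching the narrowed javadoc above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.IOConstants;
    import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;

    public class EmptyReadColumnsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration(); // no read columns configured
        conf.set(IOConstants.COLUMNS, "id,ts");
        conf.set(IOConstants.COLUMNS_TYPES, "string:timestamp");
        // Before: returned true because the original columns contain a timestamp.
        // After: an empty read-column list never selects the avro-based reader.
        System.out.println(HoodieColumnProjectionUtils.supportTimestamp(conf)); // false
      }
    }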