Merged
@@ -36,10 +36,12 @@
import java.nio.ByteBuffer;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.sanitizeName;
@@ -437,4 +439,21 @@ public void testSanitizeName() {
assertEquals("abcdef___", sanitizeName("abcdef_."));
assertEquals("__ab__cd__", sanitizeName("1ab*cd?"));
}

  @Test
  public void testGenerateProjectionSchema() {
    Schema originalSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA));

    Schema schema1 = HoodieAvroUtils.generateProjectionSchema(originalSchema, Arrays.asList("_row_key", "timestamp"));
    assertEquals(2, schema1.getFields().size());
    List<String> fieldNames1 = schema1.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
    assertTrue(fieldNames1.contains("_row_key"));
    assertTrue(fieldNames1.contains("timestamp"));

    assertEquals("Field fake_field not found in log schema. Query cannot proceed! Derived Schema Fields: "
            + "[non_pii_col, _hoodie_commit_time, _row_key, _hoodie_partition_path, _hoodie_record_key, pii_col,"
            + " _hoodie_commit_seqno, _hoodie_file_name, timestamp]",
        assertThrows(HoodieException.class, () ->
            HoodieAvroUtils.generateProjectionSchema(originalSchema, Arrays.asList("_row_key", "timestamp", "fake_field"))).getMessage());
  }
}
@@ -18,13 +18,19 @@

package org.apache.hudi.hadoop;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +41,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.hadoop.hive.serde.serdeConstants.TIMESTAMP_TYPE_NAME;

/**
* Utility functions copied from Hive ColumnProjectionUtils.java.
* Needed to copy as we see NoSuchMethod errors when directly using these APIs with/without Spark.
@@ -112,22 +120,49 @@ public static List<Pair<String,String>> getIOColumnNameAndTypes(Configuration co
  /**
   * If schema contains timestamp columns, this method is used for compatibility when there is no timestamp fields.
   *
-  * <p>We expect 3 cases to use parquet-avro reader {@link org.apache.hudi.hadoop.avro.HoodieAvroParquetReader} to read timestamp column:
-  *
-  * <ol>
-  * <li>Read columns contain timestamp type;</li>
-  * <li>Empty original columns;</li>
-  * <li>Empty read columns but existing original columns contain timestamp type.</li>
-  * </ol>
+  * <p>We expect to use parquet-avro reader {@link org.apache.hudi.hadoop.avro.HoodieAvroParquetReader} to read
+  * timestamp column when read columns contain timestamp type.
   */
  public static boolean supportTimestamp(Configuration conf) {
    List<String> readCols = Arrays.asList(getReadColumnNames(conf));
    if (readCols.isEmpty()) {
-     return getIOColumnTypes(conf).contains("timestamp");
+     return false;
    }

+   String colTypes = conf.get(IOConstants.COLUMNS_TYPES, "");
+   if (colTypes == null || colTypes.isEmpty()) {
+     return false;
+   }
+
+   ArrayList<TypeInfo> types = TypeInfoUtils.getTypeInfosFromTypeString(colTypes);
    List<String> names = getIOColumns(conf);
-   List<String> types = getIOColumnTypes(conf);
-   return types.isEmpty() || IntStream.range(0, names.size()).filter(i -> readCols.contains(names.get(i)))
-       .anyMatch(i -> types.get(i).equals("timestamp"));
+   return IntStream.range(0, names.size()).filter(i -> readCols.contains(names.get(i)))
+       .anyMatch(i -> typeContainsTimestamp(types.get(i)));
  }

+ public static boolean typeContainsTimestamp(TypeInfo type) {
+   Category category = type.getCategory();
+
+   switch (category) {
+     case PRIMITIVE:
+       return type.getTypeName().equals(TIMESTAMP_TYPE_NAME);
+     case LIST:
+       ListTypeInfo listTypeInfo = (ListTypeInfo) type;
+       return typeContainsTimestamp(listTypeInfo.getListElementTypeInfo());
+     case MAP:
+       MapTypeInfo mapTypeInfo = (MapTypeInfo) type;
+       return typeContainsTimestamp(mapTypeInfo.getMapKeyTypeInfo())
+           || typeContainsTimestamp(mapTypeInfo.getMapValueTypeInfo());
+     case STRUCT:
+       StructTypeInfo structTypeInfo = (StructTypeInfo) type;
+       return structTypeInfo.getAllStructFieldTypeInfos().stream()
+           .anyMatch(HoodieColumnProjectionUtils::typeContainsTimestamp);
+     case UNION:
+       UnionTypeInfo unionTypeInfo = (UnionTypeInfo) type;
+       return unionTypeInfo.getAllUnionObjectTypeInfos().stream()
+           .anyMatch(HoodieColumnProjectionUtils::typeContainsTimestamp);
+     default:
+       return false;
+   }
+ }
}

Review discussion on the "return false" change:

Contributor Author: @xicm Here I think it should return false directly; what do you think?

Contributor: Agree with you. @cdmikechen, do you have any other concern?

Contributor: When can the readCols be empty?

Contributor Author: As far as I know, something like count(*), which doesn't need to read any cols.

Contributor: In such a case, can the timestamp be read correctly anyway?

Contributor Author: Yes.
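To make the count(*) case discussed above concrete, here is a minimal sketch of the new supportTimestamp behaviour. It assumes the Hive keys that the copied helpers read (ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR for the projected columns, IOConstants.COLUMNS / IOConstants.COLUMNS_TYPES for the full table schema); the wrapper class name and expected outputs are illustrative, not taken from this PR.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;

public class SupportTimestampSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Full table schema as Hive hands it to the input format (colon-separated types).
    conf.set(IOConstants.COLUMNS, "id,ts1");
    conf.set(IOConstants.COLUMNS_TYPES, "int:timestamp");

    // count(*): no columns are projected, so with this change supportTimestamp() should
    // return false and the default parquet record reader is kept.
    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
    System.out.println(HoodieColumnProjectionUtils.supportTimestamp(conf)); // expected: false

    // Projecting a timestamp column (or a nested type that contains one) should flip it to
    // true, routing the read through HoodieAvroParquetReader.
    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "id,ts1");
    System.out.println(HoodieColumnProjectionUtils.supportTimestamp(conf)); // expected: true
  }
}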
@@ -18,6 +18,10 @@

package org.apache.hudi.hadoop.avro;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -26,8 +30,6 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hudi.hadoop.HoodieColumnProjectionUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -40,7 +42,6 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;

@@ -50,24 +51,20 @@ public class HoodieAvroParquetReader extends RecordReader<Void, ArrayWritable> {
private Schema baseSchema;
Review discussion on the baseSchema field:

cdmikechen (Contributor), Jun 14, 2023:
In my origin PR HUDI-83 I didn't declare the baseSchema variable and didn't modify the getCurrentValue method. In fact, I would like to know if there is any problem, or an NPE, if we don't declare baseSchema.

Zouxxyy (Contributor Author):
I have tested that baseSchema needs to be used in getCurrentValue; otherwise the result field will be null, like this: #7173 (comment)

cdmikechen (Contributor), Jun 14, 2023:
@Zouxxyy I'm having some confusion. I remember doing some scenario testing against Hive when I first made the changes (about a year ago), including count(*) and specified fields. I don't know if some subsequent new feature or PR has affected this; I think I'll do another test later this week. Although we have added a separate class to handle timestamp types, my original intention was to use the Hive or Hadoop original methods as much as possible for other fields, otherwise it would be costly for us to maintain subsequently.

Zouxxyy (Contributor Author), Jun 15, 2023:
@cdmikechen Have you ever tested select id, ts1 from test_ts_1? It will return null if baseSchema is not used.
Below is my full test, feel free to try:

-- spark-sql
create table test_ts_1(
  id int, 
  ts1 timestamp)
using hudi
tblproperties(
  type='mor', 
  primaryKey='id'
);

INSERT INTO test_ts_1
SELECT 1,
cast ('2021-12-25 12:01:01' as timestamp);

create table test_ts_2(
  id int, 
  ts1 array<timestamp>, 
  ts2 map<string, timestamp>, 
  ts3 struct<province:timestamp, city:string>)
using hudi
tblproperties(
  type='mor', 
  primaryKey='id'
);

INSERT INTO test_ts_2
SELECT 1,
array(cast ('2021-12-25 12:01:01' as timestamp)),
map('key', cast ('2021-12-25 12:01:01' as timestamp)),
struct(cast ('2021-12-25 12:01:01' as timestamp), 'test');

-- hive
select * from test_ts_1;
select id from test_ts_1;
select ts1 from test_ts_1;
select id, ts1 from test_ts_1;
select count(*) from test_ts_1;

select * from test_ts_2;
select id from test_ts_2;
select ts1 from test_ts_2;
select id, ts1 from test_ts_2;
select count(*) from test_ts_2;

CC @danny0405 @xicm


  public HoodieAvroParquetReader(InputSplit inputSplit, Configuration conf) throws IOException {
+   // get base schema
+   ParquetMetadata fileFooter =
+       ParquetFileReader.readFooter(conf, ((ParquetInputSplit) inputSplit).getPath(), ParquetMetadataConverter.NO_FILTER);
+   MessageType messageType = fileFooter.getFileMetaData().getSchema();
+   baseSchema = new AvroSchemaConverter(conf).convert(messageType);
+
    // if exists read columns, we need to filter columns.
    List<String> readColNames = Arrays.asList(HoodieColumnProjectionUtils.getReadColumnNames(conf));
    if (!readColNames.isEmpty()) {
-     // get base schema
-     ParquetMetadata fileFooter =
-         ParquetFileReader.readFooter(conf, ((ParquetInputSplit) inputSplit).getPath(), ParquetMetadataConverter.NO_FILTER);
-     MessageType messageType = fileFooter.getFileMetaData().getSchema();
-     baseSchema = new AvroSchemaConverter(conf).convert(messageType);
      // filter schema for reading
-     final Schema filterSchema = Schema.createRecord(baseSchema.getName(),
-         baseSchema.getDoc(), baseSchema.getNamespace(), baseSchema.isError(),
-         baseSchema.getFields().stream()
-             .filter(f -> readColNames.contains(f.name()))
-             .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()))
-             .collect(Collectors.toList()));
+     Schema filterSchema = HoodieAvroUtils.generateProjectionSchema(baseSchema, readColNames);
      AvroReadSupport.setAvroReadSchema(conf, filterSchema);
      AvroReadSupport.setRequestedProjection(conf, filterSchema);
    }

    parquetRecordReader = new ParquetRecordReader<>(new AvroReadSupport<>(), getFilter(conf));
  }
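Tying back to the baseSchema discussion above: the reason for keeping the full file schema is that getCurrentValue can expand the Avro record read under the projected schema into an ArrayWritable laid out against all table columns, so Hive finds each field at the position it expects instead of reading null. Below is a minimal sketch of such a method body for this class, assuming HoodieRealtimeRecordReaderUtils.avroToArrayWritable pads the record to the given schema; it is an illustration of the idea, not necessarily the exact code in this PR.

  // Sketch only: convert the Avro record read under the projected schema into an
  // ArrayWritable shaped by the full file schema (baseSchema).
  @Override
  public ArrayWritable getCurrentValue() throws IOException, InterruptedException {
    GenericRecord record = (GenericRecord) parquetRecordReader.getCurrentValue();
    return (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, baseSchema);
  }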

@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.hadoop;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHoodieColumnProjectionUtils {

  @Test
  void testTypeContainsTimestamp() {
    String col1 = "timestamp";
    TypeInfo typeInfo1 = TypeInfoUtils.getTypeInfosFromTypeString(col1).get(0);
    assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo1));

    String col2 = "string";
    TypeInfo typeInfo2 = TypeInfoUtils.getTypeInfosFromTypeString(col2).get(0);
    assertFalse(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo2));

    String col3 = "array<timestamp>";
    TypeInfo typeInfo3 = TypeInfoUtils.getTypeInfosFromTypeString(col3).get(0);
    assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo3));

    String col4 = "array<string>";
    TypeInfo typeInfo4 = TypeInfoUtils.getTypeInfosFromTypeString(col4).get(0);
    assertFalse(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo4));

    String col5 = "map<string,timestamp>";
    TypeInfo typeInfo5 = TypeInfoUtils.getTypeInfosFromTypeString(col5).get(0);
    assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo5));

    String col6 = "map<string,string>";
    TypeInfo typeInfo6 = TypeInfoUtils.getTypeInfosFromTypeString(col6).get(0);
    assertFalse(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo6));

    String col7 = "struct<name1:string,name2:timestamp>";
    TypeInfo typeInfo7 = TypeInfoUtils.getTypeInfosFromTypeString(col7).get(0);
    assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo7));

    String col8 = "struct<name1:string,name2:string>";
    TypeInfo typeInfo8 = TypeInfoUtils.getTypeInfosFromTypeString(col8).get(0);
    assertFalse(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo8));

    String col9 = "uniontype<string,timestamp>";
    TypeInfo typeInfo9 = TypeInfoUtils.getTypeInfosFromTypeString(col9).get(0);
    assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo9));

    String col10 = "uniontype<string,int>";
    TypeInfo typeInfo10 = TypeInfoUtils.getTypeInfosFromTypeString(col10).get(0);
    assertFalse(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo10));

    String col11 = "uniontype<string,int,map<string,array<timestamp>>>";
    TypeInfo typeInfo11 = TypeInfoUtils.getTypeInfosFromTypeString(col11).get(0);
    assertTrue(HoodieColumnProjectionUtils.typeContainsTimestamp(typeInfo11));
  }
}