diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java index eebfe56b890b4..2ef3923176d89 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java @@ -211,7 +211,7 @@ private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr"); conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypesWithDatestr); conf.set(IOConstants.COLUMNS, hiveColumnNames); - conf.get(IOConstants.COLUMNS_TYPES, hiveColumnTypesWithDatestr); + conf.set(IOConstants.COLUMNS_TYPES, hiveColumnTypesWithDatestr); // Hoodie Input formats are also configurable Configurable configurable = (Configurable)inputFormat; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 8706c5960ac29..8ba625c2aff93 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -831,8 +831,9 @@ private FlinkOptions() { .key("hive_sync.support_timestamp") .booleanType() .defaultValue(true) - .withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n" - + "Disabled by default for backward compatibility."); + .withDescription("'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type. " + + "From 0.13.0, 'timestamp' type will be supported and also can be disabled by this variable. " + + "Previous versions keep being disabled by default."); public static final ConfigOption HIVE_SYNC_TABLE_PROPERTIES = ConfigOptions .key("hive_sync.table_properties") diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 345c9cde6cd35..f5a22f0ec3a3b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -340,8 +340,9 @@ public class FlinkStreamerConfig extends Configuration { public Boolean hiveSyncSkipRoSuffix = false; @Parameter(names = {"--hive-sync-support-timestamp"}, description = "INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n" - + "Disabled by default for backward compatibility.") - public Boolean hiveSyncSupportTimestamp = false; + + "From 0.13.0, 'timestamp' type will be supported and also can be disabled by this variable. 
" + + "Previous versions keep being disabled by default.") + public Boolean hiveSyncSupportTimestamp = true; /** diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java index 9ca99c41888b1..3d9f7a35a6c41 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java @@ -109,4 +109,22 @@ public static List> getIOColumnNameAndTypes(Configuration co .collect(Collectors.toList()); } + /** + * if schema contains timestamp columns, this method is used for compatibility when there is no timestamp fields + * We expect 3 cases to use parquet-avro reader to read timestamp column: + * 1. read columns contain timestamp type + * 2. no read columns and exists original columns contain timestamp type + * 3. no read columns and no original columns, but avro schema contains type + */ + public static boolean supportTimestamp(Configuration conf) { + List reads = Arrays.asList(getReadColumnNames(conf)); + if (reads.isEmpty()) { + return getIOColumnTypes(conf).contains("timestamp"); + } + List names = getIOColumns(conf); + List types = getIOColumnTypes(conf); + return types.isEmpty() || IntStream.range(0, names.size()).filter(i -> reads.contains(names.get(i))) + .anyMatch(i -> types.get(i).equals("timestamp")); + } + } \ No newline at end of file diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index c168924e65fe0..cbd009669aba4 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -18,6 +18,7 @@ package org.apache.hudi.hadoop; +import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.io.ArrayWritable; @@ -29,11 +30,16 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.avro.HoodieTimestampAwareParquetInputFormat; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.parquet.hadoop.ParquetInputFormat; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -49,12 +55,36 @@ public class HoodieParquetInputFormat extends HoodieParquetInputFormatBase { private static final Logger LOG = LogManager.getLogger(HoodieParquetInputFormat.class); + private boolean supportAvroRead = false; + public HoodieParquetInputFormat() { super(new HoodieCopyOnWriteTableInputFormat()); + initAvroInputFormat(); } protected HoodieParquetInputFormat(HoodieCopyOnWriteTableInputFormat delegate) { super(delegate); + initAvroInputFormat(); + } + + /** + * Spark2 use `parquet.hadoopParquetInputFormat` in `com.twitter:parquet-hadoop-bundle`. 
+ * So that we need to distinguish the constructions of classes with + * `parquet.hadoopParquetInputFormat` or `org.apache.parquet.hadoop.ParquetInputFormat`. + * If we use `org.apache.parquet:parquet-hadoop`, we can use `HudiAvroParquetInputFormat` + * in Hive or Spark3 to get timestamp with correct type. + */ + private void initAvroInputFormat() { + try { + Constructor[] constructors = ParquetRecordReaderWrapper.class.getConstructors(); + if (Arrays.stream(constructors) + .anyMatch(c -> c.getParameterCount() > 0 && c.getParameterTypes()[0] + .getName().equals(ParquetInputFormat.class.getName()))) { + supportAvroRead = true; + } + } catch (SecurityException e) { + throw new HoodieException("Failed to check if support avro reader: " + e.getMessage(), e); + } } @Override @@ -89,7 +119,15 @@ public RecordReader getRecordReader(final InputSpli private RecordReader getRecordReaderInternal(InputSplit split, JobConf job, Reporter reporter) throws IOException { - return super.getRecordReader(split, job, reporter); + try { + if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) { + return new ParquetRecordReaderWrapper(new HoodieTimestampAwareParquetInputFormat(), split, job, reporter); + } else { + return super.getRecordReader(split, job, reporter); + } + } catch (final InterruptedException | IOException e) { + throw new RuntimeException("Cannot create a RecordReaderWrapper", e); + } } private RecordReader createBootstrappingRecordReader(InputSplit split, diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java new file mode 100644 index 0000000000000..b12cae7736d05 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hadoop.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hudi.hadoop.HoodieColumnProjectionUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetInputSplit; +import org.apache.parquet.hadoop.ParquetRecordReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; + +public class HoodieAvroParquetReader extends RecordReader { + + private final ParquetRecordReader parquetRecordReader; + + public HoodieAvroParquetReader(InputSplit inputSplit, Configuration conf) throws IOException { + AvroReadSupport avroReadSupport = new AvroReadSupport<>(); + // if exists read columns, we need to filter columns. + List readColNames = Arrays.asList(HoodieColumnProjectionUtils.getReadColumnNames(conf)); + if (!readColNames.isEmpty()) { + // get base schema + ParquetMetadata fileFooter = + ParquetFileReader.readFooter(conf, ((ParquetInputSplit) inputSplit).getPath(), ParquetMetadataConverter.NO_FILTER); + MessageType messageType = fileFooter.getFileMetaData().getSchema(); + Schema baseSchema = new AvroSchemaConverter(conf).convert(messageType); + // filter schema for reading + final Schema filterSchema = Schema.createRecord(baseSchema.getName(), + baseSchema.getDoc(), baseSchema.getNamespace(), baseSchema.isError(), + baseSchema.getFields().stream() + .filter(f -> readColNames.contains(f.name())) + .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())) + .collect(Collectors.toList())); + avroReadSupport.setAvroReadSchema(conf, filterSchema); + avroReadSupport.setRequestedProjection(conf, filterSchema); + } + parquetRecordReader = new ParquetRecordReader<>(avroReadSupport, getFilter(conf)); + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + parquetRecordReader.initialize(split, context); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return parquetRecordReader.nextKeyValue(); + } + + @Override + public Void getCurrentKey() throws IOException, InterruptedException { + return parquetRecordReader.getCurrentKey(); + } + + @Override + public ArrayWritable getCurrentValue() throws IOException, InterruptedException { + GenericRecord record = parquetRecordReader.getCurrentValue(); + return (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, record.getSchema(), true); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return parquetRecordReader.getProgress(); + } + + @Override + public void close() throws IOException { + parquetRecordReader.close(); + } +} diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java new file mode 100644 index 0000000000000..4f4c96d0ec857 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.avro; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.util.ContextUtil; + +import java.io.IOException; + +/** + * To resolve issue Fix Timestamp/Date type read by Hive3, + * we need to handle timestamp types separately based on the parquet-avro approach + */ +public class HoodieTimestampAwareParquetInputFormat extends ParquetInputFormat { + + @Override + public RecordReader createRecordReader( + InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + Configuration conf = ContextUtil.getConfiguration(taskAttemptContext); + return new HoodieAvroParquetReader(inputSplit, conf); + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 1dd9fc4cfd5a1..320bd33650911 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.HoodieColumnProjectionUtils; import org.apache.hudi.hadoop.SchemaEvolutionContext; import org.apache.hudi.hadoop.utils.HiveAvroSerializer; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; @@ -72,6 +73,8 @@ public abstract class AbstractRealtimeRecordReader { protected boolean supportPayload = true; // handle hive type to avro record protected HiveAvroSerializer serializer; + // support timestamp + private final boolean supportTimestamp; public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) { this.split = split; @@ -80,6 +83,8 @@ public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) { LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); 
LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "")); this.supportPayload = Boolean.parseBoolean(job.get("hoodie.support.payload", "true")); + // get timestamp columns + supportTimestamp = HoodieColumnProjectionUtils.supportTimestamp(jobConf); try { metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build(); if (metaClient.getTableConfig().getPreCombineField() != null) { @@ -219,4 +224,8 @@ public void setWriterSchema(Schema writerSchema) { public void setHiveSchema(Schema hiveSchema) { this.hiveSchema = hiveSchema; } + + public boolean isSupportTimestamp() { + return supportTimestamp; + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 356107dd82ca6..49471e72971b5 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -152,7 +152,7 @@ private void setUpWritable(Option rec, ArrayWritable arrayWritabl // we assume, a later safe record in the log, is newer than what we have in the map & // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest // schema, we use writerSchema to create the arrayWritable from the latest generic record - ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema()); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema(), isSupportTimestamp()); Writable[] replaceValue = aWritable.get(); if (LOG.isDebugEnabled()) { LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable), diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index e21e266b7229f..698db20fc6a9a 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -106,7 +106,7 @@ private List> getParallelProducers( scannerBuilder.withLogRecordScannerCallback(record -> { // convert Hoodie log record to Hadoop AvroWritable and buffer GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema(), payloadProps).get(); - ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema()); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp()); queue.insertRecord(aWritable); }) .build(); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java index cbfd197f43897..21b6add760d59 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java @@ -20,18 +20,28 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutableTriple; +import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -146,4 +156,101 @@ public static List getIncrementalTableNames(JobContext job) { public static boolean isIncrementalUseDatabase(Configuration conf) { return conf.getBoolean(HOODIE_INCREMENTAL_USE_DATABASE, false); } + + public static final String HIVE_TIMESTAMP_TYPE_CLASS = "org.apache.hadoop.hive.common.type.Timestamp"; + public static final String TIMESTAMP_WRITEABLE_V2_CLASS = "org.apache.hadoop.hive.serde2.io.TimestampWritableV2"; + public static final boolean SUPPORT_TIMESTAMP_WRITEABLE_V2; + private static final Class TIMESTAMP_CLASS; + private static final Method SET_TIME_IN_MILLIS; + private static final Constructor TIMESTAMP_WRITEABLE_V2_CONSTRUCTOR; + + public static final String DATE_WRITEABLE_V2_CLASS = "org.apache.hadoop.hive.serde2.io.DateWritableV2"; + public static final boolean SUPPORT_DATE_WRITEABLE_V2; + private static final Constructor DATE_WRITEABLE_V2_CONSTRUCTOR; + + static { + // timestamp + Supplier> timestampSupplier = () -> { + try { + Class timestampClass = Class.forName(HIVE_TIMESTAMP_TYPE_CLASS); + Method setTimeInMillis = timestampClass.getDeclaredMethod("setTimeInMillis", long.class); + Class twV2Class = Class.forName(TIMESTAMP_WRITEABLE_V2_CLASS); + return ImmutableTriple.of(timestampClass, setTimeInMillis, twV2Class.getConstructor(timestampClass)); + } catch (ClassNotFoundException | NoSuchMethodException e) { + LOG.trace("can not find hive3 timestampv2 class or method, use hive2 class!", e); + return null; + } + }; + Option> timestampTriple = Option.ofNullable(timestampSupplier.get()); + SUPPORT_TIMESTAMP_WRITEABLE_V2 = timestampTriple.isPresent(); + if (SUPPORT_TIMESTAMP_WRITEABLE_V2) { + LOG.trace("use org.apache.hadoop.hive.serde2.io.TimestampWritableV2 to read hudi timestamp columns"); + ImmutableTriple triple = timestampTriple.get(); + TIMESTAMP_CLASS = triple.left; + SET_TIME_IN_MILLIS = triple.middle; + TIMESTAMP_WRITEABLE_V2_CONSTRUCTOR = triple.right; + } else { + LOG.trace("use org.apache.hadoop.hive.serde2.io.TimestampWritable to read hudi timestamp columns"); + TIMESTAMP_CLASS = null; + SET_TIME_IN_MILLIS = null; + TIMESTAMP_WRITEABLE_V2_CONSTRUCTOR = null; + } + + // date + Supplier dateSupplier = () -> { + try { + Class dateV2Class = Class.forName(DATE_WRITEABLE_V2_CLASS); + return dateV2Class.getConstructor(int.class); + } catch (ClassNotFoundException | NoSuchMethodException e) { + LOG.trace("can not find hive3 datev2 class or method, use hive2 class!", e); + return null; + } + }; + Option dateConstructor = Option.ofNullable(dateSupplier.get()); + SUPPORT_DATE_WRITEABLE_V2 = dateConstructor.isPresent(); + if 
(SUPPORT_DATE_WRITEABLE_V2) { + LOG.trace("use org.apache.hadoop.hive.serde2.io.DateWritableV2 to read hudi date columns"); + DATE_WRITEABLE_V2_CONSTRUCTOR = dateConstructor.get(); + } else { + LOG.trace("use org.apache.hadoop.hive.serde2.io.DateWritable to read hudi date columns"); + DATE_WRITEABLE_V2_CONSTRUCTOR = null; + } + } + + /** + * Get timestamp writeable object from long value. + * Hive3 use TimestampWritableV2 to build timestamp objects and Hive2 use TimestampWritable. + * So that we need to initialize timestamp according to the version of Hive. + */ + public static Writable getTimestampWriteable(long value, boolean timestampMillis) { + if (SUPPORT_TIMESTAMP_WRITEABLE_V2) { + try { + Object timestamp = TIMESTAMP_CLASS.newInstance(); + SET_TIME_IN_MILLIS.invoke(timestamp, timestampMillis ? value : value / 1000); + return (Writable) TIMESTAMP_WRITEABLE_V2_CONSTRUCTOR.newInstance(timestamp); + } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) { + throw new HoodieException("can not create writable v2 class!", e); + } + } else { + Timestamp timestamp = new Timestamp(timestampMillis ? value : value / 1000); + return new TimestampWritable(timestamp); + } + } + + /** + * Get date writeable object from int value. + * Hive3 use DateWritableV2 to build date objects and Hive2 use DateWritable. + * So that we need to initialize date according to the version of Hive. + */ + public static Writable getDateWriteable(int value) { + if (SUPPORT_DATE_WRITEABLE_V2) { + try { + return (Writable) DATE_WRITEABLE_V2_CONSTRUCTOR.newInstance(value); + } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) { + throw new HoodieException("can not create writable v2 class!", e); + } + } else { + return new DateWritable(value); + } + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java index b87758af90b1c..e1109665b8372 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -20,6 +20,7 @@ import org.apache.avro.AvroRuntimeException; import org.apache.avro.JsonProperties; +import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; @@ -27,10 +28,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; import org.apache.hadoop.io.ArrayWritable; @@ -55,7 +54,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; -import java.sql.Timestamp; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -164,6 +162,14 @@ public static Map getNameToFieldMap(Schema schema) { * Convert the projected read from delta record into an array writable. 
*/ public static Writable avroToArrayWritable(Object value, Schema schema) { + return avroToArrayWritable(value, schema, false); + } + + /** + * Convert the projected read from delta record into an array writable. + * @param supportTimestamp Whether to support the timestamp field + */ + public static Writable avroToArrayWritable(Object value, Schema schema, boolean supportTimestamp) { if (value == null) { return null; @@ -176,12 +182,18 @@ public static Writable avroToArrayWritable(Object value, Schema schema) { return new BytesWritable(((ByteBuffer)value).array()); case INT: if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("date")) { - return new DateWritable((Integer) value); + return HoodieHiveUtils.getDateWriteable((Integer) value); } return new IntWritable((Integer) value); case LONG: - if (schema.getLogicalType() != null && "timestamp-micros".equals(schema.getLogicalType().getName())) { - return new TimestampWritable(new Timestamp((Long) value)); + LogicalType logicalType = schema.getLogicalType(); + // If there is a specified timestamp or under normal cases, we will process it + if (supportTimestamp) { + if (LogicalTypes.timestampMillis().equals(logicalType)) { + return HoodieHiveUtils.getTimestampWriteable((Long) value, true); + } else if (LogicalTypes.timestampMicros().equals(logicalType)) { + return HoodieHiveUtils.getTimestampWriteable((Long) value, false); + } } return new LongWritable((Long) value); case FLOAT: @@ -204,7 +216,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema) { } catch (AvroRuntimeException e) { LOG.debug("Field:" + field.name() + "not found in Schema:" + schema); } - recordValues[recordValueIndex++] = avroToArrayWritable(fieldValue, field.schema()); + recordValues[recordValueIndex++] = avroToArrayWritable(fieldValue, field.schema(), supportTimestamp); } return new ArrayWritable(Writable.class, recordValues); case ENUM: @@ -214,7 +226,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema) { Writable[] arrayValues = new Writable[arrayValue.size()]; int arrayValueIndex = 0; for (Object obj : arrayValue) { - arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType()); + arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType(), supportTimestamp); } // Hive 1.x will fail here, it requires values2 to be wrapped into another ArrayWritable return new ArrayWritable(Writable.class, arrayValues); @@ -226,7 +238,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema) { Map.Entry mapEntry = (Map.Entry) entry; Writable[] nestedMapValues = new Writable[2]; nestedMapValues[0] = new Text(mapEntry.getKey().toString()); - nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType()); + nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType(), supportTimestamp); mapValues[mapValueIndex++] = new ArrayWritable(Writable.class, nestedMapValues); } // Hive 1.x will fail here, it requires values3 to be wrapped into another ArrayWritable @@ -239,9 +251,9 @@ public static Writable avroToArrayWritable(Object value, Schema schema) { Schema s1 = types.get(0); Schema s2 = types.get(1); if (s1.getType() == Schema.Type.NULL) { - return avroToArrayWritable(value, s2); + return avroToArrayWritable(value, s2, supportTimestamp); } else if (s2.getType() == Schema.Type.NULL) { - return avroToArrayWritable(value, s1); + return avroToArrayWritable(value, s1, supportTimestamp); } else { throw new 
IllegalArgumentException("Only support union with null"); } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index ccc57d0f6185c..9d0811ec80dfa 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -19,8 +19,12 @@ package org.apache.hudi.hadoop; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -45,11 +49,13 @@ import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.CommitUtils; +import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.parquet.avro.AvroParquetWriter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -60,6 +66,11 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -704,4 +715,57 @@ private void ensureRecordsInCommit(String msg, String commit, int expectedNumber assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg); assertEquals(totalExpected, totalCount, msg); } + + @Test + public void testHoodieParquetInputFormatReadTimeType() throws IOException { + long testTimestampLong = System.currentTimeMillis(); + int testDate = 19116;// 2022-05-04 + + Schema schema = getSchemaFromResource(getClass(), "/test_timetype.avsc"); + String commit = "20160628071126"; + HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), + HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET); + java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "06", "28")); + String fileId = FSUtils.makeBaseFileName(commit, "1-0-1", "fileid1", + HoodieFileFormat.PARQUET.getFileExtension()); + try (AvroParquetWriter parquetWriter = new AvroParquetWriter( + new Path(partitionPath.resolve(fileId).toString()), schema)) { + GenericData.Record record = new GenericData.Record(schema); + record.put("test_timestamp", testTimestampLong * 1000); + record.put("test_long", testTimestampLong * 1000); + record.put("test_date", testDate); + record.put("_hoodie_commit_time", commit); + record.put("_hoodie_commit_seqno", commit + 1); + parquetWriter.write(record); + } + + jobConf.set(IOConstants.COLUMNS, "test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno"); + jobConf.set(IOConstants.COLUMNS_TYPES, "timestamp,bigint,date,string,string"); + InputFormatTestUtil.setupPartition(basePath, partitionPath); + 
InputFormatTestUtil.commit(basePath, commit); + FileInputFormat.setInputPaths(jobConf, partitionPath.toFile().getPath()); + + InputSplit[] splits = inputFormat.getSplits(jobConf, 1); + for (InputSplit split : splits) { + RecordReader recordReader = inputFormat + .getRecordReader(split, jobConf, null); + NullWritable key = recordReader.createKey(); + ArrayWritable writable = recordReader.createValue(); + while (recordReader.next(key, writable)) { + // test timestamp + if (HiveVersionInfo.getShortVersion().startsWith("3")) { + LocalDateTime localDateTime = LocalDateTime.ofInstant( + Instant.ofEpochMilli(testTimestampLong), ZoneOffset.UTC); + assertEquals(Timestamp.valueOf(localDateTime).toString(), String.valueOf(writable.get()[0])); + } else { + assertEquals(new Timestamp(testTimestampLong).toString(), String.valueOf(writable.get()[0])); + } + // test long + assertEquals(testTimestampLong * 1000, ((LongWritable) writable.get()[1]).get()); + // test date + assertEquals(LocalDate.ofEpochDay(testDate).toString(), String.valueOf(writable.get()[2])); + } + } + } + } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index 84a967a0c4a50..8862312292601 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -478,7 +478,7 @@ public static void setPropsForInputFormat(JobConf jobConf, jobConf.addResource(conf); } - private static void setupPartition(java.nio.file.Path basePath, java.nio.file.Path partitionPath) throws IOException { + public static void setupPartition(java.nio.file.Path basePath, java.nio.file.Path partitionPath) throws IOException { Files.createDirectories(partitionPath); // Create partition metadata to properly setup table's partition diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java index 9de4630877ae5..6e664e87dbafd 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java @@ -82,7 +82,7 @@ public void testSerialize() { ByteBuffer bb = ByteBuffer.wrap(new byte[]{97, 48, 53}); avroRecord.put("col9", bb); assertTrue(GenericData.get().validate(avroSchema, avroRecord)); - ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroSchema); + ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroSchema, true); List writableList = Arrays.stream(writable.get()).collect(Collectors.toList()); writableList.remove(writableList.size() - 1); diff --git a/hudi-hadoop-mr/src/test/resources/test_timetype.avsc b/hudi-hadoop-mr/src/test/resources/test_timetype.avsc new file mode 100644 index 0000000000000..b1d5ad4377823 --- /dev/null +++ b/hudi-hadoop-mr/src/test/resources/test_timetype.avsc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type" : "record", + "name" : "testTimestampRecord", + "fields" : [ { + "name" : "test_timestamp", + "type" : { + "type" : "long", + "logicalType" : "timestamp-micros" + } + },{ + "name" : "test_long", + "type" : "long" + }, + { + "name" : "test_date", + "type" : { + "type": "int", + "logicalType": "date" + } + }, + { + "name" : "_hoodie_commit_time", + "type" : "string" + }, { + "name" : "_hoodie_commit_seqno", + "type" : "string" + }] +} \ No newline at end of file diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java index 11eba393e6a68..c49feb1b088a7 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java @@ -138,7 +138,8 @@ public static class HiveSyncConfigParams { @Parameter(names = {"--serde-properties"}, description = "Serde properties to hive table") public String serdeProperties; @Parameter(names = {"--support-timestamp"}, description = "'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type." - + "Disabled by default for backward compatibility.") + + "From 0.13.0, 'timestamp' type will be supported and also can be disabled by this variable. " + + "Previous versions keep being disabled by default.") public Boolean supportTimestamp; @Parameter(names = {"--managed-table"}, description = "Create a managed table") public Boolean createManagedTable; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java index 02d6c0f2174ce..e5163ba0045b6 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java @@ -79,9 +79,10 @@ public class HiveSyncConfigHolder { .withDocumentation("Skip the _ro suffix for Read optimized table, when registering"); public static final ConfigProperty HIVE_SUPPORT_TIMESTAMP_TYPE = ConfigProperty .key("hoodie.datasource.hive_sync.support_timestamp") - .defaultValue("false") + .defaultValue("true") .withDocumentation("‘INT64’ with original type TIMESTAMP_MICROS is converted to hive ‘timestamp’ type. " - + "Disabled by default for backward compatibility."); + + "From 0.13.0, 'timestamp' type will be supported and also can be disabled by this variable. " + + "Previous versions keep being disabled by default."); public static final ConfigProperty HIVE_TABLE_PROPERTIES = ConfigProperty .key("hoodie.datasource.hive_sync.table_properties") .noDefaultValue()
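
Note (hedged usage sketch, not part of the patch): since this change flips the hive-sync default for timestamp handling to true, users who need the pre-0.13.0 behavior can opt out through the config keys introduced/retained above. The snippet below only assembles sync options as plain strings; the key names are taken from HiveSyncConfigHolder and FlinkOptions in this diff, while the helper class and method names are illustrative, not part of Hudi's API.

import java.util.HashMap;
import java.util.Map;

public class HiveSyncTimestampOptOut {

  // Builds hive-sync options that restore the pre-0.13.0 default, i.e. keep
  // INT64/TIMESTAMP_MICROS columns registered without the Hive 'timestamp' type.
  public static Map<String, String> hiveSyncOptions() {
    Map<String, String> opts = new HashMap<>();
    // Key from HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE (default now "true").
    opts.put("hoodie.datasource.hive_sync.support_timestamp", "false");
    return opts;
  }
}

Flink SQL users would set the equivalent table option 'hive_sync.support_timestamp' = 'false', and the Flink streamer exposes the same switch via --hive-sync-support-timestamp.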