From ccd29b9eb855605aabdc26537d66983ddd869cb6 Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Fri, 13 May 2022 20:29:55 +0800 Subject: [PATCH 01/15] Rebase master 2022-12-04 --- conf/hudi-defaults.conf.template | 2 +- .../testutils/HoodieMergeOnReadTestUtils.java | 2 +- .../util/TestDFSPropertiesConfiguration.java | 2 +- .../external-config/hudi-defaults.conf | 2 +- .../hudi/streamer/FlinkStreamerConfig.java | 2 +- .../hadoop/HoodieColumnProjectionUtils.java | 21 ++++ .../hudi/hadoop/HoodieParquetInputFormat.java | 35 +++++- .../avro/HudiAvroParquetInputFormat.java | 40 +++++++ .../hadoop/avro/HudiAvroParquetReader.java | 104 ++++++++++++++++++ .../RealtimeCompactedRecordReader.java | 2 +- .../RealtimeUnmergedRecordReader.java | 56 +++++----- .../hudi/hadoop/utils/HoodieHiveUtils.java | 78 +++++++++++++ .../hadoop/TestHoodieParquetInputFormat.java | 64 +++++++++++ .../hadoop/testutils/InputFormatTestUtil.java | 2 +- .../src/test/resources/test_timetype.avsc | 45 ++++++++ 15 files changed, 421 insertions(+), 36 deletions(-) create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetInputFormat.java create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetReader.java create mode 100644 hudi-hadoop-mr/src/test/resources/test_timetype.avsc diff --git a/conf/hudi-defaults.conf.template b/conf/hudi-defaults.conf.template index 175dbaf23d739..bd88bb544ea99 100644 --- a/conf/hudi-defaults.conf.template +++ b/conf/hudi-defaults.conf.template @@ -21,6 +21,6 @@ # Example: # hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000 # hoodie.datasource.hive_sync.use_jdbc true -# hoodie.datasource.hive_sync.support_timestamp false +# hoodie.datasource.hive_sync.support_timestamp true # hoodie.index.type BLOOM # hoodie.metadata.enable false diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java index eebfe56b890b4..2ef3923176d89 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java @@ -211,7 +211,7 @@ private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr"); conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypesWithDatestr); conf.set(IOConstants.COLUMNS, hiveColumnNames); - conf.get(IOConstants.COLUMNS_TYPES, hiveColumnTypesWithDatestr); + conf.set(IOConstants.COLUMNS_TYPES, hiveColumnTypesWithDatestr); // Hoodie Input formats are also configurable Configurable configurable = (Configurable)inputFormat; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java index df3efbd91f9e0..f1720bc58628c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java @@ -194,7 +194,7 @@ public void testLoadGlobalConfFile() { assertEquals(5, DFSPropertiesConfiguration.getGlobalProps().size()); assertEquals("jdbc:hive2://localhost:10000", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.jdbcurl")); 
assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.use_jdbc")); - assertEquals("false", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.support_timestamp")); + assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.support_timestamp")); assertEquals("BLOOM", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.index.type")); assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.metadata.enable")); } diff --git a/hudi-common/src/test/resources/external-config/hudi-defaults.conf b/hudi-common/src/test/resources/external-config/hudi-defaults.conf index 1133adb4d7735..5d7cdf7a32d2e 100644 --- a/hudi-common/src/test/resources/external-config/hudi-defaults.conf +++ b/hudi-common/src/test/resources/external-config/hudi-defaults.conf @@ -21,6 +21,6 @@ # Example: hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000 hoodie.datasource.hive_sync.use_jdbc true -hoodie.datasource.hive_sync.support_timestamp false +hoodie.datasource.hive_sync.support_timestamp true hoodie.index.type BLOOM hoodie.metadata.enable true diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 345c9cde6cd35..2786a3aa443e6 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -341,7 +341,7 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--hive-sync-support-timestamp"}, description = "INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n" + "Disabled by default for backward compatibility.") - public Boolean hiveSyncSupportTimestamp = false; + public Boolean hiveSyncSupportTimestamp = true; /** diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java index 9ca99c41888b1..0bc6eea774289 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java @@ -109,4 +109,25 @@ public static List> getIOColumnNameAndTypes(Configuration co .collect(Collectors.toList()); } + /** + * if schema contains timestamp columns, this method is used for compatibility when there is no timestamp fields + * We expect 3 cases to use parquet-avro reader to read timestamp column: + * 1. read columns contain timestamp type + * 2. no read columns and exists original columns contain timestamp type + * 3. 
+   */
+  public static boolean supportTimestamp(Configuration conf) {
+    List reads = Arrays.asList(getReadColumnNames(conf));
+    if (reads.isEmpty()) {
+      return getIOColumnTypes(conf).contains("timestamp");
+    }
+    List names = getIOColumns(conf);
+    if (names.isEmpty()) {
+      return true;
+    }
+    List types = getIOColumnTypes(conf);
+    return types.isEmpty() || IntStream.range(0, names.size()).filter(i -> reads.contains(names.get(i)))
+        .anyMatch(i -> types.get(i).equals("timestamp"));
+  }
+
 }
\ No newline at end of file
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index c168924e65fe0..3d288dba8a480 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.hadoop;
 
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.io.ArrayWritable;
@@ -29,11 +30,15 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.avro.HudiAvroParquetInputFormat;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.parquet.hadoop.ParquetInputFormat;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -49,12 +54,32 @@ public class HoodieParquetInputFormat extends HoodieParquetInputFormatBase {
 
   private static final Logger LOG = LogManager.getLogger(HoodieParquetInputFormat.class);
 
+  private boolean supportAvroRead = false;
+
   public HoodieParquetInputFormat() {
     super(new HoodieCopyOnWriteTableInputFormat());
+    initAvroInputFormat();
   }
 
   protected HoodieParquetInputFormat(HoodieCopyOnWriteTableInputFormat delegate) {
     super(delegate);
+    initAvroInputFormat();
+  }
+
+  /**
+   * Spark2 uses `parquet.hadoop.ParquetInputFormat` from `com.twitter:parquet-hadoop-bundle`,
+   * so we need to distinguish whether `ParquetRecordReaderWrapper` is constructed with
+   * `parquet.hadoop.ParquetInputFormat` or `org.apache.parquet.hadoop.ParquetInputFormat`.
+   * If we use `org.apache.parquet:parquet-hadoop`, we can use `HudiAvroParquetInputFormat`
+   * in Hive or Spark3 to get timestamp with correct type.
+ */ + private void initAvroInputFormat() { + Constructor[] constructors = ParquetRecordReaderWrapper.class.getConstructors(); + if (Arrays.stream(constructors) + .anyMatch(c -> c.getParameterCount() > 0 && c.getParameterTypes()[0] + .getName().equals(ParquetInputFormat.class.getName()))) { + supportAvroRead = true; + } } @Override @@ -89,7 +114,15 @@ public RecordReader getRecordReader(final InputSpli private RecordReader getRecordReaderInternal(InputSplit split, JobConf job, Reporter reporter) throws IOException { - return super.getRecordReader(split, job, reporter); + try { + if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) { + return new ParquetRecordReaderWrapper(new HudiAvroParquetInputFormat(), split, job, reporter); + } else { + return super.getRecordReader(split, job, reporter); + } + } catch (final InterruptedException | IOException e) { + throw new RuntimeException("Cannot create a RecordReaderWrapper", e); + } } private RecordReader createBootstrappingRecordReader(InputSplit split, diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetInputFormat.java new file mode 100644 index 0000000000000..6540695a41108 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetInputFormat.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.avro; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.util.ContextUtil; + +import java.io.IOException; + +public class HudiAvroParquetInputFormat extends ParquetInputFormat { + + @Override + public RecordReader createRecordReader( + InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + Configuration conf = ContextUtil.getConfiguration(taskAttemptContext); + return new HudiAvroParquetReader(inputSplit, conf); + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetReader.java new file mode 100644 index 0000000000000..3e930d614c5e9 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetReader.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hudi.hadoop.HoodieColumnProjectionUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetInputSplit; +import org.apache.parquet.hadoop.ParquetRecordReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; + +public class HudiAvroParquetReader extends RecordReader { + + private final ParquetRecordReader parquetRecordReader; + + public HudiAvroParquetReader(InputSplit inputSplit, Configuration conf) throws IOException { + AvroReadSupport avroReadSupport = new AvroReadSupport<>(); + // if exists read columns, we need to filter columns. 
+ List readColNames = Arrays.asList(HoodieColumnProjectionUtils.getReadColumnNames(conf)); + if (!readColNames.isEmpty()) { + // get base schema + ParquetMetadata fileFooter = + ParquetFileReader.readFooter(conf, ((ParquetInputSplit) inputSplit).getPath(), ParquetMetadataConverter.NO_FILTER); + MessageType messageType = fileFooter.getFileMetaData().getSchema(); + Schema baseSchema = new AvroSchemaConverter(conf).convert(messageType); + // filter schema for reading + final Schema filterSchema = Schema.createRecord(baseSchema.getName(), + baseSchema.getDoc(), baseSchema.getNamespace(), baseSchema.isError(), + baseSchema.getFields().stream() + .filter(f -> readColNames.contains(f.name())) + .map(f -> new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())) + .collect(Collectors.toList())); + avroReadSupport.setAvroReadSchema(conf, filterSchema); + avroReadSupport.setRequestedProjection(conf, filterSchema); + } + parquetRecordReader = new ParquetRecordReader<>(avroReadSupport, getFilter(conf)); + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + parquetRecordReader.initialize(split, context); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + return parquetRecordReader.nextKeyValue(); + } + + @Override + public Void getCurrentKey() throws IOException, InterruptedException { + return parquetRecordReader.getCurrentKey(); + } + + @Override + public ArrayWritable getCurrentValue() throws IOException, InterruptedException { + GenericRecord record = parquetRecordReader.getCurrentValue(); + return (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, record.getSchema(), true); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return parquetRecordReader.getProgress(); + } + + @Override + public void close() throws IOException { + parquetRecordReader.close(); + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 356107dd82ca6..49471e72971b5 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -152,7 +152,7 @@ private void setUpWritable(Option rec, ArrayWritable arrayWritabl // we assume, a later safe record in the log, is newer than what we have in the map & // replace it. 
Since we want to return an arrayWritable which is the same length as the elements in the latest // schema, we use writerSchema to create the arrayWritable from the latest generic record - ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema()); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(recordToReturn, getHiveSchema(), isSupportTimestamp()); Writable[] replaceValue = aWritable.get(); if (LOG.isDebugEnabled()) { LOG.debug(String.format("key %s, base values: %s, log values: %s", key, HoodieRealtimeRecordReaderUtils.arrayWritableToString(arrayWritable), diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index e21e266b7229f..90e13a2aff341 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -44,7 +44,7 @@ import java.util.function.Function; class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader - implements RecordReader { + implements RecordReader { // Parquet record reader private final RecordReader parquetReader; @@ -67,26 +67,26 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader * @param realReader Parquet Reader */ public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job, - RecordReader realReader) { + RecordReader realReader) { super(split, job); this.parquetReader = new SafeParquetRecordReaderWrapper(realReader); // Iterator for consuming records from parquet file this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader); HoodieUnMergedLogRecordScanner.Builder scannerBuilder = - HoodieUnMergedLogRecordScanner.newBuilder() - .withFileSystem(FSUtils.getFs(split.getPath().toString(), this.jobConf)) - .withBasePath(split.getBasePath()) - .withLogFilePaths(split.getDeltaLogPaths()) - .withReaderSchema(getReaderSchema()) - .withLatestInstantTime(split.getMaxCommitTime()) - .withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) - .withReverseReader(false) - .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)); + HoodieUnMergedLogRecordScanner.newBuilder() + .withFileSystem(FSUtils.getFs(split.getPath().toString(), this.jobConf)) + .withBasePath(split.getBasePath()) + .withLogFilePaths(split.getDeltaLogPaths()) + .withReaderSchema(getReaderSchema()) + .withLatestInstantTime(split.getMaxCommitTime()) + .withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) + .withReverseReader(false) + .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)); this.executor = new BoundedInMemoryExecutor<>( - HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(scannerBuilder), - Option.empty(), Function.identity(), new DefaultSizeEstimator<>(), Functions.noop()); + 
HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(scannerBuilder), + Option.empty(), Function.identity(), new DefaultSizeEstimator<>(), Functions.noop()); // Consumer of this record reader this.iterator = this.executor.getRecordIterator(); @@ -98,23 +98,23 @@ public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job, * Setup log and parquet reading in parallel. Both write to central buffer. */ private List> getParallelProducers( - HoodieUnMergedLogRecordScanner.Builder scannerBuilder + HoodieUnMergedLogRecordScanner.Builder scannerBuilder ) { return Arrays.asList( - new FunctionBasedQueueProducer<>(queue -> { - HoodieUnMergedLogRecordScanner scanner = - scannerBuilder.withLogRecordScannerCallback(record -> { - // convert Hoodie log record to Hadoop AvroWritable and buffer - GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema(), payloadProps).get(); - ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema()); - queue.insertRecord(aWritable); - }) - .build(); - // Scan all the delta-log files, filling in the queue - scanner.scan(); - return null; - }), - new IteratorBasedQueueProducer<>(parquetRecordsIterator) + new FunctionBasedQueueProducer<>(queue -> { + HoodieUnMergedLogRecordScanner scanner = + scannerBuilder.withLogRecordScannerCallback(record -> { + // convert Hoodie log record to Hadoop AvroWritable and buffer + GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema(), payloadProps).get(); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp()); + queue.insertRecord(aWritable); + }) + .build(); + // Scan all the delta-log files, filling in the queue + scanner.scan(); + return null; + }), + new IteratorBasedQueueProducer<>(parquetRecordsIterator) ); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java index cbfd197f43897..8c683fcd98650 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java @@ -20,13 +20,21 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -146,4 +154,74 @@ public static List getIncrementalTableNames(JobContext job) { public static boolean isIncrementalUseDatabase(Configuration conf) { return conf.getBoolean(HOODIE_INCREMENTAL_USE_DATABASE, false); } + + public static boolean SUPPORT_TIMESTAMP_WRITEABLE_V2; + private static Class timestampClass = null; + private static Method setTimeInMillis = null; + private static Constructor timestampWriteableV2constructor = null; + + 
public static boolean SUPPORT_DATE_WRITEABLE_V2;
+  private static Constructor dateWriteableV2constructor = null;
+
+  static {
+    // timestamp
+    try {
+      timestampClass = Class.forName("org.apache.hadoop.hive.common.type.Timestamp");
+      setTimeInMillis = timestampClass.getDeclaredMethod("setTimeInMillis", long.class);
+      Class twV2Class = Class.forName("org.apache.hadoop.hive.serde2.io.TimestampWritableV2");
+      timestampWriteableV2constructor = twV2Class.getConstructor(timestampClass);
+      SUPPORT_TIMESTAMP_WRITEABLE_V2 = (null != timestampWriteableV2constructor);
+      LOG.trace("use TimestampWritableV2 to read hudi timestamp columns");
+    } catch (ClassNotFoundException | NoSuchMethodException e) {
+      LOG.trace("can not find hive3 timestampv2 class or method, use hive2 class!", e);
+      SUPPORT_TIMESTAMP_WRITEABLE_V2 = false;
+    }
+    // date
+    try {
+      Class dateV2Class = Class.forName("org.apache.hadoop.hive.serde2.io.DateWritableV2");
+      dateWriteableV2constructor = dateV2Class.getConstructor(int.class);
+      SUPPORT_DATE_WRITEABLE_V2 = (null != dateWriteableV2constructor);
+      LOG.trace("use DateWritableV2 to read hudi date columns");
+    } catch (ClassNotFoundException | NoSuchMethodException e) {
+      LOG.trace("can not find hive3 datev2 class or method, use hive2 class!", e);
+      SUPPORT_DATE_WRITEABLE_V2 = false;
+    }
+  }
+
+  /**
+   * Get a timestamp writeable object from a long value.
+   * Hive3 uses TimestampWritableV2 to build timestamp objects while Hive2 uses TimestampWritable,
+   * so we need to initialize the timestamp according to the Hive version.
+   */
+  public static Writable getTimestampWriteable(long value, boolean timestampMillis) {
+    if (SUPPORT_TIMESTAMP_WRITEABLE_V2) {
+      try {
+        Object timestamp = timestampClass.newInstance();
+        setTimeInMillis.invoke(timestamp, timestampMillis ? value : value / 1000);
+        return (Writable) timestampWriteableV2constructor.newInstance(timestamp);
+      } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
+        throw new HoodieException("can not create writable v2 class!", e);
+      }
+    } else {
+      Timestamp timestamp = new Timestamp(timestampMillis ? value : value / 1000);
+      return new TimestampWritable(timestamp);
+    }
+  }
+
+  /**
+   * Get a date writeable object from an int value.
+   * Hive3 uses DateWritableV2 to build date objects while Hive2 uses DateWritable,
+   * so we need to initialize the date according to the Hive version.
+ */ + public static Writable getDateWriteable(int value) { + if (SUPPORT_DATE_WRITEABLE_V2) { + try { + return (Writable) dateWriteableV2constructor.newInstance(value); + } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) { + throw new HoodieException("can not create writable v2 class!", e); + } + } else { + return new DateWritable(value); + } + } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index ccc57d0f6185c..ee290bc585436 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -19,8 +19,12 @@ package org.apache.hudi.hadoop; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -45,11 +49,13 @@ import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.CommitUtils; +import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.parquet.avro.AvroParquetWriter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -60,6 +66,11 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -704,4 +715,57 @@ private void ensureRecordsInCommit(String msg, String commit, int expectedNumber assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg); assertEquals(totalExpected, totalCount, msg); } + + @Test + public void testHoodieParquetInputFormatReadTimeType() throws IOException { + long testTimestampLong = System.currentTimeMillis(); + int testDate = 19116;// 2022-05-04 + + Schema schema = getSchemaFromResource(getClass(), "/test_timetype.avsc"); + String commit = "20160628071126"; + HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), + HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET); + java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "06", "28")); + String fileId = FSUtils.makeDataFileName(commit, "1-0-1", "fileid1", + HoodieFileFormat.PARQUET.getFileExtension()); + try (AvroParquetWriter parquetWriter = new AvroParquetWriter( + new Path(partitionPath.resolve(fileId).toString()), schema)) { + GenericData.Record record = new GenericData.Record(schema); + record.put("test_timestamp", testTimestampLong * 1000); + record.put("test_long", testTimestampLong * 1000); + record.put("test_date", testDate); + record.put("_hoodie_commit_time", commit); + 
record.put("_hoodie_commit_seqno", commit + 1); + parquetWriter.write(record); + } + + jobConf.set(IOConstants.COLUMNS, "test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno"); + jobConf.set(IOConstants.COLUMNS_TYPES, "timestamp,bigint,date,string,string"); + InputFormatTestUtil.setupPartition(basePath, partitionPath); + InputFormatTestUtil.commit(basePath, commit); + FileInputFormat.setInputPaths(jobConf, partitionPath.toFile().getPath()); + + InputSplit[] splits = inputFormat.getSplits(jobConf, 1); + for (InputSplit split : splits) { + RecordReader recordReader = inputFormat + .getRecordReader(split, jobConf, null); + NullWritable key = recordReader.createKey(); + ArrayWritable writable = recordReader.createValue(); + while (recordReader.next(key, writable)) { + // test timestamp + if (HiveVersionInfo.getShortVersion().startsWith("3")) { + LocalDateTime localDateTime = LocalDateTime.ofInstant( + Instant.ofEpochMilli(testTimestampLong), ZoneOffset.UTC); + assertEquals(Timestamp.valueOf(localDateTime).toString(), String.valueOf(writable.get()[0])); + } else { + assertEquals(new Timestamp(testTimestampLong).toString(), String.valueOf(writable.get()[0])); + } + // test long + assertEquals(testTimestampLong * 1000, ((LongWritable) writable.get()[1]).get()); + // test date + assertEquals(LocalDate.ofEpochDay(testDate).toString(), String.valueOf(writable.get()[2])); + } + } + } + } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index 84a967a0c4a50..8862312292601 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -478,7 +478,7 @@ public static void setPropsForInputFormat(JobConf jobConf, jobConf.addResource(conf); } - private static void setupPartition(java.nio.file.Path basePath, java.nio.file.Path partitionPath) throws IOException { + public static void setupPartition(java.nio.file.Path basePath, java.nio.file.Path partitionPath) throws IOException { Files.createDirectories(partitionPath); // Create partition metadata to properly setup table's partition diff --git a/hudi-hadoop-mr/src/test/resources/test_timetype.avsc b/hudi-hadoop-mr/src/test/resources/test_timetype.avsc new file mode 100644 index 0000000000000..b1d5ad4377823 --- /dev/null +++ b/hudi-hadoop-mr/src/test/resources/test_timetype.avsc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{ + "type" : "record", + "name" : "testTimestampRecord", + "fields" : [ { + "name" : "test_timestamp", + "type" : { + "type" : "long", + "logicalType" : "timestamp-micros" + } + },{ + "name" : "test_long", + "type" : "long" + }, + { + "name" : "test_date", + "type" : { + "type": "int", + "logicalType": "date" + } + }, + { + "name" : "_hoodie_commit_time", + "type" : "string" + }, { + "name" : "_hoodie_commit_seqno", + "type" : "string" + }] +} \ No newline at end of file From 20ea53a34978d5fb1da9e52f303b699f21af06db Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Fri, 20 May 2022 22:05:20 +0800 Subject: [PATCH 02/15] Fix by review --- .../apache/hudi/configuration/FlinkOptions.java | 5 +++-- .../hudi/hadoop/HoodieParquetInputFormat.java | 15 ++++++++++----- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 8706c5960ac29..04601e11d411c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -831,8 +831,9 @@ private FlinkOptions() { .key("hive_sync.support_timestamp") .booleanType() .defaultValue(true) - .withDescription("INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n" - + "Disabled by default for backward compatibility."); + .withDescription("'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type. " + + "From 0.12.0, 'timestamp' type will be supported and also can be disabled by this variable. " + + "Previous versions keep being disabled by default."); public static final ConfigOption HIVE_SYNC_TABLE_PROPERTIES = ConfigOptions .key("hive_sync.table_properties") diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index 3d288dba8a480..e1af812b9379a 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.avro.HudiAvroParquetInputFormat; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.log4j.LogManager; @@ -74,11 +75,15 @@ protected HoodieParquetInputFormat(HoodieCopyOnWriteTableInputFormat delegate) { * in Hive or Spark3 to get timestamp with correct type. 
*/ private void initAvroInputFormat() { - Constructor[] constructors = ParquetRecordReaderWrapper.class.getConstructors(); - if (Arrays.stream(constructors) - .anyMatch(c -> c.getParameterCount() > 0 && c.getParameterTypes()[0] - .getName().equals(ParquetInputFormat.class.getName()))) { - supportAvroRead = true; + try { + Constructor[] constructors = ParquetRecordReaderWrapper.class.getConstructors(); + if (Arrays.stream(constructors) + .anyMatch(c -> c.getParameterCount() > 0 && c.getParameterTypes()[0] + .getName().equals(ParquetInputFormat.class.getName()))) { + supportAvroRead = true; + } + } catch (SecurityException e) { + throw new HoodieException("Failed to check if support avro reader: " + e.getMessage(), e); } } From c52e649a1ffa978565a264472e928943d6003d34 Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Sun, 22 May 2022 22:09:20 +0800 Subject: [PATCH 03/15] Change to static final --- .../org/apache/hudi/common/util/Option.java | 4 + .../hudi/hadoop/utils/HoodieHiveUtils.java | 79 ++++++++++++------- 2 files changed, 55 insertions(+), 28 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java b/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java index 957dab28e2c28..67b17aa46c14c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java @@ -76,6 +76,10 @@ public static Option of(T value) { return new Option<>(value); } + public static Option ofNullable(Supplier value) { + return null == value ? empty() : ofNullable(value.get()); + } + public static Option ofNullable(T value) { return null == value ? empty() : of(value); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java index 8c683fcd98650..650d83311235a 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutableTriple; import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -155,36 +156,58 @@ public static boolean isIncrementalUseDatabase(Configuration conf) { return conf.getBoolean(HOODIE_INCREMENTAL_USE_DATABASE, false); } - public static boolean SUPPORT_TIMESTAMP_WRITEABLE_V2; - private static Class timestampClass = null; - private static Method setTimeInMillis = null; - private static Constructor timestampWriteableV2constructor = null; + public static final boolean SUPPORT_TIMESTAMP_WRITEABLE_V2; + private static final Class TIMESTAMP_CLASS; + private static final Method SET_TIME_IN_MILLIS; + private static final Constructor TIMESTAMP_WRITEABLE_V2_CONSTRUCTOR; - public static boolean SUPPORT_DATE_WRITEABLE_V2; - private static Constructor dateWriteableV2constructor = null; + public static final boolean SUPPORT_DATE_WRITEABLE_V2; + private static final Constructor DATE_WRITEABLE_V2_CONSTRUCTOR; static { // timestamp - try { - timestampClass = Class.forName("org.apache.hadoop.hive.common.type.Timestamp"); - setTimeInMillis = timestampClass.getDeclaredMethod("setTimeInMillis", long.class); - Class twV2Class = 
Class.forName("org.apache.hadoop.hive.serde2.io.TimestampWritableV2"); - timestampWriteableV2constructor = twV2Class.getConstructor(timestampClass); - SUPPORT_TIMESTAMP_WRITEABLE_V2 = (null != timestampWriteableV2constructor); - LOG.trace("use TimestampWritableV2 to read hudi timestamp columns"); - } catch (ClassNotFoundException | NoSuchMethodException e) { - LOG.trace("can not find hive3 timestampv2 class or method, use hive2 class!", e); - SUPPORT_TIMESTAMP_WRITEABLE_V2 = false; + Option> timestampTriple = Option.ofNullable(() -> { + try { + Class timestampClass = Class.forName("org.apache.hadoop.hive.common.type.Timestamp"); + Method setTimeInMillis = timestampClass.getDeclaredMethod("setTimeInMillis", long.class); + Class twV2Class = Class.forName("org.apache.hadoop.hive.serde2.io.TimestampWritableV2"); + return ImmutableTriple.of(timestampClass, setTimeInMillis, twV2Class.getConstructor(timestampClass)); + } catch (ClassNotFoundException | NoSuchMethodException e) { + LOG.trace("can not find hive3 timestampv2 class or method, use hive2 class!", e); + return null; + } + }); + SUPPORT_TIMESTAMP_WRITEABLE_V2 = timestampTriple.isPresent(); + if (SUPPORT_TIMESTAMP_WRITEABLE_V2) { + LOG.trace("use org.apache.hadoop.hive.serde2.io.TimestampWritableV2 to read hudi timestamp columns"); + ImmutableTriple triple = timestampTriple.get(); + TIMESTAMP_CLASS = triple.left; + SET_TIME_IN_MILLIS = triple.middle; + TIMESTAMP_WRITEABLE_V2_CONSTRUCTOR = triple.right; + } else { + LOG.trace("use org.apache.hadoop.hive.serde2.io.TimestampWritable to read hudi timestamp columns"); + TIMESTAMP_CLASS = null; + SET_TIME_IN_MILLIS = null; + TIMESTAMP_WRITEABLE_V2_CONSTRUCTOR = null; } + // date - try { - Class dateV2Class = Class.forName("org.apache.hadoop.hive.serde2.io.DateWritableV2"); - dateWriteableV2constructor = dateV2Class.getConstructor(int.class); - SUPPORT_DATE_WRITEABLE_V2 = (null != dateWriteableV2constructor); - LOG.trace("use DateWritableV2 to read hudi date columns"); - } catch (ClassNotFoundException | NoSuchMethodException e) { - LOG.trace("can not find hive3 datev2 class or method, use hive2 class!", e); - SUPPORT_DATE_WRITEABLE_V2 = false; + Option dateConstructor = Option.ofNullable(() -> { + try { + Class dateV2Class = Class.forName("org.apache.hadoop.hive.serde2.io.DateWritableV2"); + return dateV2Class.getConstructor(int.class); + } catch (ClassNotFoundException | NoSuchMethodException e) { + LOG.trace("can not find hive3 datev2 class or method, use hive2 class!", e); + return null; + } + }); + SUPPORT_DATE_WRITEABLE_V2 = dateConstructor.isPresent(); + if (SUPPORT_DATE_WRITEABLE_V2) { + LOG.trace("use org.apache.hadoop.hive.serde2.io.DateWritableV2 to read hudi date columns"); + DATE_WRITEABLE_V2_CONSTRUCTOR = dateConstructor.get(); + } else { + LOG.trace("use org.apache.hadoop.hive.serde2.io.DateWritable to read hudi date columns"); + DATE_WRITEABLE_V2_CONSTRUCTOR = null; } } @@ -196,9 +219,9 @@ public static boolean isIncrementalUseDatabase(Configuration conf) { public static Writable getTimestampWriteable(long value, boolean timestampMillis) { if (SUPPORT_TIMESTAMP_WRITEABLE_V2) { try { - Object timestamp = timestampClass.newInstance(); - setTimeInMillis.invoke(timestamp, timestampMillis ? value : value / 1000); - return (Writable) timestampWriteableV2constructor.newInstance(timestamp); + Object timestamp = TIMESTAMP_CLASS.newInstance(); + SET_TIME_IN_MILLIS.invoke(timestamp, timestampMillis ? 
value : value / 1000); + return (Writable) TIMESTAMP_WRITEABLE_V2_CONSTRUCTOR.newInstance(timestamp); } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) { throw new HoodieException("can not create writable v2 class!", e); } @@ -216,7 +239,7 @@ public static Writable getTimestampWriteable(long value, boolean timestampMillis public static Writable getDateWriteable(int value) { if (SUPPORT_DATE_WRITEABLE_V2) { try { - return (Writable) dateWriteableV2constructor.newInstance(value); + return (Writable) DATE_WRITEABLE_V2_CONSTRUCTOR.newInstance(value); } catch (IllegalAccessException | InstantiationException | InvocationTargetException e) { throw new HoodieException("can not create writable v2 class!", e); } From 0fa6a882786a9dcae26c9ed281de7b04dce29f54 Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Mon, 23 May 2022 20:13:54 +0800 Subject: [PATCH 04/15] Supplement modification description and adjust the class name to static final string --- .../org/apache/hudi/hadoop/utils/HoodieHiveUtils.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java index 650d83311235a..652f85b06cf84 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java @@ -156,11 +156,14 @@ public static boolean isIncrementalUseDatabase(Configuration conf) { return conf.getBoolean(HOODIE_INCREMENTAL_USE_DATABASE, false); } + public static final String HIVE_TIMESTAMP_TYPE_CLASS = "org.apache.hadoop.hive.common.type.Timestamp"; + public static final String TIMESTAMP_WRITEABLE_V2_CLASS = "org.apache.hadoop.hive.serde2.io.TimestampWritableV2"; public static final boolean SUPPORT_TIMESTAMP_WRITEABLE_V2; private static final Class TIMESTAMP_CLASS; private static final Method SET_TIME_IN_MILLIS; private static final Constructor TIMESTAMP_WRITEABLE_V2_CONSTRUCTOR; + public static final String DATE_WRITEABLE_V2_CLASS = "org.apache.hadoop.hive.serde2.io.DateWritableV2"; public static final boolean SUPPORT_DATE_WRITEABLE_V2; private static final Constructor DATE_WRITEABLE_V2_CONSTRUCTOR; @@ -168,9 +171,9 @@ public static boolean isIncrementalUseDatabase(Configuration conf) { // timestamp Option> timestampTriple = Option.ofNullable(() -> { try { - Class timestampClass = Class.forName("org.apache.hadoop.hive.common.type.Timestamp"); + Class timestampClass = Class.forName(HIVE_TIMESTAMP_TYPE_CLASS); Method setTimeInMillis = timestampClass.getDeclaredMethod("setTimeInMillis", long.class); - Class twV2Class = Class.forName("org.apache.hadoop.hive.serde2.io.TimestampWritableV2"); + Class twV2Class = Class.forName(TIMESTAMP_WRITEABLE_V2_CLASS); return ImmutableTriple.of(timestampClass, setTimeInMillis, twV2Class.getConstructor(timestampClass)); } catch (ClassNotFoundException | NoSuchMethodException e) { LOG.trace("can not find hive3 timestampv2 class or method, use hive2 class!", e); @@ -194,7 +197,7 @@ public static boolean isIncrementalUseDatabase(Configuration conf) { // date Option dateConstructor = Option.ofNullable(() -> { try { - Class dateV2Class = Class.forName("org.apache.hadoop.hive.serde2.io.DateWritableV2"); + Class dateV2Class = Class.forName(DATE_WRITEABLE_V2_CLASS); return dateV2Class.getConstructor(int.class); } catch (ClassNotFoundException | NoSuchMethodException e) { LOG.trace("can not find 
hive3 datev2 class or method, use hive2 class!", e); From a482134b7bd67e6baf0c01a77bb606cd6b4b5da5 Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Sat, 4 Jun 2022 11:28:36 +0800 Subject: [PATCH 05/15] Fix review (change default support_timestamp to false, rename class name, re-process the supplier use) --- conf/hudi-defaults.conf.template | 2 +- .../main/java/org/apache/hudi/common/util/Option.java | 4 ---- .../common/util/TestDFSPropertiesConfiguration.java | 2 +- .../test/resources/external-config/hudi-defaults.conf | 2 +- .../org/apache/hudi/configuration/FlinkOptions.java | 2 +- .../org/apache/hudi/streamer/FlinkStreamerConfig.java | 5 +++-- .../hudi/hadoop/HoodieColumnProjectionUtils.java | 3 --- .../apache/hudi/hadoop/HoodieParquetInputFormat.java | 4 ++-- ...tFormat.java => HoodieAvroParquetInputFormat.java} | 4 ++-- ...arquetReader.java => HoodieAvroParquetReader.java} | 4 ++-- .../org/apache/hudi/hadoop/utils/HoodieHiveUtils.java | 11 +++++++---- 11 files changed, 20 insertions(+), 23 deletions(-) rename hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/{HudiAvroParquetInputFormat.java => HoodieAvroParquetInputFormat.java} (91%) rename hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/{HudiAvroParquetReader.java => HoodieAvroParquetReader.java} (96%) diff --git a/conf/hudi-defaults.conf.template b/conf/hudi-defaults.conf.template index bd88bb544ea99..175dbaf23d739 100644 --- a/conf/hudi-defaults.conf.template +++ b/conf/hudi-defaults.conf.template @@ -21,6 +21,6 @@ # Example: # hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000 # hoodie.datasource.hive_sync.use_jdbc true -# hoodie.datasource.hive_sync.support_timestamp true +# hoodie.datasource.hive_sync.support_timestamp false # hoodie.index.type BLOOM # hoodie.metadata.enable false diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java b/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java index 67b17aa46c14c..957dab28e2c28 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java @@ -76,10 +76,6 @@ public static Option of(T value) { return new Option<>(value); } - public static Option ofNullable(Supplier value) { - return null == value ? empty() : ofNullable(value.get()); - } - public static Option ofNullable(T value) { return null == value ? 
empty() : of(value); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java index f1720bc58628c..df3efbd91f9e0 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestDFSPropertiesConfiguration.java @@ -194,7 +194,7 @@ public void testLoadGlobalConfFile() { assertEquals(5, DFSPropertiesConfiguration.getGlobalProps().size()); assertEquals("jdbc:hive2://localhost:10000", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.jdbcurl")); assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.use_jdbc")); - assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.support_timestamp")); + assertEquals("false", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.support_timestamp")); assertEquals("BLOOM", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.index.type")); assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.metadata.enable")); } diff --git a/hudi-common/src/test/resources/external-config/hudi-defaults.conf b/hudi-common/src/test/resources/external-config/hudi-defaults.conf index 5d7cdf7a32d2e..1133adb4d7735 100644 --- a/hudi-common/src/test/resources/external-config/hudi-defaults.conf +++ b/hudi-common/src/test/resources/external-config/hudi-defaults.conf @@ -21,6 +21,6 @@ # Example: hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000 hoodie.datasource.hive_sync.use_jdbc true -hoodie.datasource.hive_sync.support_timestamp true +hoodie.datasource.hive_sync.support_timestamp false hoodie.index.type BLOOM hoodie.metadata.enable true diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 04601e11d411c..0162d2bf8ca89 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -830,7 +830,7 @@ private FlinkOptions() { public static final ConfigOption HIVE_SYNC_SUPPORT_TIMESTAMP = ConfigOptions .key("hive_sync.support_timestamp") .booleanType() - .defaultValue(true) + .defaultValue(false) .withDescription("'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type. " + "From 0.12.0, 'timestamp' type will be supported and also can be disabled by this variable. 
" + "Previous versions keep being disabled by default."); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 2786a3aa443e6..91ab866615f36 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -340,8 +340,9 @@ public class FlinkStreamerConfig extends Configuration { public Boolean hiveSyncSkipRoSuffix = false; @Parameter(names = {"--hive-sync-support-timestamp"}, description = "INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n" - + "Disabled by default for backward compatibility.") - public Boolean hiveSyncSupportTimestamp = true; + + "From 0.12.0, 'timestamp' type will be supported and also can be disabled by this variable. " + + "Previous versions keep being disabled by default.") + public Boolean hiveSyncSupportTimestamp = false; /** diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java index 0bc6eea774289..3d9f7a35a6c41 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieColumnProjectionUtils.java @@ -122,9 +122,6 @@ public static boolean supportTimestamp(Configuration conf) { return getIOColumnTypes(conf).contains("timestamp"); } List names = getIOColumns(conf); - if (names.isEmpty()) { - return true; - } List types = getIOColumnTypes(conf); return types.isEmpty() || IntStream.range(0, names.size()).filter(i -> reads.contains(names.get(i))) .anyMatch(i -> types.get(i).equals("timestamp")); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index e1af812b9379a..2cee3deb2bdf8 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -31,7 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.hadoop.avro.HudiAvroParquetInputFormat; +import org.apache.hudi.hadoop.avro.HoodieAvroParquetInputFormat; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -121,7 +121,7 @@ private RecordReader getRecordReaderInternal(InputS Reporter reporter) throws IOException { try { if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) { - return new ParquetRecordReaderWrapper(new HudiAvroParquetInputFormat(), split, job, reporter); + return new ParquetRecordReaderWrapper(new HoodieAvroParquetInputFormat(), split, job, reporter); } else { return super.getRecordReader(split, job, reporter); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetInputFormat.java similarity index 91% rename from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetInputFormat.java rename to 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetInputFormat.java index 6540695a41108..c464ba266e4e8 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetInputFormat.java @@ -28,13 +28,13 @@ import java.io.IOException; -public class HudiAvroParquetInputFormat extends ParquetInputFormat { +public class HoodieAvroParquetInputFormat extends ParquetInputFormat { @Override public RecordReader createRecordReader( InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { Configuration conf = ContextUtil.getConfiguration(taskAttemptContext); - return new HudiAvroParquetReader(inputSplit, conf); + return new HoodieAvroParquetReader(inputSplit, conf); } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java similarity index 96% rename from hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetReader.java rename to hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java index 3e930d614c5e9..b12cae7736d05 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HudiAvroParquetReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java @@ -44,11 +44,11 @@ import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; -public class HudiAvroParquetReader extends RecordReader { +public class HoodieAvroParquetReader extends RecordReader { private final ParquetRecordReader parquetRecordReader; - public HudiAvroParquetReader(InputSplit inputSplit, Configuration conf) throws IOException { + public HoodieAvroParquetReader(InputSplit inputSplit, Configuration conf) throws IOException { AvroReadSupport avroReadSupport = new AvroReadSupport<>(); // if exists read columns, we need to filter columns. 
List readColNames = Arrays.asList(HoodieColumnProjectionUtils.getReadColumnNames(conf)); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java index 652f85b06cf84..21b6add760d59 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -169,7 +170,7 @@ public static boolean isIncrementalUseDatabase(Configuration conf) { static { // timestamp - Option> timestampTriple = Option.ofNullable(() -> { + Supplier> timestampSupplier = () -> { try { Class timestampClass = Class.forName(HIVE_TIMESTAMP_TYPE_CLASS); Method setTimeInMillis = timestampClass.getDeclaredMethod("setTimeInMillis", long.class); @@ -179,7 +180,8 @@ public static boolean isIncrementalUseDatabase(Configuration conf) { LOG.trace("can not find hive3 timestampv2 class or method, use hive2 class!", e); return null; } - }); + }; + Option> timestampTriple = Option.ofNullable(timestampSupplier.get()); SUPPORT_TIMESTAMP_WRITEABLE_V2 = timestampTriple.isPresent(); if (SUPPORT_TIMESTAMP_WRITEABLE_V2) { LOG.trace("use org.apache.hadoop.hive.serde2.io.TimestampWritableV2 to read hudi timestamp columns"); @@ -195,7 +197,7 @@ public static boolean isIncrementalUseDatabase(Configuration conf) { } // date - Option dateConstructor = Option.ofNullable(() -> { + Supplier dateSupplier = () -> { try { Class dateV2Class = Class.forName(DATE_WRITEABLE_V2_CLASS); return dateV2Class.getConstructor(int.class); @@ -203,7 +205,8 @@ public static boolean isIncrementalUseDatabase(Configuration conf) { LOG.trace("can not find hive3 datev2 class or method, use hive2 class!", e); return null; } - }); + }; + Option dateConstructor = Option.ofNullable(dateSupplier.get()); SUPPORT_DATE_WRITEABLE_V2 = dateConstructor.isPresent(); if (SUPPORT_DATE_WRITEABLE_V2) { LOG.trace("use org.apache.hadoop.hive.serde2.io.DateWritableV2 to read hudi date columns"); From af53d0bb2b308bbf9d650de5295f7f2399e3053b Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Sat, 4 Jun 2022 12:31:16 +0800 Subject: [PATCH 06/15] rebase master and rename FSUtils method --- .../org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index ee290bc585436..9d0811ec80dfa 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -726,7 +726,7 @@ public void testHoodieParquetInputFormatReadTimeType() throws IOException { HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET); java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "06", "28")); - String fileId = FSUtils.makeDataFileName(commit, "1-0-1", "fileid1", + String fileId = FSUtils.makeBaseFileName(commit, "1-0-1", "fileid1", HoodieFileFormat.PARQUET.getFileExtension()); try (AvroParquetWriter parquetWriter = new 
AvroParquetWriter( new Path(partitionPath.resolve(fileId).toString()), schema)) { From 57166d2545452bd89de0f7e79c512f257592919e Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Sat, 15 Oct 2022 17:46:12 +0800 Subject: [PATCH 07/15] Change support version --- .../java/org/apache/hudi/configuration/FlinkOptions.java | 2 +- .../src/main/java/org/apache/hudi/hive/HiveSyncConfig.java | 3 ++- .../main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java | 5 +++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 0162d2bf8ca89..70b2953a9b12c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -832,7 +832,7 @@ private FlinkOptions() { .booleanType() .defaultValue(false) .withDescription("'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type. " - + "From 0.12.0, 'timestamp' type will be supported and also can be disabled by this variable. " + + "From 0.13.0, 'timestamp' type will be supported and also can be disabled by this variable. " + "Previous versions keep being disabled by default."); public static final ConfigOption HIVE_SYNC_TABLE_PROPERTIES = ConfigOptions diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java index 11eba393e6a68..c49feb1b088a7 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java @@ -138,7 +138,8 @@ public static class HiveSyncConfigParams { @Parameter(names = {"--serde-properties"}, description = "Serde properties to hive table") public String serdeProperties; @Parameter(names = {"--support-timestamp"}, description = "'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type." - + "Disabled by default for backward compatibility.") + + "From 0.13.0, 'timestamp' type will be supported and also can be disabled by this variable. " + + "Previous versions keep being disabled by default.") public Boolean supportTimestamp; @Parameter(names = {"--managed-table"}, description = "Create a managed table") public Boolean createManagedTable; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java index 02d6c0f2174ce..e5163ba0045b6 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java @@ -79,9 +79,10 @@ public class HiveSyncConfigHolder { .withDocumentation("Skip the _ro suffix for Read optimized table, when registering"); public static final ConfigProperty HIVE_SUPPORT_TIMESTAMP_TYPE = ConfigProperty .key("hoodie.datasource.hive_sync.support_timestamp") - .defaultValue("false") + .defaultValue("true") .withDocumentation("‘INT64’ with original type TIMESTAMP_MICROS is converted to hive ‘timestamp’ type. " - + "Disabled by default for backward compatibility."); + + "From 0.13.0, 'timestamp' type will be supported and also can be disabled by this variable. 
" + + "Previous versions keep being disabled by default."); public static final ConfigProperty HIVE_TABLE_PROPERTIES = ConfigProperty .key("hoodie.datasource.hive_sync.table_properties") .noDefaultValue() From 32c2be28d34de98993c26a74258ab3bf599c1ab2 Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Sun, 16 Oct 2022 14:43:12 +0800 Subject: [PATCH 08/15] Append supportTimestamp --- .../hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java index b87758af90b1c..7e5328b838227 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -204,7 +204,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema) { } catch (AvroRuntimeException e) { LOG.debug("Field:" + field.name() + "not found in Schema:" + schema); } - recordValues[recordValueIndex++] = avroToArrayWritable(fieldValue, field.schema()); + recordValues[recordValueIndex++] = avroToArrayWritable(fieldValue, field.schema(), supportTimestamp); } return new ArrayWritable(Writable.class, recordValues); case ENUM: @@ -214,7 +214,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema) { Writable[] arrayValues = new Writable[arrayValue.size()]; int arrayValueIndex = 0; for (Object obj : arrayValue) { - arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType()); + arrayValues[arrayValueIndex++] = avroToArrayWritable(obj, schema.getElementType(), supportTimestamp); } // Hive 1.x will fail here, it requires values2 to be wrapped into another ArrayWritable return new ArrayWritable(Writable.class, arrayValues); @@ -226,7 +226,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema) { Map.Entry mapEntry = (Map.Entry) entry; Writable[] nestedMapValues = new Writable[2]; nestedMapValues[0] = new Text(mapEntry.getKey().toString()); - nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType()); + nestedMapValues[1] = avroToArrayWritable(mapEntry.getValue(), schema.getValueType(), supportTimestamp); mapValues[mapValueIndex++] = new ArrayWritable(Writable.class, nestedMapValues); } // Hive 1.x will fail here, it requires values3 to be wrapped into another ArrayWritable From fc58129e45e6faff11f654dc45404d2d817f3ab2 Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Sun, 16 Oct 2022 14:47:05 +0800 Subject: [PATCH 09/15] Change version --- .../main/java/org/apache/hudi/configuration/FlinkOptions.java | 2 +- .../java/org/apache/hudi/streamer/FlinkStreamerConfig.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 70b2953a9b12c..8ba625c2aff93 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -830,7 +830,7 @@ private FlinkOptions() { public static final ConfigOption HIVE_SYNC_SUPPORT_TIMESTAMP = ConfigOptions .key("hive_sync.support_timestamp") .booleanType() - 
.defaultValue(false) + .defaultValue(true) .withDescription("'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type. " + "From 0.13.0, 'timestamp' type will be supported and also can be disabled by this variable. " + "Previous versions keep being disabled by default."); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 91ab866615f36..f5a22f0ec3a3b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -340,9 +340,9 @@ public class FlinkStreamerConfig extends Configuration { public Boolean hiveSyncSkipRoSuffix = false; @Parameter(names = {"--hive-sync-support-timestamp"}, description = "INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n" - + "From 0.12.0, 'timestamp' type will be supported and also can be disabled by this variable. " + + "From 0.13.0, 'timestamp' type will be supported and also can be disabled by this variable. " + "Previous versions keep being disabled by default.") - public Boolean hiveSyncSupportTimestamp = false; + public Boolean hiveSyncSupportTimestamp = true; /** From 00f0a27af86b05f70b9ef22eb5a6180a21756b0b Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Tue, 18 Oct 2022 08:03:32 +0800 Subject: [PATCH 10/15] rename inputformat --- .../hudi/hadoop/HoodieParquetInputFormat.java | 4 +- .../avro/HoodieAvroParquetInputFormat.java | 40 ------------------- 2 files changed, 2 insertions(+), 42 deletions(-) delete mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetInputFormat.java diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index 2cee3deb2bdf8..cbd009669aba4 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -31,7 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.hadoop.avro.HoodieAvroParquetInputFormat; +import org.apache.hudi.hadoop.avro.HoodieTimestampAwareParquetInputFormat; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -121,7 +121,7 @@ private RecordReader getRecordReaderInternal(InputS Reporter reporter) throws IOException { try { if (supportAvroRead && HoodieColumnProjectionUtils.supportTimestamp(job)) { - return new ParquetRecordReaderWrapper(new HoodieAvroParquetInputFormat(), split, job, reporter); + return new ParquetRecordReaderWrapper(new HoodieTimestampAwareParquetInputFormat(), split, job, reporter); } else { return super.getRecordReader(split, job, reporter); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetInputFormat.java deleted file mode 100644 index c464ba266e4e8..0000000000000 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetInputFormat.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.hadoop.avro; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.parquet.hadoop.ParquetInputFormat; -import org.apache.parquet.hadoop.util.ContextUtil; - -import java.io.IOException; - -public class HoodieAvroParquetInputFormat extends ParquetInputFormat { - - @Override - public RecordReader createRecordReader( - InputSplit inputSplit, - TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - Configuration conf = ContextUtil.getConfiguration(taskAttemptContext); - return new HoodieAvroParquetReader(inputSplit, conf); - } -} From 60bff79c9f5e6afe0b0d22f902e1f2c2b2f4bb42 Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Tue, 18 Oct 2022 08:05:09 +0800 Subject: [PATCH 11/15] add inputformat --- ...oodieTimestampAwareParquetInputFormat.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java new file mode 100644 index 0000000000000..4f4c96d0ec857 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hadoop.avro; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.util.ContextUtil; + +import java.io.IOException; + +/** + * To resolve issue Fix Timestamp/Date type read by Hive3, + * we need to handle timestamp types separately based on the parquet-avro approach + */ +public class HoodieTimestampAwareParquetInputFormat extends ParquetInputFormat { + + @Override + public RecordReader createRecordReader( + InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + Configuration conf = ContextUtil.getConfiguration(taskAttemptContext); + return new HoodieAvroParquetReader(inputSplit, conf); + } +} From 2e30eed0b25a5e22b738abce7d5e70159d0e8afe Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Sun, 4 Dec 2022 15:00:11 +0800 Subject: [PATCH 12/15] Optimise codes --- .../AbstractRealtimeRecordReader.java | 9 +++++++ .../HoodieRealtimeRecordReaderUtils.java | 25 +++++++++++++------ 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 1dd9fc4cfd5a1..320bd33650911 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.HoodieColumnProjectionUtils; import org.apache.hudi.hadoop.SchemaEvolutionContext; import org.apache.hudi.hadoop.utils.HiveAvroSerializer; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; @@ -72,6 +73,8 @@ public abstract class AbstractRealtimeRecordReader { protected boolean supportPayload = true; // handle hive type to avro record protected HiveAvroSerializer serializer; + // support timestamp + private final boolean supportTimestamp; public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) { this.split = split; @@ -80,6 +83,8 @@ public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) { LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "")); this.supportPayload = Boolean.parseBoolean(job.get("hoodie.support.payload", "true")); + // get timestamp columns + supportTimestamp = HoodieColumnProjectionUtils.supportTimestamp(jobConf); try { metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build(); if (metaClient.getTableConfig().getPreCombineField() != null) { @@ -219,4 +224,8 @@ public void setWriterSchema(Schema writerSchema) { public void setHiveSchema(Schema hiveSchema) { this.hiveSchema = hiveSchema; } + + public boolean isSupportTimestamp() { + return supportTimestamp; + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java index 7e5328b838227..452feec533e80 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -20,6 +20,7 @@ import org.apache.avro.AvroRuntimeException; import org.apache.avro.JsonProperties; +import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; @@ -27,10 +28,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; import org.apache.hadoop.io.ArrayWritable; @@ -55,7 +54,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; -import java.sql.Timestamp; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -164,6 +162,14 @@ public static Map getNameToFieldMap(Schema schema) { * Convert the projected read from delta record into an array writable. */ public static Writable avroToArrayWritable(Object value, Schema schema) { + return avroToArrayWritable(value, schema, false); + } + + /** + * Convert the projected read from delta record into an array writable. + * @param supportTimestamp Whether to support the timestamp field + */ + public static Writable avroToArrayWritable(Object value, Schema schema, boolean supportTimestamp) { if (value == null) { return null; @@ -176,12 +182,15 @@ public static Writable avroToArrayWritable(Object value, Schema schema) { return new BytesWritable(((ByteBuffer)value).array()); case INT: if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("date")) { - return new DateWritable((Integer) value); + return HoodieHiveUtils.getDateWriteable((Integer) value); } return new IntWritable((Integer) value); case LONG: - if (schema.getLogicalType() != null && "timestamp-micros".equals(schema.getLogicalType().getName())) { - return new TimestampWritable(new Timestamp((Long) value)); + LogicalType logicalType = schema.getLogicalType(); + boolean lttsm = LogicalTypes.timestampMillis().equals(logicalType); + // If there is a specified timestamp or under normal cases, we will process it + if (lttsm || (supportTimestamp && LogicalTypes.timestampMicros().equals(logicalType))) { + return HoodieHiveUtils.getTimestampWriteable((Long) value, lttsm); } return new LongWritable((Long) value); case FLOAT: @@ -239,9 +248,9 @@ public static Writable avroToArrayWritable(Object value, Schema schema) { Schema s1 = types.get(0); Schema s2 = types.get(1); if (s1.getType() == Schema.Type.NULL) { - return avroToArrayWritable(value, s2); + return avroToArrayWritable(value, s2, supportTimestamp); } else if (s2.getType() == Schema.Type.NULL) { - return avroToArrayWritable(value, s1); + return avroToArrayWritable(value, s1, supportTimestamp); } else { throw new IllegalArgumentException("Only support union with null"); } From 0fb7a3bf69d03d40e0fd501e0cccb608aa21a925 Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Mon, 5 Dec 2022 08:08:39 +0800 Subject: 
[PATCH 13/15] Fix timestamp --- .../hadoop/utils/HoodieRealtimeRecordReaderUtils.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java index 452feec533e80..e1109665b8372 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -187,10 +187,13 @@ public static Writable avroToArrayWritable(Object value, Schema schema, boolean return new IntWritable((Integer) value); case LONG: LogicalType logicalType = schema.getLogicalType(); - boolean lttsm = LogicalTypes.timestampMillis().equals(logicalType); // If there is a specified timestamp or under normal cases, we will process it - if (lttsm || (supportTimestamp && LogicalTypes.timestampMicros().equals(logicalType))) { - return HoodieHiveUtils.getTimestampWriteable((Long) value, lttsm); + if (supportTimestamp) { + if (LogicalTypes.timestampMillis().equals(logicalType)) { + return HoodieHiveUtils.getTimestampWriteable((Long) value, true); + } else if (LogicalTypes.timestampMicros().equals(logicalType)) { + return HoodieHiveUtils.getTimestampWriteable((Long) value, false); + } } return new LongWritable((Long) value); case FLOAT: From 2cd8e80e69d86737836e14f38d2938ffa83fb2ce Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Sat, 10 Dec 2022 10:18:59 +0800 Subject: [PATCH 14/15] Rebase 2022.12.10 and fix test case error --- .../hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java | 3 ++- .../org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index 90e13a2aff341..9fa5856fee17d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -106,7 +106,8 @@ private List> getParallelProducers( scannerBuilder.withLogRecordScannerCallback(record -> { // convert Hoodie log record to Hadoop AvroWritable and buffer GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema(), payloadProps).get(); - ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp()); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), + isSupportTimestamp()); queue.insertRecord(aWritable); }) .build(); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java index 9de4630877ae5..6e664e87dbafd 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java @@ -82,7 +82,7 @@ public void testSerialize() { ByteBuffer bb = ByteBuffer.wrap(new byte[]{97, 48, 53}); avroRecord.put("col9", bb); assertTrue(GenericData.get().validate(avroSchema, avroRecord)); - ArrayWritable writable = (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroSchema); + ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroSchema, true); List writableList = Arrays.stream(writable.get()).collect(Collectors.toList()); writableList.remove(writableList.size() - 1); From b5e513307a9616b476991120bff051f83453a942 Mon Sep 17 00:00:00 2001 From: cdmikechen Date: Sat, 10 Dec 2022 10:47:59 +0800 Subject: [PATCH 15/15] Fix tab --- .../RealtimeUnmergedRecordReader.java | 57 +++++++++---------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index 9fa5856fee17d..698db20fc6a9a 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -44,7 +44,7 @@ import java.util.function.Function; class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader - implements RecordReader { + implements RecordReader { // Parquet record reader private final RecordReader parquetReader; @@ -67,26 +67,26 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader * @param realReader Parquet Reader */ public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job, - RecordReader realReader) { + RecordReader realReader) { super(split, job); this.parquetReader = new SafeParquetRecordReaderWrapper(realReader); // Iterator for consuming records from parquet file this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader); HoodieUnMergedLogRecordScanner.Builder scannerBuilder = - HoodieUnMergedLogRecordScanner.newBuilder() - .withFileSystem(FSUtils.getFs(split.getPath().toString(), this.jobConf)) - .withBasePath(split.getBasePath()) - .withLogFilePaths(split.getDeltaLogPaths()) - .withReaderSchema(getReaderSchema()) - .withLatestInstantTime(split.getMaxCommitTime()) - .withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) - .withReverseReader(false) - .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)); + HoodieUnMergedLogRecordScanner.newBuilder() + .withFileSystem(FSUtils.getFs(split.getPath().toString(), this.jobConf)) + .withBasePath(split.getBasePath()) + .withLogFilePaths(split.getDeltaLogPaths()) + .withReaderSchema(getReaderSchema()) + .withLatestInstantTime(split.getMaxCommitTime()) + .withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) + .withReverseReader(false) + .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)); this.executor = new BoundedInMemoryExecutor<>( - HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(scannerBuilder), - Option.empty(), Function.identity(), new DefaultSizeEstimator<>(), Functions.noop()); + HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(scannerBuilder), + Option.empty(), 
Function.identity(), new DefaultSizeEstimator<>(), Functions.noop()); // Consumer of this record reader this.iterator = this.executor.getRecordIterator(); @@ -98,24 +98,23 @@ public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job, * Setup log and parquet reading in parallel. Both write to central buffer. */ private List> getParallelProducers( - HoodieUnMergedLogRecordScanner.Builder scannerBuilder + HoodieUnMergedLogRecordScanner.Builder scannerBuilder ) { return Arrays.asList( - new FunctionBasedQueueProducer<>(queue -> { - HoodieUnMergedLogRecordScanner scanner = - scannerBuilder.withLogRecordScannerCallback(record -> { - // convert Hoodie log record to Hadoop AvroWritable and buffer - GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema(), payloadProps).get(); - ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), - isSupportTimestamp()); - queue.insertRecord(aWritable); - }) - .build(); - // Scan all the delta-log files, filling in the queue - scanner.scan(); - return null; - }), - new IteratorBasedQueueProducer<>(parquetRecordsIterator) + new FunctionBasedQueueProducer<>(queue -> { + HoodieUnMergedLogRecordScanner scanner = + scannerBuilder.withLogRecordScannerCallback(record -> { + // convert Hoodie log record to Hadoop AvroWritable and buffer + GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema(), payloadProps).get(); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema(), isSupportTimestamp()); + queue.insertRecord(aWritable); + }) + .build(); + // Scan all the delta-log files, filling in the queue + scanner.scan(); + return null; + }), + new IteratorBasedQueueProducer<>(parquetRecordsIterator) ); }
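
As a minimal sketch of the behaviour this series introduces, the snippet below exercises the new three-argument HoodieRealtimeRecordReaderUtils.avroToArrayWritable(Object, Schema, boolean) overload on a record carrying a timestamp-micros field. Only that overload comes from the patches above; the record schema, the field names "id" and "event_time", the sample epoch value, and the TimestampWritableSketch harness are hypothetical scaffolding added purely for illustration.

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

public class TimestampWritableSketch {
  public static void main(String[] args) {
    // Build a schema with one long field carrying the timestamp-micros logical type.
    Schema tsMicros = LogicalTypes.timestampMicros()
        .addToSchema(Schema.create(Schema.Type.LONG));
    Schema schema = SchemaBuilder.record("Row").fields()
        .requiredString("id")
        .name("event_time").type(tsMicros).noDefault()
        .endRecord();

    GenericRecord rec = new GenericData.Record(schema);
    rec.put("id", "key-1");
    rec.put("event_time", 1_652_443_795_000_000L); // microseconds since epoch

    // With supportTimestamp=true the long is wrapped into a Hive timestamp writable
    // (TimestampWritable or TimestampWritableV2, chosen reflectively by HoodieHiveUtils);
    // with supportTimestamp=false it stays a plain LongWritable, preserving the old behaviour.
    Writable withTs = HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, schema, true);
    Writable withoutTs = HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, schema, false);

    // Field order follows the schema: index 0 is "id", index 1 is "event_time".
    System.out.println(((ArrayWritable) withTs).get()[1].getClass().getSimpleName());
    System.out.println(((ArrayWritable) withoutTs).get()[1].getClass().getSimpleName());
  }
}

Running such a sketch would require hudi-hadoop-mr together with the Avro, Hadoop, and Hive serde jars on the classpath; which timestamp writable class is printed depends on whether the Hive 3 TimestampWritableV2 class is resolvable at runtime, mirroring the reflection-based fallback in HoodieHiveUtils above.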