diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 91c214713e31..a602b452c324 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -57,6 +57,7 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -539,6 +540,8 @@ public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object
   private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue) {
     if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
       return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
+    } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMicros()) {
+      return new Timestamp(Long.parseLong(fieldValue.toString()) / 1000);
     } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
       Decimal dc = (Decimal) fieldSchema.getLogicalType();
       DecimalConversion decimalConversion = new DecimalConversion();
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index e069df97aff5..2cc3ecee80b8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi.command
 
+import java.sql.Timestamp
 import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
 
 import org.apache.avro.generic.GenericRecord
@@ -96,7 +97,7 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
         val timeMs = if (rowType) { // In RowType, the partitionPathValue is the time format string, convert to millis
           SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
         } else {
-          MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
+          Timestamp.valueOf(_partitionValue).getTime
         }
         val timestampFormat = PartitionPathEncodeUtils.escapePathName(
           SqlKeyGenerator.timestampTimeFormat.print(timeMs))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
new file mode 100644
index 000000000000..2caf4cc20eaa
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+class TestGenericRecordAndRowConsistency extends HoodieClientTestBase {
+
+  var spark: SparkSession = _
+  val commonOpts = Map(
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_type_consistency_tbl",
+    "hoodie.insert.shuffle.parallelism" -> "1",
+    "hoodie.upsert.shuffle.parallelism" -> "1",
+    DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
+    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "str,eventTime",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "typeId",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "typeId",
+    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.ComplexKeyGenerator"
+  )
+
+  /**
+   * Setup method running before each test.
+   */
+  @BeforeEach override def setUp() {
+    setTableName("hoodie_type_consistency_tbl")
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+  }
+
+  @Test
+  def testTimestampTypeConsistency(): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    val df = Seq(
+      (1, Timestamp.valueOf("2014-01-01 23:00:01"), "abc"),
+      (1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
+      (2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
+      (2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
+    ).toDF("typeId", "eventTime", "str")
+
+    testConsistencyBetweenGenericRecordAndRow(df)
+  }
+
+  @Test
+  def testDateTypeConsistency(): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    val df = Seq(
+      (1, Date.valueOf("2014-01-01"), "abc"),
+      (1, Date.valueOf("2014-11-30"), "abc"),
+      (2, Date.valueOf("2016-12-29"), "def"),
+      (2, Date.valueOf("2016-05-09"), "def")
+    ).toDF("typeId", "eventTime", "str")
+
+    testConsistencyBetweenGenericRecordAndRow(df)
+  }
+
+  private def testConsistencyBetweenGenericRecordAndRow(df: DataFrame): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    // the upsert operation generates the record key from the GenericRecord
+    val tempRecordPath = basePath + "/record_tbl/"
+    df.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, "upsert")
+      .mode(org.apache.spark.sql.SaveMode.Overwrite)
+      .save(tempRecordPath)
+
+    val data1 = spark.read.format("hudi")
+      .load(tempRecordPath)
+      .select("_hoodie_record_key")
+      .map(_.toString()).collect().sorted
+
+    // the bulk_insert operation generates the record key from the Row
+    val tempRowPath = basePath + "/row_tbl/"
+    df.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, "bulk_insert")
+      .mode(org.apache.spark.sql.SaveMode.Overwrite)
+      .save(tempRowPath)
+
+    val data2 = spark.read.format("hudi")
+      .load(tempRowPath)
+      .select("_hoodie_record_key")
+      .map(_.toString()).collect().sorted
+
+    assert(data1 sameElements data2)
+  }
+
+}