diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index ff8aefe97e85a..fdaa466c3cc91 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -59,7 +59,6 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -542,8 +541,6 @@ public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object
   private static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue) {
     if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
       return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
-    } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMicros()) {
-      return new Timestamp(Long.parseLong(fieldValue.toString()) / 1000);
     } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
       Decimal dc = (Decimal) fieldSchema.getLogicalType();
       DecimalConversion decimalConversion = new DecimalConversion();
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 2cc3ecee80b86..e069df97aff55 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hudi.command
 
-import java.sql.Timestamp
 import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
 
 import org.apache.avro.generic.GenericRecord
@@ -97,7 +96,7 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
         val timeMs = if (rowType) { // In RowType, the partitionPathValue is the time format string, convert to millis
           SqlKeyGenerator.sqlTimestampFormat.parseMillis(_partitionValue)
         } else {
-          Timestamp.valueOf(_partitionValue).getTime
+          MILLISECONDS.convert(_partitionValue.toLong, MICROSECONDS)
         }
         val timestampFormat = PartitionPathEncodeUtils.escapePathName(
           SqlKeyGenerator.timestampTimeFormat.print(timeMs))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
deleted file mode 100644
index 985bf2e9408bd..0000000000000
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestGenericRecordAndRowConsistency.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
-import org.apache.spark.sql.DataFrame
-import org.junit.jupiter.api.Test
-
-import java.sql.{Date, Timestamp}
-
-class TestGenericRecordAndRowConsistency extends SparkClientFunctionalTestHarness {
-
-  val commonOpts = Map(
-    HoodieWriteConfig.TBL_NAME.key -> "hoodie_type_consistency_tbl",
-    "hoodie.insert.shuffle.parallelism" -> "1",
-    "hoodie.upsert.shuffle.parallelism" -> "1",
-    DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
-    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "str,eventTime",
-    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "typeId",
-    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "typeId",
-    DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.ComplexKeyGenerator"
-  )
-
-  @Test
-  def testTimestampTypeConsistency(): Unit = {
-    val _spark = spark
-    import _spark.implicits._
-
-    val df = Seq(
-      (1, Timestamp.valueOf("2014-01-01 23:00:01"), "abc"),
-      (1, Timestamp.valueOf("2014-11-30 12:40:32"), "abc"),
-      (2, Timestamp.valueOf("2016-12-29 09:54:00"), "def"),
-      (2, Timestamp.valueOf("2016-05-09 10:12:43"), "def")
-    ).toDF("typeId", "eventTime", "str")
-
-    testConsistencyBetweenGenericRecordAndRow(df)
-  }
-
-  @Test
-  def testDateTypeConsistency(): Unit = {
-    val _spark = spark
-    import _spark.implicits._
-
-    val df = Seq(
-      (1, Date.valueOf("2014-01-01"), "abc"),
-      (1, Date.valueOf("2014-11-30"), "abc"),
-      (2, Date.valueOf("2016-12-29"), "def"),
-      (2, Date.valueOf("2016-05-09"), "def")
-    ).toDF("typeId", "eventTime", "str")
-
-    testConsistencyBetweenGenericRecordAndRow(df)
-  }
-
-  private def testConsistencyBetweenGenericRecordAndRow(df: DataFrame): Unit = {
-    val _spark = spark
-    import _spark.implicits._
-
-    // upsert operation generate recordKey by GenericRecord
-    val tempRecordPath = basePath + "/record_tbl/"
-    df.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION.key, "upsert")
-      .mode(org.apache.spark.sql.SaveMode.Overwrite)
-      .save(tempRecordPath)
-
-    val data1 = spark.read.format("hudi")
-      .load(tempRecordPath)
-      .select("_hoodie_record_key")
-      .map(_.toString()).collect().sorted
-
-    // bulk_insert operation generate recordKey by Row
-    val tempRowPath = basePath + "/row_tbl/"
-    df.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION.key, "bulk_insert")
-      .mode(org.apache.spark.sql.SaveMode.Overwrite)
-      .save(tempRowPath)
-
-    val data2 = spark.read.format("hudi")
-      .load(tempRowPath)
-      .select("_hoodie_record_key")
-      .map(_.toString()).collect().sorted
-
-    assert(data1 sameElements data2)
-  }
-
-}
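
Note (not part of the patch): with the timestamp-micros branch removed from HoodieAvroUtils.convertValueForAvroLogicalTypes, the non-row path in SqlKeyGenerator no longer receives a java.sql.Timestamp; it gets the Avro timestamp-micros value as a raw epoch-microseconds long, so only a unit conversion to milliseconds is needed. Below is a minimal, self-contained Scala sketch of that conversion; the object name and sample value are illustrative and not taken from the patch.

    import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}

    // Hypothetical standalone example, not code from the Hudi repository.
    object TimestampMicrosToMillisExample {
      def main(args: Array[String]): Unit = {
        // Hypothetical partition value: epoch time in microseconds, as an Avro
        // timestamp-micros field would carry it (2014-01-01 23:00:01 UTC here).
        val partitionValue = "1388617201000000"
        // Same unit conversion the updated SqlKeyGenerator branch performs: micros -> millis.
        val timeMs = MILLISECONDS.convert(partitionValue.toLong, MICROSECONDS)
        println(timeMs) // prints 1388617201000
      }
    }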