diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 9bc86d4fb4e42..5e19c4faf9281 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -453,8 +453,11 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
     if (d.isNaN || d.isInfinite) null else (d * MICROS_PER_SECOND).toLong
   }
-  // converting seconds to us
-  private[this] def longToTimestamp(t: Long): Long = SECONDS.toMicros(t)
+  // SPARK-31710: converting seconds (or milliseconds, per the compatibility flag) to us
+  private[this] def longToTimestamp(t: Long): Long = {
+    if (SQLConf.get.getConf(SQLConf.LONG_TIMESTAMP_CONVERSION_IN_SECONDS)) t * 1000000L
+    else t * 1000L
+  }
   // converting us to seconds
   private[this] def timestampToLong(ts: Long): Long = {
     Math.floorDiv(ts, MICROS_PER_SECOND)
   }
@@ -1277,7 +1280,13 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
     val block = inline"new java.math.BigDecimal($MICROS_PER_SECOND)"
     code"($d.toBigDecimal().bigDecimal().multiply($block)).longValue()"
   }
-  private[this] def longToTimeStampCode(l: ExprValue): Block = code"$l * (long)$MICROS_PER_SECOND"
+
+  // SPARK-31710: converting seconds (or milliseconds, per the compatibility flag) to us
+  private[this] def longToTimeStampCode(l: ExprValue): Block = {
+    if (SQLConf.get.getConf(SQLConf.LONG_TIMESTAMP_CONVERSION_IN_SECONDS)) code"$l * 1000000L"
+    else code"$l * 1000L"
+  }
+
   private[this] def timestampToLongCode(ts: ExprValue): Block =
     code"java.lang.Math.floorDiv($ts, $MICROS_PER_SECOND)"
   private[this] def timestampToDoubleCode(ts: ExprValue): Block =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index cf0268773c399..ea91fa51efc5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2586,6 +2586,15 @@
       .checkValue(_ > 0, "The timeout value must be positive")
       .createWithDefault(10L)
 
+  val LONG_TIMESTAMP_CONVERSION_IN_SECONDS =
+    buildConf("spark.sql.legacy.longTimestampConversionInSeconds")
+      .internal()
+      .doc("When false, a Byte/Short/Int/Long value is interpreted as milliseconds " +
+        "during the timestamp conversion. " +
+        "When true, the value is interpreted as seconds, " +
+        "to be consistent with decimal/double.")
+      .booleanConf
+      .createWithDefault(true)
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3165,6 +3174,8 @@ class SQLConf extends Serializable with Logging {
 
   def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)
 
+  def longTimestampConversionInSeconds: Boolean = getConf(LONG_TIMESTAMP_CONVERSION_IN_SECONDS)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index e5bff7f7af007..9437dbfa125ae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -1353,17 +1353,33 @@ class AnsiCastSuite extends CastSuiteBase {
         cast("abc.com", dataType), "invalid input")
     }
   }
-
-  test("cast a timestamp before the epoch 1970-01-01 00:00:00Z") {
-    def errMsg(t: String): String = s"Casting -2198208303900000 to $t causes overflow"
-    withDefaultTimeZone(UTC) {
-      val negativeTs = Timestamp.valueOf("1900-05-05 18:34:56.1")
-      assert(negativeTs.getTime < 0)
-      val expectedSecs = Math.floorDiv(negativeTs.getTime, MILLIS_PER_SECOND)
-      checkExceptionInExpression[ArithmeticException](cast(negativeTs, ByteType), errMsg("byte"))
-      checkExceptionInExpression[ArithmeticException](cast(negativeTs, ShortType), errMsg("short"))
-      checkExceptionInExpression[ArithmeticException](cast(negativeTs, IntegerType), errMsg("int"))
-      checkEvaluation(cast(negativeTs, LongType), expectedSecs)
+
+  test("SPARK-31710: Add compatibility flag to cast long to timestamp") {
+    withSQLConf(
+      SQLConf.LONG_TIMESTAMP_CONVERSION_IN_SECONDS.key -> "false") {
+      for (tz <- ALL_TIMEZONES) {
+        def checkLongToTimestamp(l: Long, expected: Long): Unit = {
+          checkEvaluation(cast(l, TimestampType, Option(tz.getID)), expected)
+        }
+        checkLongToTimestamp(253402272000L, 253402272000000L)
+        checkLongToTimestamp(-5L, -5000L)
+        checkLongToTimestamp(1L, 1000L)
+        checkLongToTimestamp(0L, 0L)
+        checkLongToTimestamp(123L, 123000L)
+      }
+    }
+    withSQLConf(
+      SQLConf.LONG_TIMESTAMP_CONVERSION_IN_SECONDS.key -> "true") {
+      for (tz <- ALL_TIMEZONES) {
+        def checkLongToTimestamp(l: Long, expected: Long): Unit = {
+          checkEvaluation(cast(l, TimestampType, Option(tz.getID)), expected)
+        }
+        checkLongToTimestamp(253402272000L, 253402272000000000L)
+        checkLongToTimestamp(-5L, -5000000L)
+        checkLongToTimestamp(1L, 1000000L)
+        checkLongToTimestamp(0L, 0L)
+        checkLongToTimestamp(123L, 123000000L)
+      }
     }
   }
-}
+}
\ No newline at end of file
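
Note: for reference, a minimal standalone sketch of the two conversion semantics this patch toggles. This is an illustration only; the names LongTimestampConversionSketch and longToMicros are made up for this note and are not part of the patch. The arithmetic mirrors longToTimestamp above.

    // Illustrative sketch, not part of the patch: mirrors longToTimestamp's arithmetic.
    object LongTimestampConversionSketch {
      // conversionInSeconds = true (the flag's default): the long is taken as
      // seconds since the epoch; false: it is taken as milliseconds. The result
      // is microseconds, Spark SQL's internal TimestampType representation.
      def longToMicros(t: Long, conversionInSeconds: Boolean): Long =
        if (conversionInSeconds) t * 1000000L else t * 1000L

      def main(args: Array[String]): Unit = {
        assert(longToMicros(123L, conversionInSeconds = true) == 123000000L) // seconds -> us
        assert(longToMicros(123L, conversionInSeconds = false) == 123000L)   // millis -> us
      }
    }

At the SQL level the flag would be toggled with, e.g., SET spark.sql.legacy.longTimestampConversionInSeconds=false before evaluating CAST(123 AS TIMESTAMP).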