diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index d91896a9a994..0566a47cc875 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2459,6 +2459,7 @@ setMethod("schema_of_csv", signature(x = "characterOrColumn"),
 #' @note from_utc_timestamp since 1.5.0
 setMethod("from_utc_timestamp", signature(y = "Column", x = "character"),
           function(y, x) {
+            .Deprecated(msg = "from_utc_timestamp is deprecated. See SPARK-25496.")
             jc <- callJStatic("org.apache.spark.sql.functions", "from_utc_timestamp", y@jc, x)
             column(jc)
           })
@@ -2517,6 +2518,7 @@ setMethod("next_day", signature(y = "Column", x = "character"),
 #' @note to_utc_timestamp since 1.5.0
 setMethod("to_utc_timestamp", signature(y = "Column", x = "character"),
           function(y, x) {
+            .Deprecated(msg = "to_utc_timestamp is deprecated. See SPARK-25496.")
             jc <- callJStatic("org.apache.spark.sql.functions", "to_utc_timestamp", y@jc, x)
             column(jc)
           })
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 2394f7414284..fdc747482065 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1905,10 +1905,20 @@ test_that("date functions on a DataFrame", {
   df2 <- createDataFrame(l2)
   expect_equal(collect(select(df2, minute(df2$b)))[, 1], c(34, 24))
   expect_equal(collect(select(df2, second(df2$b)))[, 1], c(0, 34))
-  expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1],
-               c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC")))
-  expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1],
-               c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC")))
+  conf <- callJMethod(sparkSession, "conf")
+  isUtcTimestampFuncEnabled <- callJMethod(conf, "get", "spark.sql.legacy.utcTimestampFunc.enabled")
+  callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", "true")
+  tryCatch({
+    # Both from_utc_timestamp and to_utc_timestamp are deprecated as of SPARK-25496
+    expect_equal(suppressWarnings(collect(select(df2, from_utc_timestamp(df2$b, "JST"))))[, 1],
+                 c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC")))
+    expect_equal(suppressWarnings(collect(select(df2, to_utc_timestamp(df2$b, "JST"))))[, 1],
+                 c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC")))
+  },
+  finally = {
+    # Reverting the conf back
+    callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", isUtcTimestampFuncEnabled)
+  })
   expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0)
   expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0)
   expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0)
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 6ae23576e7bc..22163f52b472 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1306,7 +1306,10 @@ def from_utc_timestamp(timestamp, tz):
     [Row(local_time=datetime.datetime(1997, 2, 28, 2, 30))]
     >>> df.select(from_utc_timestamp(df.ts, df.tz).alias('local_time')).collect()
     [Row(local_time=datetime.datetime(1997, 2, 28, 19, 30))]
+
+    .. note:: Deprecated in 3.0. See SPARK-25496
     """
+    warnings.warn("Deprecated in 3.0. See SPARK-25496", DeprecationWarning)
See SPARK-25496", DeprecationWarning) sc = SparkContext._active_spark_context if isinstance(tz, Column): tz = _to_java_column(tz) @@ -1340,7 +1343,10 @@ def to_utc_timestamp(timestamp, tz): [Row(utc_time=datetime.datetime(1997, 2, 28, 18, 30))] >>> df.select(to_utc_timestamp(df.ts, df.tz).alias('utc_time')).collect() [Row(utc_time=datetime.datetime(1997, 2, 28, 1, 30))] + + .. note:: Deprecated in 3.0. See SPARK-25496 """ + warnings.warn("Deprecated in 3.0. See SPARK-25496", DeprecationWarning) sc = SparkContext._active_spark_context if isinstance(tz, Column): tz = _to_java_column(tz) @@ -3191,9 +3197,13 @@ def _test(): globs['sc'] = sc globs['spark'] = spark globs['df'] = spark.createDataFrame([Row(name='Alice', age=2), Row(name='Bob', age=5)]) + + spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true") (failure_count, test_count) = doctest.testmod( pyspark.sql.functions, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) + spark.conf.unset("spark.sql.legacy.utcTimestampFunc.enabled") + spark.stop() if failure_count: sys.exit(-1) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 3cda9899aa5c..784425e29739 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -26,11 +26,13 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.StringEscapeUtils +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter} import org.apache.spark.sql.catalyst.util.DateTimeUtils._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -1021,6 +1023,11 @@ case class TimeAdd(start: Expression, interval: Expression, timeZoneId: Option[S case class FromUTCTimestamp(left: Expression, right: Expression) extends BinaryExpression with ImplicitCastInputTypes { + if (!SQLConf.get.utcTimestampFuncEnabled) { + throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0." + + s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.") + } + override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType) override def dataType: DataType = TimestampType override def prettyName: String = "from_utc_timestamp" @@ -1227,6 +1234,11 @@ case class MonthsBetween( case class ToUTCTimestamp(left: Expression, right: Expression) extends BinaryExpression with ImplicitCastInputTypes { + if (!SQLConf.get.utcTimestampFuncEnabled) { + throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0. 
" + + s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.") + } + override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType) override def dataType: DataType = TimestampType override def prettyName: String = "to_utc_timestamp" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 411805e63287..71a49c2657c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1723,6 +1723,12 @@ object SQLConf { "and java.sql.Date are used for the same purpose.") .booleanConf .createWithDefault(false) + + val UTC_TIMESTAMP_FUNC_ENABLED = buildConf("spark.sql.legacy.utcTimestampFunc.enabled") + .doc("The configuration property enables the to_utc_timestamp() " + + "and from_utc_timestamp() functions.") + .booleanConf + .createWithDefault(false) } /** @@ -1916,6 +1922,8 @@ class SQLConf extends Serializable with Logging { def datetimeJava8ApiEnabled: Boolean = getConf(DATETIME_JAVA8API_ENABLED) + def utcTimestampFuncEnabled: Boolean = getConf(UTC_TIMESTAMP_FUNC_ENABLED) + /** * Returns the [[Resolver]] for the current configuration, which can be used to determine if two * identifiers are equal. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 7d656fc57d75..4e64313da136 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.Timestamp -import org.apache.log4j.{Appender, AppenderSkeleton, Logger} +import org.apache.log4j.AppenderSkeleton import org.apache.log4j.spi.LoggingEvent import org.apache.spark.SparkFunSuite @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.expressions.objects._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.ThreadUtils @@ -189,36 +190,42 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { } test("SPARK-17702: split wide constructor into blocks due to JVM code size limit") { - val length = 5000 - val expressions = Seq.fill(length) { - ToUTCTimestamp( - Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType), - Literal.create("PST", StringType)) - } - val plan = GenerateMutableProjection.generate(expressions) - val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType)) - val expected = Seq.fill(length)( - DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00"))) - - if (actual != expected) { - fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected") + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + val length = 5000 + val expressions = Seq.fill(length) { + ToUTCTimestamp( + Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType), + Literal.create("PST", 
+      }
+      val plan = GenerateMutableProjection.generate(expressions)
+      val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
+      val expected = Seq.fill(length)(
+        DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
+
+      if (actual != expected) {
+        fail(
+          s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+      }
     }
   }
 
   test("SPARK-22226: group splitted expressions into one method per nested class") {
-    val length = 10000
-    val expressions = Seq.fill(length) {
-      ToUTCTimestamp(
-        Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), TimestampType),
-        Literal.create("PST", StringType))
-    }
-    val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
-    val expected = Seq.fill(length)(
-      DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00")))
-
-    if (actual != expected) {
-      fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val length = 10000
+      val expressions = Seq.fill(length) {
+        ToUTCTimestamp(
+          Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), TimestampType),
+          Literal.create("PST", StringType))
+      }
+      val plan = GenerateMutableProjection.generate(expressions)
+      val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
+      val expected = Seq.fill(length)(
+        DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00")))
+
+      if (actual != expected) {
+        fail(
+          s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+      }
     }
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index 8823fe779406..bc2c575cb023 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -25,10 +25,12 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.TimeUnit._
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
@@ -816,21 +818,29 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         NonFoldableLiteral.create(tz, StringType)),
       if (expected != null) Timestamp.valueOf(expected) else null)
     }
-    test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
-    test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00")
-    test(null, "UTC", null)
-    test("2015-07-24 00:00:00", null, null)
-    test(null, null, null)
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
+      test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00")
+      test(null, "UTC", null)
+      test("2015-07-24 00:00:00", null, null)
+      test(null, null, null)
+    }
+    val msg = intercept[AnalysisException] {
test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00") + }.getMessage + assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key)) } test("to_utc_timestamp - invalid time zone id") { - Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz => - val msg = intercept[java.time.DateTimeException] { - GenerateUnsafeProjection.generate( - ToUTCTimestamp( - Literal(Timestamp.valueOf("2015-07-24 00:00:00")), Literal(invalidTz)) :: Nil) - }.getMessage - assert(msg.contains(invalidTz)) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz => + val msg = intercept[java.time.DateTimeException] { + GenerateUnsafeProjection.generate( + ToUTCTimestamp( + Literal(Timestamp.valueOf("2015-07-24 00:00:00")), Literal(invalidTz)) :: Nil) + }.getMessage + assert(msg.contains(invalidTz)) + } } } @@ -847,19 +857,28 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { NonFoldableLiteral.create(tz, StringType)), if (expected != null) Timestamp.valueOf(expected) else null) } - test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00") - test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00") - test(null, "UTC", null) - test("2015-07-24 00:00:00", null, null) - test(null, null, null) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00") + test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00") + test(null, "UTC", null) + test("2015-07-24 00:00:00", null, null) + test(null, null, null) + } + val msg = intercept[AnalysisException] { + test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00") + }.getMessage + assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key)) } test("from_utc_timestamp - invalid time zone id") { - Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz => - val msg = intercept[java.time.DateTimeException] { - GenerateUnsafeProjection.generate(FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil) - }.getMessage - assert(msg.contains(invalidTz)) + withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") { + Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz => + val msg = intercept[java.time.DateTimeException] { + GenerateUnsafeProjection.generate( + FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil) + }.getMessage + assert(msg.contains(invalidTz)) + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index d6d1f6aee50b..d0be216e6a2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2988,6 +2988,7 @@ object functions { * @group datetime_funcs * @since 1.5.0 */ + @deprecated("This function is deprecated and will be removed in future versions.", "3.0.0") def from_utc_timestamp(ts: Column, tz: String): Column = withExpr { FromUTCTimestamp(ts.expr, Literal(tz)) } @@ -2999,6 +3000,7 @@ object functions { * @group datetime_funcs * @since 2.4.0 */ + @deprecated("This function is deprecated and will be removed in future versions.", "3.0.0") def from_utc_timestamp(ts: Column, tz: Column): Column = withExpr { FromUTCTimestamp(ts.expr, tz.expr) } @@ -3017,6 +3019,7 @@ object functions { * @group datetime_funcs * @since 1.5.0 */ + @deprecated("This function is deprecated and will be removed in future versions.", "3.0.0") def to_utc_timestamp(ts: Column, tz: String): Column = withExpr { 
     ToUTCTimestamp(ts.expr, Literal(tz))
   }
@@ -3028,6 +3031,7 @@ object functions {
    * @group datetime_funcs
    * @since 2.4.0
    */
+  @deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
   def to_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
     ToUTCTimestamp(ts.expr, tz.expr)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index a9435e01ef8f..5ad1cb32d003 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -675,33 +675,41 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
       (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00"),
      (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00")
     ).toDF("a", "b")
-    checkAnswer(
-      df.select(from_utc_timestamp(col("a"), "PST")),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-23 17:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
-    checkAnswer(
-      df.select(from_utc_timestamp(col("b"), "PST")),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-23 17:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      checkAnswer(
+        df.select(from_utc_timestamp(col("a"), "PST")),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-23 17:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+      checkAnswer(
+        df.select(from_utc_timestamp(col("b"), "PST")),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-23 17:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+    }
+    val msg = intercept[AnalysisException] {
+      df.select(from_utc_timestamp(col("a"), "PST")).collect()
+    }.getMessage
+    assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
   }
 
   test("from_utc_timestamp with column zone") {
-    val df = Seq(
-      (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "CET"),
-      (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "PST")
-    ).toDF("a", "b", "c")
-    checkAnswer(
-      df.select(from_utc_timestamp(col("a"), col("c"))),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 02:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
-    checkAnswer(
-      df.select(from_utc_timestamp(col("b"), col("c"))),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 02:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val df = Seq(
+        (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "CET"),
+        (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "PST")
+      ).toDF("a", "b", "c")
+      checkAnswer(
+        df.select(from_utc_timestamp(col("a"), col("c"))),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 02:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+      checkAnswer(
+        df.select(from_utc_timestamp(col("b"), col("c"))),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 02:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 17:00:00"))))
+    }
   }
 
   test("to_utc_timestamp with literal zone") {
@@ -709,32 +717,40 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
       (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00"),
       (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00")
     ).toDF("a", "b")
-    checkAnswer(
-      df.select(to_utc_timestamp(col("a"), "PST")),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 07:00:00")),
-        Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
-    checkAnswer(
-      df.select(to_utc_timestamp(col("b"), "PST")),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 07:00:00")),
-        Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      checkAnswer(
+        df.select(to_utc_timestamp(col("a"), "PST")),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 07:00:00")),
+          Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
+      checkAnswer(
+        df.select(to_utc_timestamp(col("b"), "PST")),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 07:00:00")),
+          Row(Timestamp.valueOf("2015-07-25 07:00:00"))))
+    }
+    val msg = intercept[AnalysisException] {
+      df.select(to_utc_timestamp(col("a"), "PST")).collect()
+    }.getMessage
+    assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
   }
 
   test("to_utc_timestamp with column zone") {
-    val df = Seq(
-      (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "PST"),
-      (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "CET")
-    ).toDF("a", "b", "c")
-    checkAnswer(
-      df.select(to_utc_timestamp(col("a"), col("c"))),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 07:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
-    checkAnswer(
-      df.select(to_utc_timestamp(col("b"), col("c"))),
-      Seq(
-        Row(Timestamp.valueOf("2015-07-24 07:00:00")),
-        Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val df = Seq(
+        (Timestamp.valueOf("2015-07-24 00:00:00"), "2015-07-24 00:00:00", "PST"),
+        (Timestamp.valueOf("2015-07-25 00:00:00"), "2015-07-25 00:00:00", "CET")
+      ).toDF("a", "b", "c")
+      checkAnswer(
+        df.select(to_utc_timestamp(col("a"), col("c"))),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 07:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
+      checkAnswer(
+        df.select(to_utc_timestamp(col("b"), col("c"))),
+        Seq(
+          Row(Timestamp.valueOf("2015-07-24 07:00:00")),
+          Row(Timestamp.valueOf("2015-07-24 22:00:00"))))
+    }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
index cbd51b490141..17bdd218dc17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeBenchmark.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Synthetic benchmark for date and timestamp functions.
@@ -86,9 +87,11 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
       run(N, "from_unixtime", "from_unixtime(id, 'yyyy-MM-dd HH:mm:ss.SSSSSS')")
     }
     runBenchmark("Convert timestamps") {
-      val timestampExpr = "cast(id as timestamp)"
-      run(N, "from_utc_timestamp", s"from_utc_timestamp($timestampExpr, 'CET')")
-      run(N, "to_utc_timestamp", s"to_utc_timestamp($timestampExpr, 'CET')")
+      withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+        val timestampExpr = "cast(id as timestamp)"
+        run(N, "from_utc_timestamp", s"from_utc_timestamp($timestampExpr, 'CET')")
+        run(N, "to_utc_timestamp", s"to_utc_timestamp($timestampExpr, 'CET')")
+      }
     }
     runBenchmark("Intervals") {
       val (start, end) = ("cast(id as timestamp)", "cast((id+8640000) as timestamp)")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 116fd74352fe..2a9e6b849d89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.util.{Locale, TimeZone}
 
 import org.apache.commons.io.FileUtils
-import org.scalatest.{Assertions, BeforeAndAfterAll}
+import org.scalatest.Assertions
 
 import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.rdd.BlockRDD
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.state.{StateStore, StreamingAggregationStateManager}
+import org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManager
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -342,52 +342,55 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
   }
 
   testWithAllStateVersions("prune results by current_date, complete mode") {
-    import testImplicits._
-    val clock = new StreamManualClock
-    val tz = TimeZone.getDefault.getID
-    val inputData = MemoryStream[Long]
-    val aggregated =
-      inputData.toDF()
-        .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
-        .toDF("value")
-        .groupBy($"value")
-        .agg(count("*"))
-        .where($"value".cast("date") >= date_sub(current_date(), 10))
-        .select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
-    testStream(aggregated, Complete)(
-      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-      // advance clock to 10 days, should retain all keys
-      AddData(inputData, 0L, 5L, 5L, 10L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-      // advance clock to 20 days, should retain keys >= 10
-      AddData(inputData, 15L, 15L, 20L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-      // advance clock to 30 days, should retain keys >= 20
-      AddData(inputData, 85L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((20L, 1), (85L, 1)),
-
-      // bounce stream and ensure correct batch timestamp is used
-      // i.e., we don't take it from the clock, which is at 90 days.
-      StopStream,
-      AssertOnQuery { q => // clear the sink
-        q.sink.asInstanceOf[MemorySink].clear()
-        q.commitLog.purge(3)
-        // advance by 60 days i.e., 90 days total
-        clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
-        true
-      },
-      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-      // Commit log blown, causing a re-run of the last batch
-      CheckLastBatch((20L, 1), (85L, 1)),
-
-      // advance clock to 100 days, should retain keys >= 90
-      AddData(inputData, 85L, 90L, 100L, 105L),
-      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-      CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
-    )
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      import testImplicits._
+      val clock = new StreamManualClock
+      val tz = TimeZone.getDefault.getID
+      val inputData = MemoryStream[Long]
+      val aggregated =
+        inputData.toDF()
+          .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
+          .toDF("value")
+          .groupBy($"value")
+          .agg(count("*"))
+          .where($"value".cast("date") >= date_sub(current_date(), 10))
+          .select(
+            ($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
+      testStream(aggregated, Complete)(
+        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+        // advance clock to 10 days, should retain all keys
+        AddData(inputData, 0L, 5L, 5L, 10L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+        // advance clock to 20 days, should retain keys >= 10
+        AddData(inputData, 15L, 15L, 20L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+        // advance clock to 30 days, should retain keys >= 20
+        AddData(inputData, 85L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((20L, 1), (85L, 1)),
+
+        // bounce stream and ensure correct batch timestamp is used
+        // i.e., we don't take it from the clock, which is at 90 days.
+        StopStream,
+        AssertOnQuery { q => // clear the sink
+          q.sink.asInstanceOf[MemorySink].clear()
+          q.commitLog.purge(3)
+          // advance by 60 days i.e., 90 days total
+          clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
+          true
+        },
+        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+        // Commit log blown, causing a re-run of the last batch
+        CheckLastBatch((20L, 1), (85L, 1)),
+
+        // advance clock to 100 days, should retain keys >= 90
+        AddData(inputData, 85L, 90L, 100L, 105L),
+        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+        CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
+      )
+    }
   }
 
   testWithAllStateVersions("SPARK-19690: do not convert batch aggregation in streaming query " +