2 changes: 2 additions & 0 deletions R/pkg/R/functions.R
@@ -2459,6 +2459,7 @@ setMethod("schema_of_csv", signature(x = "characterOrColumn"),
#' @note from_utc_timestamp since 1.5.0
setMethod("from_utc_timestamp", signature(y = "Column", x = "character"),
function(y, x) {
.Deprecated(msg = "from_utc_timestamp is deprecated. See SPARK-25496.")
jc <- callJStatic("org.apache.spark.sql.functions", "from_utc_timestamp", y@jc, x)
column(jc)
})
@@ -2517,6 +2518,7 @@ setMethod("next_day", signature(y = "Column", x = "character"),
#' @note to_utc_timestamp since 1.5.0
setMethod("to_utc_timestamp", signature(y = "Column", x = "character"),
function(y, x) {
.Deprecated(msg = "to_utc_timestamp is deprecated. See SPARK-25496.")
jc <- callJStatic("org.apache.spark.sql.functions", "to_utc_timestamp", y@jc, x)
column(jc)
})
18 changes: 14 additions & 4 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1905,10 +1905,20 @@ test_that("date functions on a DataFrame", {
df2 <- createDataFrame(l2)
expect_equal(collect(select(df2, minute(df2$b)))[, 1], c(34, 24))
expect_equal(collect(select(df2, second(df2$b)))[, 1], c(0, 34))
-  expect_equal(collect(select(df2, from_utc_timestamp(df2$b, "JST")))[, 1],
-               c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC")))
-  expect_equal(collect(select(df2, to_utc_timestamp(df2$b, "JST")))[, 1],
-               c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC")))
+  conf <- callJMethod(sparkSession, "conf")
+  isUtcTimestampFuncEnabled <- callJMethod(conf, "get", "spark.sql.legacy.utcTimestampFunc.enabled")
+  callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", "true")
+  tryCatch({
+    # Both from_utc_timestamp and to_utc_timestamp are deprecated as of SPARK-25496
+    expect_equal(suppressWarnings(collect(select(df2, from_utc_timestamp(df2$b, "JST"))))[, 1],
+                 c(as.POSIXct("2012-12-13 21:34:00 UTC"), as.POSIXct("2014-12-15 10:24:34 UTC")))
+    expect_equal(suppressWarnings(collect(select(df2, to_utc_timestamp(df2$b, "JST"))))[, 1],
+                 c(as.POSIXct("2012-12-13 03:34:00 UTC"), as.POSIXct("2014-12-14 16:24:34 UTC")))
+  },
+  finally = {
+    # Reverting the conf back
+    callJMethod(conf, "set", "spark.sql.legacy.utcTimestampFunc.enabled", isUtcTimestampFuncEnabled)
+  })
expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0)
expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0)
expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0)
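The tryCatch(..., finally = ...) dance above — read the current value, flip the flag, run the assertions, restore the old value — is the same save/set/restore pattern the Scala suites below get from withSQLConf. A minimal Python sketch of the idea (the sql_conf helper is hypothetical, not a Spark API; assumes an active SparkSession named spark):

from contextlib import contextmanager

@contextmanager
def sql_conf(spark, key, value):
    """Temporarily set a SQL conf, restoring the previous value on exit."""
    old = spark.conf.get(key, None)  # None when the key was never set
    spark.conf.set(key, value)
    try:
        yield
    finally:
        if old is None:
            spark.conf.unset(key)
        else:
            spark.conf.set(key, old)

# with sql_conf(spark, "spark.sql.legacy.utcTimestampFunc.enabled", "true"):
#     ...  # call the deprecated functions here; the flag is restored afterwards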
10 changes: 10 additions & 0 deletions python/pyspark/sql/functions.py
@@ -1306,7 +1306,10 @@ def from_utc_timestamp(timestamp, tz):
[Row(local_time=datetime.datetime(1997, 2, 28, 2, 30))]
>>> df.select(from_utc_timestamp(df.ts, df.tz).alias('local_time')).collect()
[Row(local_time=datetime.datetime(1997, 2, 28, 19, 30))]

.. note:: Deprecated in 3.0. See SPARK-25496
"""
warnings.warn("Deprecated in 3.0. See SPARK-25496", DeprecationWarning)
sc = SparkContext._active_spark_context
if isinstance(tz, Column):
tz = _to_java_column(tz)
@@ -1340,7 +1343,10 @@ def to_utc_timestamp(timestamp, tz):
[Row(utc_time=datetime.datetime(1997, 2, 28, 18, 30))]
>>> df.select(to_utc_timestamp(df.ts, df.tz).alias('utc_time')).collect()
[Row(utc_time=datetime.datetime(1997, 2, 28, 1, 30))]

.. note:: Deprecated in 3.0. See SPARK-25496
"""
warnings.warn("Deprecated in 3.0. See SPARK-25496", DeprecationWarning)
sc = SparkContext._active_spark_context
if isinstance(tz, Column):
tz = _to_java_column(tz)
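Since CPython silences DeprecationWarning by default outside __main__, downstream code that wants to see these warnings — or fail hard on them — has to opt in via the warnings module. A short sketch, assuming a DataFrame df with a timestamp column ts:

import warnings

from pyspark.sql.functions import from_utc_timestamp

# Surface the deprecation message at this call site.
warnings.simplefilter("default", DeprecationWarning)
df.select(from_utc_timestamp(df.ts, "JST"))

# Or escalate it to an error to flush out remaining usages.
with warnings.catch_warnings():
    warnings.simplefilter("error", DeprecationWarning)
    from_utc_timestamp(df.ts, "JST")  # now raises DeprecationWarning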
@@ -3191,9 +3197,13 @@ def _test():
globs['sc'] = sc
globs['spark'] = spark
globs['df'] = spark.createDataFrame([Row(name='Alice', age=2), Row(name='Bob', age=5)])

spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true")
(failure_count, test_count) = doctest.testmod(
pyspark.sql.functions, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
spark.conf.unset("spark.sql.legacy.utcTimestampFunc.enabled")

spark.stop()
if failure_count:
sys.exit(-1)
@@ -26,11 +26,13 @@ import scala.util.control.NonFatal

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

@@ -1021,6 +1023,11 @@ case class TimeAdd(start: Expression, interval: Expression, timeZoneId: Option[S
case class FromUTCTimestamp(left: Expression, right: Expression)
extends BinaryExpression with ImplicitCastInputTypes {

  if (!SQLConf.get.utcTimestampFuncEnabled) {
    throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0. " +
      s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.")
  }

[Review comment — Member] Ur, @MaxGekk. Usually, deprecation means showing warnings instead of an exception. This looks like a ban to me.

[Review comment — Member] Yea, deprecation is deprecation. I think we can simply deprecate them in functions.scala, and that's all.

override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType)
override def dataType: DataType = TimestampType
override def prettyName: String = "from_utc_timestamp"
@@ -1227,6 +1234,11 @@ case class MonthsBetween(
case class ToUTCTimestamp(left: Expression, right: Expression)
extends BinaryExpression with ImplicitCastInputTypes {

  if (!SQLConf.get.utcTimestampFuncEnabled) {
    throw new AnalysisException(s"The $prettyName function has been disabled since Spark 3.0. " +
      s"Set ${SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key} to true to enable this function.")
  }

[Review comment — Member] @cloud-fan, I don't think this is the right way to deprecate. We should at least throw a warning instead, or have a better mechanism for deprecation on the SQL side. Do we really want to add a configuration for every deprecation?

[Review comment — Contributor] As I said, I don't think a warning log is the right way to deprecate a SQL function, as users won't see it. This is not good either, but it is the best I can think of.

[Review comment — Member] Then we should have the right way first, before we start to do it differently from what I have done so far.
override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType, StringType)
override def dataType: DataType = TimestampType
override def prettyName: String = "to_utc_timestamp"
@@ -1723,6 +1723,12 @@ object SQLConf {
"and java.sql.Date are used for the same purpose.")
.booleanConf
.createWithDefault(false)

val UTC_TIMESTAMP_FUNC_ENABLED = buildConf("spark.sql.legacy.utcTimestampFunc.enabled")
.doc("The configuration property enables the to_utc_timestamp() " +
"and from_utc_timestamp() functions.")
.booleanConf
.createWithDefault(false)
}

/**
@@ -1916,6 +1922,8 @@ class SQLConf extends Serializable with Logging {

def datetimeJava8ApiEnabled: Boolean = getConf(DATETIME_JAVA8API_ENABLED)

def utcTimestampFuncEnabled: Boolean = getConf(UTC_TIMESTAMP_FUNC_ENABLED)

/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
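The new entry follows the spark.sql.legacy.* naming convention and defaults to false, so the two functions stay disabled unless a user opts back in for the session. Both configuration surfaces reach the same SQLConf entry; a quick sketch, assuming an active SparkSession named spark:

# Programmatic configuration API
spark.conf.set("spark.sql.legacy.utcTimestampFunc.enabled", "true")
assert spark.conf.get("spark.sql.legacy.utcTimestampFunc.enabled") == "true"

# Equivalent SQL statement
spark.sql("SET spark.sql.legacy.utcTimestampFunc.enabled=false")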
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import java.sql.Timestamp

-import org.apache.log4j.{Appender, AppenderSkeleton, Logger}
+import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.spi.LoggingEvent

import org.apache.spark.SparkFunSuite
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.ThreadUtils
@@ -189,36 +190,42 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
}

test("SPARK-17702: split wide constructor into blocks due to JVM code size limit") {
-    val length = 5000
-    val expressions = Seq.fill(length) {
-      ToUTCTimestamp(
-        Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType),
-        Literal.create("PST", StringType))
-    }
-    val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
-    val expected = Seq.fill(length)(
-      DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
-
-    if (actual != expected) {
-      fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val length = 5000
+      val expressions = Seq.fill(length) {
+        ToUTCTimestamp(
+          Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType),
+          Literal.create("PST", StringType))
+      }
+      val plan = GenerateMutableProjection.generate(expressions)
+      val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
+      val expected = Seq.fill(length)(
+        DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
+
+      if (actual != expected) {
+        fail(
+          s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+      }
+    }
}

test("SPARK-22226: group splitted expressions into one method per nested class") {
-    val length = 10000
-    val expressions = Seq.fill(length) {
-      ToUTCTimestamp(
-        Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), TimestampType),
-        Literal.create("PST", StringType))
-    }
-    val plan = GenerateMutableProjection.generate(expressions)
-    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
-    val expected = Seq.fill(length)(
-      DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00")))
-
-    if (actual != expected) {
-      fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
+      val length = 10000
+      val expressions = Seq.fill(length) {
+        ToUTCTimestamp(
+          Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), TimestampType),
+          Literal.create("PST", StringType))
+      }
+      val plan = GenerateMutableProjection.generate(expressions)
+      val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
+      val expected = Seq.fill(length)(
+        DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00")))
+
+      if (actual != expected) {
+        fail(
+          s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+      }
+    }
}

@@ -25,10 +25,12 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval

@@ -816,21 +818,29 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
NonFoldableLiteral.create(tz, StringType)),
if (expected != null) Timestamp.valueOf(expected) else null)
}
test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00")
test(null, "UTC", null)
test("2015-07-24 00:00:00", null, null)
test(null, null, null)
withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
test("2015-01-24 00:00:00", "PST", "2015-01-24 08:00:00")
test(null, "UTC", null)
test("2015-07-24 00:00:00", null, null)
test(null, null, null)
}
val msg = intercept[AnalysisException] {
test("2015-07-24 00:00:00", "PST", "2015-07-24 07:00:00")
}.getMessage
assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
}

test("to_utc_timestamp - invalid time zone id") {
Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
val msg = intercept[java.time.DateTimeException] {
GenerateUnsafeProjection.generate(
ToUTCTimestamp(
Literal(Timestamp.valueOf("2015-07-24 00:00:00")), Literal(invalidTz)) :: Nil)
}.getMessage
assert(msg.contains(invalidTz))
withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
val msg = intercept[java.time.DateTimeException] {
GenerateUnsafeProjection.generate(
ToUTCTimestamp(
Literal(Timestamp.valueOf("2015-07-24 00:00:00")), Literal(invalidTz)) :: Nil)
}.getMessage
assert(msg.contains(invalidTz))
}
}
}

@@ -847,19 +857,28 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
NonFoldableLiteral.create(tz, StringType)),
if (expected != null) Timestamp.valueOf(expected) else null)
}
test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00")
test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00")
test(null, "UTC", null)
test("2015-07-24 00:00:00", null, null)
test(null, null, null)
withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00")
test("2015-01-24 00:00:00", "PST", "2015-01-23 16:00:00")
test(null, "UTC", null)
test("2015-07-24 00:00:00", null, null)
test(null, null, null)
}
val msg = intercept[AnalysisException] {
test("2015-07-24 00:00:00", "PST", "2015-07-23 17:00:00")
}.getMessage
assert(msg.contains(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key))
}

test("from_utc_timestamp - invalid time zone id") {
Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
val msg = intercept[java.time.DateTimeException] {
GenerateUnsafeProjection.generate(FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil)
}.getMessage
assert(msg.contains(invalidTz))
withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
Seq("Invalid time zone", "\"quote", "UTC*42").foreach { invalidTz =>
val msg = intercept[java.time.DateTimeException] {
GenerateUnsafeProjection.generate(
FromUTCTimestamp(Literal(0), Literal(invalidTz)) :: Nil)
}.getMessage
assert(msg.contains(invalidTz))
}
}
}
}
4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2988,6 +2988,7 @@ object functions {
* @group datetime_funcs
* @since 1.5.0
*/
@deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
def from_utc_timestamp(ts: Column, tz: String): Column = withExpr {
FromUTCTimestamp(ts.expr, Literal(tz))
}
@@ -2999,6 +3000,7 @@
* @group datetime_funcs
* @since 2.4.0
*/
@deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
def from_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
FromUTCTimestamp(ts.expr, tz.expr)
}
@@ -3017,6 +3019,7 @@
* @group datetime_funcs
* @since 1.5.0
*/
@deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
def to_utc_timestamp(ts: Column, tz: String): Column = withExpr {
ToUTCTimestamp(ts.expr, Literal(tz))
}
@@ -3028,6 +3031,7 @@
* @group datetime_funcs
* @since 2.4.0
*/
@deprecated("This function is deprecated and will be removed in future versions.", "3.0.0")
def to_utc_timestamp(ts: Column, tz: Column): Column = withExpr {
ToUTCTimestamp(ts.expr, tz.expr)
}