docs/sql-migration-guide-upgrade.md (2 additions, 0 deletions)
@@ -43,6 +43,8 @@ displayTitle: Spark SQL Upgrading Guide

- In Spark version 2.4 and earlier, if `org.apache.spark.sql.functions.udf(Any, DataType)` gets a Scala closure with a primitive-type argument, the returned UDF returns null if the input value is null. Since Spark 3.0, the UDF returns the default value of the Java type if the input value is null. For example, with `val f = udf((x: Int) => x, IntegerType)`, `f($"x")` returns null in Spark 2.4 and earlier if column `x` is null, but returns 0 in Spark 3.0. This behavior change is introduced because Spark 3.0 is built with Scala 2.12 by default.

+- Since Spark 3.0, the JDBC options `lowerBound` and `upperBound` are converted to TimestampType/DateType values in the same way as casting strings to TimestampType/DateType values. The conversion is based on the Proleptic Gregorian calendar and on the time zone defined by the SQL config `spark.sql.session.timeZone`. In Spark version 2.4 and earlier, the conversion was based on the hybrid calendar (Julian + Gregorian) and on the default system time zone.
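To see the new behavior in action, here is a minimal sketch (not part of this patch) of a partitioned JDBC read whose timestamp bounds are parsed in the session time zone; the URL, table, and column names below are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

object TimestampBoundsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("jdbc-timestamp-bounds")
      // Since 3.0, the bounds below are interpreted in this zone on the
      // Proleptic Gregorian calendar, not in the JVM default time zone.
      .config("spark.sql.session.timeZone", "UTC")
      .getOrCreate()

    val df = spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://localhost/db") // hypothetical URL
      .option("dbtable", "events")                     // hypothetical table
      .option("partitionColumn", "event_time")         // a TimestampType column
      .option("lowerBound", "2019-01-01 00:00:00")
      .option("upperBound", "2019-02-01 00:00:00")
      .option("numPartitions", 4)
      .load()

    df.show()
  }
}
```

Changing `spark.sql.session.timeZone` shifts the instants the bounds map to, and therefore the generated partition predicates.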

## Upgrading From Spark SQL 2.3 to 2.4

- In Spark version 2.3 and earlier, the second parameter to the `array_contains` function is implicitly promoted to the element type of the first, array-type parameter. This type promotion can be lossy and may cause `array_contains` to return a wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some changes in behavior, which are illustrated in the table below.
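For instance, one case consistent with that table (the table itself is elided from this excerpt), shown here as a hedged sketch:

```scala
// Spark 2.3 and earlier: 1.34D is cast down to the array's element type
// (int), truncating it to 1, so the lossy comparison returns true.
// Spark 2.4: the array side is promoted to array<double> instead, and the
// query returns false.
spark.sql("SELECT array_contains(array(1), 1.34D)").show()
```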
[…]

JDBCRelation.scala
@@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.datasources.jdbc

-import java.sql.{Date, Timestamp}

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Partition
Expand All @@ -27,10 +25,12 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getTimeZone, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, DateType, NumericType, StructType, TimestampType}
+import org.apache.spark.unsafe.types.UTF8String

/**
* Instructions on how to partition the table among workers.
[…]

@@ -85,8 +85,8 @@ private[sql] object JDBCRelation extends Logging {
val (column, columnType) = verifyAndGetNormalizedPartitionColumn(
schema, partitionColumn.get, resolver, jdbcOptions)

-    val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType)
-    val upperBoundValue = toInternalBoundValue(upperBound.get, columnType)
+    val lowerBoundValue = toInternalBoundValue(lowerBound.get, columnType, timeZoneId)
+    val upperBoundValue = toInternalBoundValue(upperBound.get, columnType, timeZoneId)
JDBCPartitioningInfo(
column, columnType, lowerBoundValue, upperBoundValue, numPartitions.get)
}
[…]

@@ -174,10 +174,21 @@ private[sql] object JDBCRelation extends Logging {
(dialect.quoteIdentifier(column.name), column.dataType)
}

-  private def toInternalBoundValue(value: String, columnType: DataType): Long = columnType match {
-    case _: NumericType => value.toLong
-    case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value)).toLong
-    case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value))
+  private def toInternalBoundValue(
+      value: String,
+      columnType: DataType,
+      timeZoneId: String): Long = {
+    def parse[T](f: UTF8String => Option[T]): T = {
+      f(UTF8String.fromString(value)).getOrElse {
+        throw new IllegalArgumentException(
+          s"Cannot parse the bound value $value as ${columnType.catalogString}")
+      }
+    }
+    columnType match {
+      case _: NumericType => value.toLong
+      case DateType => parse(stringToDate).toLong
+      case TimestampType => parse(stringToTimestamp(_, getTimeZone(timeZoneId)))
+    }
}

private def toBoundValueInWhereClause(
[…]
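Conceptually, the new TimestampType bound parsing interprets the bound string in the session time zone on the Proleptic Gregorian calendar and converts it to microseconds since the epoch, Spark's internal timestamp representation. A rough standalone approximation using `java.time` (the internal `stringToTimestamp` accepts more input formats than this fixed pattern, so treat it only as a sketch):

```scala
import java.time.{LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object BoundParsingSketch {
  // Parse a bound like "2019-01-20 12:00:00" in the given session time zone
  // and convert it to microseconds since the epoch. java.time uses the ISO
  // (proleptic Gregorian) chronology, matching the Spark 3.0 behavior.
  def boundToMicros(value: String, sessionTimeZone: String): Long = {
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    val local = LocalDateTime.parse(value, formatter)
    val instant = local.atZone(ZoneId.of(sessionTimeZone)).toInstant
    instant.getEpochSecond * 1000000L + instant.getNano / 1000L
  }

  def main(args: Array[String]): Unit = {
    // The same bound string maps to different instants in different session
    // time zones, which is the behavior change documented above.
    println(boundToMicros("2019-01-20 12:00:00", "UTC"))
    println(boundToMicros("2019-01-20 12:00:00", "America/Los_Angeles"))
  }
}
```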
JDBCSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
[…]

@@ -1523,4 +1523,37 @@ class JDBCSuite extends QueryTest
assert(e.contains("The driver could not open a JDBC connection. " +
"Check the URL: jdbc:mysql://localhost/db"))
}

test("parsing timestamp bounds") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
Seq(
("1972-07-04 03:30:00", "1972-07-15 20:50:32.5", "1972-07-27 14:11:05"),
("2019-01-20 12:00:00.502", "2019-01-20 12:00:00.751", "2019-01-20 12:00:01.000"),
(
"2019-01-20T00:00:00.123456",
"2019-01-20 00:05:00.123456",
"2019-01-20T00:10:00.123456"),
("1500-01-20T00:00:00.123456", "1500-01-20 00:05:00.123456", "1500-01-20T00:10:00.123456")
).foreach { case (lower, middle, upper) =>
val df = spark.read.format("jdbc")
.option("url", urlWithUserAndPass)
.option("dbtable", "TEST.DATETIME")
.option("partitionColumn", "t")
.option("lowerBound", lower)
.option("upperBound", upper)
.option("numPartitions", 2)
.load()

df.logicalPlan match {
case LogicalRelation(JDBCRelation(_, parts, _), _, _, _) =>
val whereClauses = parts.map(_.asInstanceOf[JDBCPartition].whereClause).toSet
assert(whereClauses === Set(
s""""T" < '$middle' or "T" is null""",
s""""T" >= '$middle'"""))
}
}
}
}
}
}