core/src/main/scala/org/apache/spark/SparkException.scala (7 additions & 0 deletions)
@@ -43,3 +43,10 @@ private[spark] case class SparkUserAppException(exitCode: Int)
*/
private[spark] case class ExecutorDeadException(message: String)
extends SparkException(message)

/**
* Exception thrown when Spark returns a different result after upgrading to a new version.
*/
private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable)
extends SparkException("You may get a different result due to upgrading to" +
s" Spark $version: $message", cause)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal

import com.univocity.parsers.csv.CsvParser

import org.apache.spark.SparkUpgradeException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
@@ -179,6 +180,7 @@ class UnivocityParser(
try {
timestampFormatter.parse(datum)
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) =>
// If parsing fails, fall back to the way used in Spark 2.0 and 1.x for backwards
// compatibility.
@@ -192,6 +194,7 @@
try {
dateFormatter.parse(datum)
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) =>
// If parsing fails, fall back to the way used in Spark 2.0 and 1.x for backwards
// compatibility.
@@ -285,6 +288,7 @@
}
}
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) =>
badRecordException = badRecordException.orElse(Some(e))
row.setNullAt(i)
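
Editor's note on the guard added above (the same two lines recur in the datetime expressions and the JSON parser below): NonFatal matches any non-fatal Throwable, including SparkUpgradeException, so without the explicit rethrow the upgrade hint would be swallowed by the bad-record fallback. A self-contained sketch, with a stub exception and a hypothetical parseField helper:

import scala.util.control.NonFatal

// Stub standing in for org.apache.spark.SparkUpgradeException.
class SparkUpgradeException(msg: String) extends Exception(msg)

// Hypothetical helper mirroring the catch ordering used in the diff.
def parseField[T](parse: => T, fallback: => T): T =
  try parse catch {
    case e: SparkUpgradeException => throw e // must precede NonFatal, which would also match it
    case NonFatal(_) => fallback             // ordinary malformed input: bad-record path
  }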
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal

import org.apache.commons.text.StringEscapeUtils

import org.apache.spark.SparkUpgradeException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen._
@@ -789,6 +790,7 @@ abstract class ToTimestamp
formatter.parse(
t.asInstanceOf[UTF8String].toString) / downScaleFactor
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(_) => null
}
}
@@ -802,6 +804,7 @@ abstract class ToTimestamp
TimestampFormatter(formatString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT)
.parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(_) => null
}
}
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal

import com.fasterxml.jackson.core._

import org.apache.spark.SparkUpgradeException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -232,6 +233,7 @@ class JacksonParser(
try {
timestampFormatter.parse(parser.getText)
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) =>
// If parsing fails, fall back to the way used in Spark 2.0 and 1.x for backwards
// compatibility.
@@ -249,6 +251,7 @@
try {
dateFormatter.parse(parser.getText)
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) =>
// If parsing fails, fall back to the way used in Spark 2.0 and 1.x for backwards
// compatibility.
@@ -382,6 +385,7 @@ class JacksonParser(
try {
row.update(index, fieldConverters(index).apply(parser))
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) =>
badRecordException = badRecordException.orElse(Some(e))
parser.skipChildren()
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -23,9 +23,9 @@ import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._

sealed trait DateFormatter extends Serializable {
def parse(s: String): Int // returns days since epoch
@@ -35,16 +35,24 @@ sealed trait DateFormatter extends Serializable {
class Iso8601DateFormatter(
pattern: String,
zoneId: ZoneId,
locale: Locale) extends DateFormatter with DateTimeFormatterHelper {
locale: Locale,
legacyFormat: LegacyDateFormats.LegacyDateFormat)
extends DateFormatter with DateTimeFormatterHelper {

@transient
private lazy val formatter = getOrCreateFormatter(pattern, locale)

@transient
private lazy val legacyFormatter = DateFormatter.getLegacyFormatter(
pattern, zoneId, locale, legacyFormat)

override def parse(s: String): Int = {
val specialDate = convertSpecialDate(s.trim, zoneId)
specialDate.getOrElse {
val localDate = LocalDate.parse(s, formatter)
localDateToDays(localDate)
try {
val localDate = LocalDate.parse(s, formatter)
localDateToDays(localDate)
} catch checkDiffResult(s, legacyFormatter.parse)
}
}
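
Note the form of the catch clause above: Scala's catch accepts any PartialFunction[Throwable, T], not only literal case blocks, which lets checkDiffResult package the recovery logic once for both date and timestamp parsing. A generic illustration of the idiom (names invented for this sketch):

// Reusable recovery strategy, applied with `try ... catch pf`.
def orElseValue[T](fallback: => T): PartialFunction[Throwable, T] = {
  case _: NumberFormatException => fallback
}

val n = try "42x".toInt catch orElseValue(-1) // n == -1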

@@ -88,33 +96,40 @@ object DateFormatter {
val defaultLocale: Locale = Locale.US

def defaultPattern(): String = {
if (SQLConf.get.legacyTimeParserEnabled) "yyyy-MM-dd" else "uuuu-MM-dd"
if (SQLConf.get.legacyTimeParserPolicy == LEGACY) "yyyy-MM-dd" else "uuuu-MM-dd"
}

private def getFormatter(
format: Option[String],
zoneId: ZoneId,
locale: Locale = defaultLocale,
legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = {

format: Option[String],
zoneId: ZoneId,
locale: Locale = defaultLocale,
legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = {
val pattern = format.getOrElse(defaultPattern)
if (SQLConf.get.legacyTimeParserEnabled) {
legacyFormat match {
case FAST_DATE_FORMAT =>
new LegacyFastDateFormatter(pattern, locale)
case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT =>
new LegacySimpleDateFormatter(pattern, locale)
}
if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
getLegacyFormatter(pattern, zoneId, locale, legacyFormat)
} else {
new Iso8601DateFormatter(pattern, zoneId, locale)
new Iso8601DateFormatter(pattern, zoneId, locale, legacyFormat)
}
}

def getLegacyFormatter(
pattern: String,
zoneId: ZoneId,
locale: Locale,
legacyFormat: LegacyDateFormat): DateFormatter = {
legacyFormat match {
case FAST_DATE_FORMAT =>
new LegacyFastDateFormatter(pattern, locale)
case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT =>
new LegacySimpleDateFormatter(pattern, locale)
}
}

def apply(
format: String,
zoneId: ZoneId,
locale: Locale,
legacyFormat: LegacyDateFormat): DateFormatter = {
format: String,
zoneId: ZoneId,
locale: Locale,
legacyFormat: LegacyDateFormat): DateFormatter = {
getFormatter(Some(format), zoneId, locale, legacyFormat)
}
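
A hedged usage sketch of the factory above, assuming this file's imports (ZoneId, Locale, LegacyDateFormats) are in scope:

// Editor's sketch, not part of the diff.
val formatter = DateFormatter(
  "uuuu-MM-dd", ZoneId.of("UTC"), Locale.US, LegacyDateFormats.FAST_DATE_FORMAT)
val days = formatter.parse("2020-01-27") // 18288, days since 1970-01-01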

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
@@ -19,13 +19,16 @@ package org.apache.spark.sql.catalyst.util

import java.time._
import java.time.chrono.IsoChronology
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle}
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, DateTimeParseException, ResolverStyle}
import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
import java.util.Locale

import com.google.common.cache.CacheBuilder

import org.apache.spark.SparkUpgradeException
import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._

trait DateTimeFormatterHelper {
// Converts the parsed temporal object to ZonedDateTime. It sets time components to zeros
@@ -57,6 +60,27 @@ trait DateTimeFormatterHelper {
}
formatter
}

// When the legacy time parser policy is set to EXCEPTION, check whether the legacy parser and
// the new parser would return different results. If the new parser fails but the legacy parser
// succeeds, throw a SparkUpgradeException. By contrast, when the policy is set to CORRECTED,
// the DateTimeParseException is left for the caller side to handle.
protected def checkDiffResult[T](
s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = {
case e: DateTimeParseException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION =>
val res = try {
Some(legacyParseFunc(s))
} catch {
case _: Throwable => None
}
if (res.nonEmpty) {
throw new SparkUpgradeException("3.0", s"Failed to parse '$s' in the new parser. You can " +
s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior " +
s"before Spark 3.0, or set it to CORRECTED to treat it as an invalid datetime string.", e)
} else {
throw e
}
}
}

private object DateTimeFormatterHelper {
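
A concrete divergence that checkDiffResult guards against, as a hedged sketch: java.time is strict about field widths where SimpleDateFormat is not, so under the EXCEPTION policy such an input surfaces a SparkUpgradeException instead of silently changing results:

import java.text.SimpleDateFormat
import java.time.LocalDate
import java.time.format.DateTimeFormatter

val input = "2020-1-27" // single-digit month

// Legacy path (pre-3.0): SimpleDateFormat accepts it.
new SimpleDateFormat("yyyy-MM-dd").parse(input) // OK

// New path: the strict java.time formatter rejects it, so the helper above
// would wrap the DateTimeParseException in a SparkUpgradeException.
LocalDate.parse(input, DateTimeFormatter.ofPattern("uuuu-MM-dd")) // throws DateTimeParseException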
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -29,7 +29,9 @@ import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, LENIENT_SIMPLE_DATE_FORMAT}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
import org.apache.spark.sql.types.Decimal

sealed trait TimestampFormatter extends Serializable {
@@ -52,21 +54,29 @@ sealed trait TimestampFormatter extends Serializable {
class Iso8601TimestampFormatter(
pattern: String,
zoneId: ZoneId,
locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper {
locale: Locale,
legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT)
extends TimestampFormatter with DateTimeFormatterHelper {
@transient
protected lazy val formatter = getOrCreateFormatter(pattern, locale)

@transient
protected lazy val legacyFormatter = TimestampFormatter.getLegacyFormatter(
pattern, zoneId, locale, legacyFormat)

override def parse(s: String): Long = {
val specialDate = convertSpecialTimestamp(s.trim, zoneId)
specialDate.getOrElse {
val parsed = formatter.parse(s)
val parsedZoneId = parsed.query(TemporalQueries.zone())
val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId
val zonedDateTime = toZonedDateTime(parsed, timeZoneId)
val epochSeconds = zonedDateTime.toEpochSecond
val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND)

Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond)
try {
val parsed = formatter.parse(s)
val parsedZoneId = parsed.query(TemporalQueries.zone())
val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId
val zonedDateTime = toZonedDateTime(parsed, timeZoneId)
val epochSeconds = zonedDateTime.toEpochSecond
val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND)

Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond)
} catch checkDiffResult(s, legacyFormatter.parse)
}
}
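
The micros arithmetic above as a worked sketch with assumed values (SECONDS is java.util.concurrent.TimeUnit.SECONDS; Math.addExact throws ArithmeticException on overflow instead of wrapping):

import java.util.concurrent.TimeUnit.SECONDS

// Assumed parse result for "2019-10-14 09:39:07.123456" at UTC.
val epochSeconds = 1571045947L
val microsOfSecond = 123456L
val micros = Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond)
// micros == 1571045947123456L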

@@ -186,31 +196,38 @@ object TimestampFormatter {
def defaultPattern(): String = s"${DateFormatter.defaultPattern()} HH:mm:ss"

private def getFormatter(
format: Option[String],
zoneId: ZoneId,
locale: Locale = defaultLocale,
legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = {

format: Option[String],
zoneId: ZoneId,
locale: Locale = defaultLocale,
legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = {
val pattern = format.getOrElse(defaultPattern)
if (SQLConf.get.legacyTimeParserEnabled) {
legacyFormat match {
case FAST_DATE_FORMAT =>
new LegacyFastTimestampFormatter(pattern, zoneId, locale)
case SIMPLE_DATE_FORMAT =>
new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = false)
case LENIENT_SIMPLE_DATE_FORMAT =>
new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = true)
}
if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
getLegacyFormatter(pattern, zoneId, locale, legacyFormat)
} else {
new Iso8601TimestampFormatter(pattern, zoneId, locale)
new Iso8601TimestampFormatter(pattern, zoneId, locale, legacyFormat)
}
}

def getLegacyFormatter(
pattern: String,
zoneId: ZoneId,
locale: Locale,
legacyFormat: LegacyDateFormat): TimestampFormatter = {
legacyFormat match {
case FAST_DATE_FORMAT =>
new LegacyFastTimestampFormatter(pattern, zoneId, locale)
case SIMPLE_DATE_FORMAT =>
new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = false)
case LENIENT_SIMPLE_DATE_FORMAT =>
new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = true)
}
}

def apply(
format: String,
zoneId: ZoneId,
locale: Locale,
legacyFormat: LegacyDateFormat): TimestampFormatter = {
format: String,
zoneId: ZoneId,
locale: Locale,
legacyFormat: LegacyDateFormat): TimestampFormatter = {
getFormatter(Some(format), zoneId, locale, legacyFormat)
}

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2352,6 +2352,18 @@
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)

val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy")
.internal()
.doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " +
"dates/timestamps in a locale-sensitive manner, which is the approach used before " +
"Spark 3.0. When set to CORRECTED, classes from the java.time.* packages are used " +
"for the same purpose. The default value is EXCEPTION: a RuntimeException is thrown " +
"when the two approaches would return different results.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
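
A hedged usage sketch for the new conf, assuming an existing SparkSession named spark (standard runtime-config API):

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")    // restore pre-3.0 SimpleDateFormat parsing
spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED") // use the new java.time parsers, no fallback
spark.conf.set("spark.sql.legacy.timeParserPolicy", "EXCEPTION") // default: raise SparkUpgradeException on divergence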

val LEGACY_ARRAY_EXISTS_FOLLOWS_THREE_VALUED_LOGIC =
buildConf("spark.sql.legacy.followThreeValuedLogicInArrayExists")
.internal()
@@ -2743,7 +2755,9 @@ class SQLConf extends Serializable with Logging {
def legacyMsSqlServerNumericMappingEnabled: Boolean =
getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED)

def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED)
def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = {
LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY))
}

/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two