apache/spark pull request #23358 (closed)

Commits (26):
e09c972  Switching to TimestampFormatter (MaxGekk, Dec 27, 2018)
697688a  Switching to DateFormatter (MaxGekk, Dec 27, 2018)
fcffc12  Separate parser for PartitioningUtils (MaxGekk, Dec 27, 2018)
f0a9fe7  Revert unneeded changes (MaxGekk, Dec 27, 2018)
5f0b0a3  Moving partition timestamp and date parsers to PartitioningUtils (MaxGekk, Dec 28, 2018)
17a32a3  Add local date and timestamp to Cast (MaxGekk, Dec 28, 2018)
9d90d3f  Add local date and timestamp formatters to QueryExecution (MaxGekk, Dec 28, 2018)
038ad80  Add local date and timestamp formatters to JDBC (MaxGekk, Dec 28, 2018)
f6308f6  Removing unused timestampToString (MaxGekk, Dec 28, 2018)
9f85ac6  Removing unused dateToString (MaxGekk, Dec 28, 2018)
d348c07  Fix build error (MaxGekk, Dec 28, 2018)
36c9f9a  Remove unused threadLocalTimestampFormat (MaxGekk, Dec 28, 2018)
4580c25  Revert changes in TimestampFormatter (MaxGekk, Dec 28, 2018)
56bdae4  apply with default locale (MaxGekk, Dec 28, 2018)
8541602  Merge remote-tracking branch 'origin/master' into thread-local-date-f… (MaxGekk, Dec 28, 2018)
ecf0e89  Merge remote-tracking branch 'origin/master' into thread-local-date-f… (MaxGekk, Dec 28, 2018)
c3066e1  Making dateFormatter lazy in cast (MaxGekk, Dec 28, 2018)
eed40d7  Removing date partition pattern (MaxGekk, Dec 30, 2018)
94cad6a  Merge branch 'master' into thread-local-date-format (MaxGekk, Jan 1, 2019)
0de72c8  Merge branch 'master' into thread-local-date-format (MaxGekk, Jan 2, 2019)
1156291  Merge remote-tracking branch 'origin/master' into thread-local-date-f… (MaxGekk, Jan 3, 2019)
5ae58a3  Fix merge (MaxGekk, Jan 3, 2019)
78c3961  Merge remote-tracking branch 'origin/master' into thread-local-date-f… (MaxGekk, Jan 7, 2019)
2397401  Fix merge (MaxGekk, Jan 7, 2019)
206c955  Merge remote-tracking branch 'origin/master' into thread-local-date-f… (MaxGekk, Jan 11, 2019)
483e95b  Remove unused import (MaxGekk, Jan 14, 2019)
File: Cast.scala
@@ -230,12 +230,15 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String]
// [[func]] assumes the input is no longer null because eval already does the null check.
@inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])

+ private val dateFormatter = DateFormatter()
+ private lazy val timestampFormatter = TimestampFormatter(timeZone)

// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
- case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.dateToString(d)))
+ case DateType => buildCast[Int](_, d => UTF8String.fromString(dateFormatter.format(d)))
case TimestampType => buildCast[Long](_,
-   t => UTF8String.fromString(DateTimeUtils.timestampToString(t, timeZone)))
+   t => UTF8String.fromString(DateTimeUtils.timestampToString(timestampFormatter, t)))
case ArrayType(et, _) =>
buildCast[ArrayData](_, array => {
val builder = new UTF8StringBuilder
@@ -843,12 +846,15 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String]
case BinaryType =>
(c, evPrim, evNull) => code"$evPrim = UTF8String.fromBytes($c);"
case DateType =>
- (c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
-   org.apache.spark.sql.catalyst.util.DateTimeUtils.dateToString($c));"""
+ val df = JavaCode.global(
+   ctx.addReferenceObj("dateFormatter", dateFormatter),
+   dateFormatter.getClass)
+ (c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(${df}.format($c));"""
case TimestampType =>
- val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass)
+ val tf = JavaCode.global(
+   ctx.addReferenceObj("timestampFormatter", timestampFormatter),
+   timestampFormatter.getClass)
(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
-   org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c, $tz));"""
+   org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($tf, $c));"""
case ArrayType(et, _) =>
(c, evPrim, evNull) => {
val buffer = ctx.freshVariable("buffer", classOf[UTF8StringBuilder])
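Net effect of the Cast changes: the interpreted path and the generated code now share the same formatter instances instead of calling static DateTimeUtils helpers per value. A rough sketch of what the interpreted path now computes, assuming the formatters introduced in this PR:

```scala
import java.util.TimeZone

import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}

// DateType values are days since the epoch; TimestampType values are
// microseconds since the epoch.
val dateFormatter = DateFormatter()                        // "yyyy-MM-dd"
assert(dateFormatter.format(17867) == "2018-12-02")

val timestampFormatter = TimestampFormatter(TimeZone.getTimeZone("UTC"))
assert(timestampFormatter.format(0L) == "1970-01-01 00:00:00")
```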
File: datetimeExpressions.scala
@@ -562,7 +562,7 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Option[String]
copy(timeZoneId = Option(timeZoneId))

override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
- val df = TimestampFormatter(format.toString, timeZone, Locale.US)
+ val df = TimestampFormatter(format.toString, timeZone)
UTF8String.fromString(df.format(timestamp.asInstanceOf[Long]))
}

@@ -667,7 +667,7 @@ abstract class UnixTime
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: TimestampFormatter =
try {
-   TimestampFormatter(constFormat.toString, timeZone, Locale.US)
+   TimestampFormatter(constFormat.toString, timeZone)
} catch {
case NonFatal(_) => null
}
@@ -700,7 +700,7 @@ abstract class UnixTime
} else {
val formatString = f.asInstanceOf[UTF8String].toString
try {
-   TimestampFormatter(formatString, timeZone, Locale.US).parse(
+   TimestampFormatter(formatString, timeZone).parse(
t.asInstanceOf[UTF8String].toString) / MICROS_PER_SECOND
} catch {
case NonFatal(_) => null
@@ -821,7 +821,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[String]
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: TimestampFormatter =
try {
-   TimestampFormatter(constFormat.toString, timeZone, Locale.US)
+   TimestampFormatter(constFormat.toString, timeZone)
} catch {
case NonFatal(_) => null
}
@@ -847,7 +847,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[String]
null
} else {
try {
-   UTF8String.fromString(TimestampFormatter(f.toString, timeZone, Locale.US)
+   UTF8String.fromString(TimestampFormatter(f.toString, timeZone)
.format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
} catch {
case NonFatal(_) => null
File: DateFormatter.scala
@@ -88,11 +88,18 @@ class LegacyFallbackDateFormatter(
}

object DateFormatter {
+ val defaultPattern: String = "yyyy-MM-dd"
+ val defaultLocale: Locale = Locale.US

def apply(format: String, locale: Locale): DateFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
Review thread on the legacyTimeParserEnabled check:

Contributor: When would you need this? You could argue that since we are moving to Spark 3.0, we don't need to care as much about legacy behavior.

MaxGekk (author): In this PR the date and timestamp patterns are fixed, so we shouldn't see any behavior changes here, but DateFormatter/TimestampFormatter are also used from the CSV/JSON datasources and from functions where users can set arbitrary patterns. Unfortunately, the patterns supported by SimpleDateFormat and DateTimeFormatter are not exactly the same, and there are other differences in their behavior: https://github.com/apache/spark/pull/23358/files#diff-3f19ec3d15dcd8cd42bb25dde1c5c1a9R42

What I have learned from other PRs is that if I introduce a behavior change, I should leave users an opportunity to return to the previous behavior. The old behavior can be deprecated later.

Contributor: I am just saying that we are going to break stuff anyway. If the legacy behavior is somewhat unreasonable, then we should consider not supporting it.

MaxGekk (author): Getting rid of the flag is slightly out of this PR's scope, I believe. I would prefer to open a ticket and leave that to somebody braver.

MaxGekk (author): The ticket to remove the flag: https://issues.apache.org/jira/browse/SPARK-26503

Member: For clarification, I don't think we should treat the previous behaviour as unreasonable. I am okay with considering removing that legacy configuration, given that we're going ahead for 3.0: it causes some maintenance overhead and it blocks some features.

Also, for clarification, it is kind of a breaking change. Think about CSV code that depended on timestamps being inferred, where values suddenly become strings after an upgrade. This behaviour was even documented in 2.x (by referring to SimpleDateFormat).
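The incompatibility mentioned above is easy to demonstrate. A standalone sketch (not part of this diff; only JDK classes) showing lenient rollover in SimpleDateFormat versus strict resolution in java.time:

```scala
import java.text.SimpleDateFormat
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// SimpleDateFormat is lenient by default: month 15 silently rolls over,
// so "2015-15-01" parses as 2016-03-01 instead of failing.
val legacy = new SimpleDateFormat("yyyy-MM-dd")
println(legacy.parse("2015-15-01"))

// The java.time formatter rejects the same input with a DateTimeParseException.
val modern = DateTimeFormatter.ofPattern("yyyy-MM-dd")
LocalDate.parse("2015-15-01", modern) // throws
```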

new LegacyFallbackDateFormatter(format, locale)
} else {
new Iso8601DateFormatter(format, locale)
}
}

+ def apply(format: String): DateFormatter = apply(format, defaultLocale)

+ def apply(): DateFormatter = apply(defaultPattern)
Review thread on the new apply overloads:

Contributor: Both formatters seem to use thread-safe implementations. You could consider just returning cached instances here.

MaxGekk (author): At the moment, both formatters are created per partition at least, not per row. Do you think it makes sense to cache them?

Contributor: OK, let's leave it for now.
}
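For reference, a minimal sketch of the caching idea discussed above, assuming the DateFormatter trait and apply methods from this file; the cache object itself is hypothetical and not part of this change:

```scala
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.catalyst.util.DateFormatter

// Hypothetical cache keyed by (pattern, locale). Sharing instances is safe
// only because the underlying java.time formatters are immutable.
object DateFormatterCache {
  private val cache = new ConcurrentHashMap[(String, Locale), DateFormatter]()

  def get(format: String, locale: Locale): DateFormatter =
    cache.computeIfAbsent((format, locale), key => DateFormatter(key._1, key._2))
}
```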
File: DateTimeUtils.scala
@@ -76,32 +76,6 @@ object DateTimeUtils {
}
}

- // `SimpleDateFormat` is not thread-safe.
- private val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
-   override def initialValue(): SimpleDateFormat = {
-     new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
-   }
- }
-
- def getThreadLocalTimestampFormat(timeZone: TimeZone): DateFormat = {
-   val sdf = threadLocalTimestampFormat.get()
-   sdf.setTimeZone(timeZone)
-   sdf
- }
-
- // `SimpleDateFormat` is not thread-safe.
- private val threadLocalDateFormat = new ThreadLocal[DateFormat] {
-   override def initialValue(): SimpleDateFormat = {
-     new SimpleDateFormat("yyyy-MM-dd", Locale.US)
-   }
- }
-
- def getThreadLocalDateFormat(timeZone: TimeZone): DateFormat = {
-   val sdf = threadLocalDateFormat.get()
-   sdf.setTimeZone(timeZone)
-   sdf
- }

private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
private val computeTimeZone = new JFunction[String, TimeZone] {
override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
@@ -133,24 +107,11 @@ object DateTimeUtils {
millisLocal - getOffsetFromLocalMillis(millisLocal, timeZone)
}

- def dateToString(days: SQLDate): String =
-   getThreadLocalDateFormat(defaultTimeZone()).format(toJavaDate(days))
-
- def dateToString(days: SQLDate, timeZone: TimeZone): String = {
-   getThreadLocalDateFormat(timeZone).format(toJavaDate(days))
- }
-
- // Converts Timestamp to string according to Hive TimestampWritable convention.
- def timestampToString(us: SQLTimestamp): String = {
-   timestampToString(us, defaultTimeZone())
- }

// Converts Timestamp to string according to Hive TimestampWritable convention.
- def timestampToString(us: SQLTimestamp, timeZone: TimeZone): String = {
+ def timestampToString(tf: TimestampFormatter, us: SQLTimestamp): String = {
val ts = toJavaTimestamp(us)
val timestampString = ts.toString
-   val timestampFormat = getThreadLocalTimestampFormat(timeZone)
-   val formatted = timestampFormat.format(ts)
+   val formatted = tf.format(us)

if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
formatted + timestampString.substring(19)
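The surviving timestampToString keeps the Hive TimestampWritable convention: the formatter produces the seconds-precision prefix, and any non-zero fraction is appended from Timestamp.toString. A rough illustration, assuming a JVM whose default time zone is UTC and the formatter's default pattern:

```scala
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}

// Inputs are microseconds since the epoch; outputs assume a UTC default zone.
val tf = TimestampFormatter(DateTimeUtils.defaultTimeZone())
DateTimeUtils.timestampToString(tf, 0L)      // "1970-01-01 00:00:00" (".0" dropped)
DateTimeUtils.timestampToString(tf, 123456L) // "1970-01-01 00:00:00.123456"
```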
@@ -1134,7 +1095,5 @@ object DateTimeUtils {
*/
private[util] def resetThreadLocals(): Unit = {
threadLocalGmtCalendar.remove()
-   threadLocalTimestampFormat.remove()
-   threadLocalDateFormat.remove()
}
}
File: TimestampFormatter.scala
@@ -42,7 +42,7 @@ sealed trait TimestampFormatter extends Serializable {
@throws(classOf[ParseException])
@throws(classOf[DateTimeParseException])
@throws(classOf[DateTimeException])
- def parse(s: String): Long // returns microseconds since epoch
Review thread on the removed trailing comment:

Contributor: Why remove this comment?

MaxGekk (author): Because it duplicates the method description above.
+ def parse(s: String): Long
def format(us: Long): String
}

@@ -105,11 +105,22 @@ class LegacyFallbackTimestampFormatter(
}

object TimestampFormatter {
+ val defaultPattern: String = "yyyy-MM-dd HH:mm:ss"
+ val defaultLocale: Locale = Locale.US

def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyFallbackTimestampFormatter(format, timeZone, locale)
} else {
new Iso8601TimestampFormatter(format, timeZone, locale)
}
}

+ def apply(format: String, timeZone: TimeZone): TimestampFormatter = {
+   apply(format, timeZone, defaultLocale)
+ }
+
+ def apply(timeZone: TimeZone): TimestampFormatter = {
+   apply(defaultPattern, timeZone, defaultLocale)
+ }
}
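A quick usage sketch of the three overloads (illustrative, not from the test suite; the asserted value assumes UTC):

```scala
import java.util.{Locale, TimeZone}

import org.apache.spark.sql.catalyst.util.TimestampFormatter

val utc = TimeZone.getTimeZone("UTC")
val explicit   = TimestampFormatter("yyyy-MM-dd HH:mm:ss", utc, Locale.US)
val noLocale   = TimestampFormatter("yyyy-MM-dd HH:mm:ss", utc) // Locale.US implied
val allDefault = TimestampFormatter(utc)                        // default pattern too

// All three format microseconds since the epoch identically here.
assert(allDefault.format(0L) == "1970-01-01 00:00:00")
```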
File: DateTimeUtilsSuite.scala
@@ -35,10 +35,11 @@ class DateTimeUtilsSuite extends SparkFunSuite {
}

test("nanoseconds truncation") {
+ val tf = TimestampFormatter(DateTimeUtils.defaultTimeZone())
def checkStringToTimestamp(originalTime: String, expectedParsedTime: String) {
val parsedTimestampOp = DateTimeUtils.stringToTimestamp(UTF8String.fromString(originalTime))
assert(parsedTimestampOp.isDefined, "timestamp with nanoseconds was not parsed correctly")
-   assert(DateTimeUtils.timestampToString(parsedTimestampOp.get) === expectedParsedTime)
+   assert(DateTimeUtils.timestampToString(tf, parsedTimestampOp.get) === expectedParsedTime)
}

checkStringToTimestamp("2015-01-02 00:00:00.123456789", "2015-01-02 00:00:00.123456")
File: DateFormatterSuite.scala
@@ -29,7 +29,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
test("parsing dates") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
-   val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+   val formatter = DateFormatter()
val daysSinceEpoch = formatter.parse("2018-12-02")
assert(daysSinceEpoch === 17867)
}
@@ -39,7 +39,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
test("format dates") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
-   val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+   val formatter = DateFormatter()
val date = formatter.format(17867)
assert(date === "2018-12-02")
}
@@ -59,7 +59,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
"5010-11-17").foreach { date =>
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
-   val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+   val formatter = DateFormatter()
val days = formatter.parse(date)
val formatted = formatter.format(days)
assert(date === formatted)
@@ -82,7 +82,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
1110657).foreach { days =>
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
-   val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+   val formatter = DateFormatter()
val date = formatter.format(days)
val parsed = formatter.parse(date)
assert(days === parsed)
Expand All @@ -92,7 +92,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
}

test("parsing date without explicit day") {
- val formatter = DateFormatter("yyyy MMM", Locale.US)
+ val formatter = DateFormatter("yyyy MMM")
val daysSinceEpoch = formatter.parse("2018 Dec")
assert(daysSinceEpoch === LocalDate.of(2018, 12, 1).toEpochDay)
}
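The expected value 17867 used throughout this suite can be cross-checked with java.time (a standalone sanity check, not part of the suite):

```scala
import java.time.LocalDate

// Days between 1970-01-01 and 2018-12-02.
assert(LocalDate.of(2018, 12, 2).toEpochDay == 17867L)
```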
File: TimestampFormatterSuite.scala
@@ -41,8 +41,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
val formatter = TimestampFormatter(
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
-     TimeZone.getTimeZone(timeZone),
-     Locale.US)
+     TimeZone.getTimeZone(timeZone))
val microsSinceEpoch = formatter.parse(localDate)
assert(microsSinceEpoch === expectedMicros(timeZone))
}
@@ -62,8 +61,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
val formatter = TimestampFormatter(
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
-     TimeZone.getTimeZone(timeZone),
-     Locale.US)
+     TimeZone.getTimeZone(timeZone))
val timestamp = formatter.format(microsSinceEpoch)
assert(timestamp === expectedTimestamp(timeZone))
}
@@ -81,7 +79,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
2177456523456789L,
11858049903010203L).foreach { micros =>
DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
-   val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
+   val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone)
val timestamp = formatter.format(micros)
val parsed = formatter.parse(timestamp)
assert(micros === parsed)
@@ -101,7 +99,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
"2039-01-01T01:02:03.456789",
"2345-10-07T22:45:03.010203").foreach { timestamp =>
DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
-   val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
+   val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone)
val micros = formatter.parse(timestamp)
val formatted = formatter.format(micros)
assert(timestamp === formatted)
@@ -112,8 +110,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
test(" case insensitive parsing of am and pm") {
val formatter = TimestampFormatter(
"yyyy MMM dd hh:mm:ss a",
TimeZone.getTimeZone("UTC"),
Locale.US)
TimeZone.getTimeZone("UTC"))
val micros = formatter.parse("2009 Mar 20 11:30:01 am")
assert(micros === TimeUnit.SECONDS.toMicros(
LocalDateTime.of(2009, 3, 20, 11, 30, 1).toEpochSecond(ZoneOffset.UTC)))
File: QueryExecution.scala
@@ -30,8 +30,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
- import org.apache.spark.sql.catalyst.util.DateTimeUtils
- import org.apache.spark.sql.catalyst.util.truncatedString
+ import org.apache.spark.sql.catalyst.util.{truncatedString, DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.internal.SQLConf
@@ -111,6 +110,9 @@ class QueryExecution(
protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: AnalysisException => e.toString }

+ private val dateFormatter = DateFormatter()
Review thread on the new formatter fields:

hvanhovell (Dec 28, 2018): We should probably get rid of the hiveResultString method for 3.0. It does not make much sense to keep it there.

MaxGekk (author): Should I create a separate JIRA for that?

hvanhovell: Yes please. We should just move that into a test class.
+ private val timestampFormatter = TimestampFormatter(
+   DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))

/**
* Returns the result as a hive compatible sequence of strings. This is used in tests and
@@ -184,11 +186,9 @@ class QueryExecution(
toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
}.toSeq.sorted.mkString("{", ",", "}")
case (null, _) => "NULL"
-     case (d: Date, DateType) =>
-       DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
+     case (d: Date, DateType) => dateFormatter.format(DateTimeUtils.fromJavaDate(d))
case (t: Timestamp, TimestampType) =>
-       DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),
-         DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
+       DateTimeUtils.timestampToString(timestampFormatter, DateTimeUtils.fromJavaTimestamp(t))
case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
case (interval, CalendarIntervalType) => interval.toString