JacksonParser.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.getDateTimeParser
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -53,6 +54,9 @@ class JacksonParser(
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
+  @transient private lazy val dateTimeParser =
+    getDateTimeParser(options.timestampFormat, options.timeZone)
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
@@ -213,15 +217,12 @@ class JacksonParser(
       (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
         case VALUE_STRING =>
           val stringValue = parser.getText
-          // This one will lose microseconds parts.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.
           Long.box {
-            Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
-              .getOrElse {
-                // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-                // compatibility.
-                DateTimeUtils.stringToTime(stringValue).getTime * 1000L
-              }
+            Try(dateTimeParser.parse(stringValue)).getOrElse {
+              // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+              // compatibility.
+              DateTimeUtils.stringToTime(stringValue).getTime * 1000L
+            }
           }
 
         case VALUE_NUMBER_INT =>
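Note on the TimestampType case above: the legacy path parsed to a java.util.Date (whole
milliseconds) and multiplied by 1000L, so fraction digits below milliseconds could never
survive (SPARK-10681), while the new dateTimeParser returns microseconds directly. A minimal
sketch of that legacy precision ceiling, using a plain SimpleDateFormat in place of the
options.timestampFormat FastDateFormat (names here are illustrative, not part of the diff):

    import java.text.SimpleDateFormat
    import java.util.{Locale, TimeZone}

    object LegacyPrecisionDemo {
      def main(args: Array[String]): Unit = {
        val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS", Locale.US)
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
        // getTime is whole milliseconds, so * 1000L can only yield multiples of 1000 micros.
        val micros = fmt.parse("1970-01-01T00:00:00.123").getTime * 1000L
        println(micros) // 123000
      }
    }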
DateTimeUtils.scala
@@ -18,15 +18,18 @@
 package org.apache.spark.sql.catalyst.util
 
 import java.sql.{Date, Timestamp}
-import java.text.{DateFormat, SimpleDateFormat}
+import java.text.{DateFormat, ParsePosition, SimpleDateFormat}
 import java.time.Instant
-import java.util.{Calendar, Locale, TimeZone}
+import java.util.{Calendar, GregorianCalendar, Locale, TimeZone}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.{Function => JFunction}
 import javax.xml.bind.DatatypeConverter
 
 import scala.annotation.tailrec
 
+import org.apache.commons.lang3.time.FastDateFormat
+
+import org.apache.spark.sql.types.Decimal
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -1164,4 +1167,27 @@ object DateTimeUtils {
     threadLocalTimestampFormat.remove()
     threadLocalDateFormat.remove()
   }
+
+  class MicrosCalendar(tz: TimeZone) extends GregorianCalendar(tz, Locale.US) {
+    def getMicros(digitsInFraction: Int): SQLTimestamp = {
+      val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND
+      d / Decimal.POW_10(digitsInFraction)
+    }
+  }
+
+  class DateTimeParser(format: FastDateFormat, digitsInFraction: Int, cal: MicrosCalendar) {
+    def parse(s: String): SQLTimestamp = {
+      cal.clear()
+      if (!format.parse(s, new ParsePosition(0), cal)) {
+        throw new IllegalArgumentException(s)
+      }
+      val micros = cal.getMicros(digitsInFraction)
+      cal.set(Calendar.MILLISECOND, 0)
+      cal.getTimeInMillis * MICROS_PER_MILLIS + micros
+    }
+  }
+
+  def getDateTimeParser(format: FastDateFormat, tz: TimeZone): DateTimeParser = {
+    new DateTimeParser(format, format.getPattern.count(_ == 'S'), new MicrosCalendar(tz))
+  }
 }
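How the rescaling works: FastDateFormat.parse stores all parsed S-digits in the
Calendar.MILLISECOND field as a plain integer, so a pattern fraction SSSSSSS with input
"3220000" leaves 3220000 in that field. getMicros multiplies by MICROS_PER_SECOND
(1,000,000) and divides by POW_10(digitsInFraction) (here 10,000,000), giving 322,000
microseconds; DateTimeParser.parse then zeroes MILLISECOND before reading getTimeInMillis
and adds the rescaled micros back. A self-contained sketch of the same arithmetic, with the
Spark constants inlined and hypothetical names:

    object FractionToMicros {
      // Mirrors Decimal.POW_10: powers of ten up to 10^18, the largest fitting in a Long.
      private val POW_10 = Array.tabulate[Long](19)(i => math.pow(10, i).toLong)

      // Mirrors MicrosCalendar.getMicros: rescale a parsed fraction field to microseconds.
      def toMicros(parsedFraction: Long, digitsInFraction: Int): Long =
        parsedFraction * 1000000L / POW_10(digitsInFraction)

      def main(args: Array[String]): Unit = {
        println(toMicros(3220000L, 7)) // 322000: ".3220000" means 0.322 seconds
        println(toMicros(123456L, 6))  // 123456: ".123456" is kept exactly
        println(toMicros(1L, 1))       // 100000: ".1" means 0.1 seconds
      }
    }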
Decimal.scala
@@ -432,7 +432,7 @@ object Decimal {
   /** Maximum number of decimal digits a Long can represent */
   val MAX_LONG_DIGITS = 18
 
-  private val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong)
+  val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong)
 
   private val BIG_DEC_ZERO = BigDecimal(0)
 
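(POW_10 loses its private modifier here only so that MicrosCalendar in DateTimeUtils, above,
can divide by it when rescaling fractions; for example, POW_10(7) == 10000000L.)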
DateTimeUtilsSuite.scala
@@ -21,6 +21,8 @@ import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.{Calendar, Locale, TimeZone}
 
+import org.apache.commons.lang3.time.FastDateFormat
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.unsafe.types.UTF8String
@@ -692,4 +694,31 @@ class DateTimeUtilsSuite extends SparkFunSuite {
       }
     }
   }
+
test("fast parse to micros") {
val timeZone = TimeZoneUTC
def check(pattern: String, input: String, reference: String): Unit = {
val parser = getDateTimeParser(
FastDateFormat.getInstance(pattern, timeZone, Locale.US),
timeZone)
val expected = DateTimeUtils.stringToTimestamp(
UTF8String.fromString(reference), timeZone).get
val actual = parser.parse(input)
assert(actual === expected)
}
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSXXX",
"2019-10-14T09:39:07.3220000Z", "2019-10-14T09:39:07.322Z")
Contributor: Is this behavior the same as on the master branch?
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
"2019-10-14T09:39:07.322000Z", "2019-10-14T09:39:07.322Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSX",
"2019-10-14T09:39:07.322Z", "2019-10-14T09:39:07.322Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSX",
"2019-10-14T09:39:07.123456Z", "2019-10-14T09:39:07.123456Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSX",
"2019-10-14T09:39:07.000010Z", "2019-10-14T09:39:07.000010Z")
check("yyyy-MM-dd'T'HH:mm:ss.SX",
"2019-10-14T09:39:07.1Z", "2019-10-14T09:39:07.1Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSX",
"2019-10-14T09:39:07.10Z", "2019-10-14T09:39:07.1Z")
Contributor: Can we add some negative tests?

Contributor: I'd like to see a test like xxx.123 with format .SS

Member (author): It just returns an invalid result, xxx+1.23. For example,
"2019-10-14T09:39:07.123Z" -> "2019-10-14T09:39:08.23Z". I can add such a test,
but I don't know what it aims to validate.
+  }
 }
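A negative test along the lines the reviewers ask for could look like the hypothetical
sketch below (not part of this diff): DateTimeParser.parse throws IllegalArgumentException
whenever FastDateFormat fails to match the input.

    test("fast parse rejects malformed input") {
      val timeZone = TimeZoneUTC
      val parser = getDateTimeParser(
        FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSX", timeZone, Locale.US),
        timeZone)
      // format.parse returns false for this input, so DateTimeParser.parse throws.
      intercept[IllegalArgumentException] {
        parser.parse("not-a-timestamp")
      }
    }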
JsonFunctionsSuite.scala
@@ -518,4 +518,14 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       jsonDF.select(to_json(from_json($"a", schema))),
       Seq(Row(json)))
   }
+
+  test("from_json - timestamp in micros") {
+    val df = Seq("""{"time": "1970-01-01T00:00:00.123456"}""").toDS()
+    val schema = new StructType().add("time", TimestampType)
+    val options = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
+
+    checkAnswer(
+      df.select(from_json($"value", schema, options)),
+      Row(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456"))))
+  }
 }
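The same timestampFormat option is honored when reading JSON through the datasource, not
only via from_json, since both paths go through JacksonParser. A minimal usage sketch
(assumes a SparkSession named spark and import spark.implicits._ in scope):

    val schema = new StructType().add("time", TimestampType)
    val ds = Seq("""{"time": "1970-01-01T00:00:00.123456"}""").toDS()
    val df = spark.read
      .schema(schema)
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")
      .json(ds)
    // df("time") holds 1970-01-01 00:00:00.123456 as a TimestampType value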