Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.fastParseToMicros
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -216,7 +217,7 @@ class JacksonParser(
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
Long.box {
Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
Try(fastParseToMicros(options.timestampFormat, stringValue, options.timeZone))
.getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ package org.apache.spark.sql.catalyst.util
import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}
import java.time.Instant
import java.util.{Calendar, Locale, TimeZone}
import java.util.{Calendar, GregorianCalendar, Locale, TimeZone}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}
import javax.xml.bind.DatatypeConverter

import scala.annotation.tailrec

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.UTF8String

/**
Expand Down Expand Up @@ -1164,4 +1167,23 @@ object DateTimeUtils {
threadLocalTimestampFormat.remove()
threadLocalDateFormat.remove()
}

class MicrosCalendar(tz: TimeZone) extends GregorianCalendar(tz, Locale.US) {
def getMicros(digitsInFraction: Int): SQLTimestamp = {
val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND
d / Decimal.POW_10(digitsInFraction)
}
}

def fastParseToMicros(parser: FastDateFormat, s: String, tz: TimeZone): SQLTimestamp = {
val pos = new java.text.ParsePosition(0)
val cal = new MicrosCalendar(tz)
cal.clear()
if (!parser.parse(s, pos, cal)) {
throw new IllegalArgumentException(s)
}
val micros = cal.getMicros(parser.getPattern.count(_ == 'S'))
cal.set(Calendar.MILLISECOND, 0)
cal.getTimeInMillis * MICROS_PER_MILLIS + micros
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ object Decimal {
/** Maximum number of decimal digits a Long can represent */
val MAX_LONG_DIGITS = 18

private val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong)
val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong)

private val BIG_DEC_ZERO = BigDecimal(0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.util.{Calendar, Locale, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -692,4 +694,27 @@ class DateTimeUtilsSuite extends SparkFunSuite {
}
}
}

test("fast parse to micros") {
val timeZone = TimeZoneUTC
def check(pattern: String, input: String, reference: String): Unit = {
val parser = FastDateFormat.getInstance(pattern, timeZone, Locale.US)
val expected = DateTimeUtils.stringToTimestamp(
UTF8String.fromString(reference), timeZone).get
val actual = fastParseToMicros(parser, input, timeZone)
assert(actual === expected)
}
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSXXX",
"2019-10-14T09:39:07.3220000Z", "2019-10-14T09:39:07.322Z")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this behavior the same with master branch?

check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
"2019-10-14T09:39:07.322000Z", "2019-10-14T09:39:07.322Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSX",
"2019-10-14T09:39:07.322Z", "2019-10-14T09:39:07.322Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSX",
"2019-10-14T09:39:07.123456Z", "2019-10-14T09:39:07.123456Z")
check("yyyy-MM-dd'T'HH:mm:ss.SSSSSSX",
"2019-10-14T09:39:07.000010Z", "2019-10-14T09:39:07.000010Z")
check("yyyy-MM-dd'T'HH:mm:ss.SX",
"2019-10-14T09:39:07.1Z", "2019-10-14T09:39:07.1Z")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -518,4 +518,14 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
jsonDF.select(to_json(from_json($"a", schema))),
Seq(Row(json)))
}

test("from_json - timestamp in micros") {
val df = Seq("""{"time": "1970-01-01T00:00:00.123456"}""").toDS()
val schema = new StructType().add("time", TimestampType)
val options = Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")

checkAnswer(
df.select(from_json($"value", schema, options)),
Row(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.123456"))))
}
}