10 changes: 8 additions & 2 deletions docs/sql-data-sources-json.md
@@ -9,9 +9,9 @@ license: |
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -196,6 +196,12 @@ Data source options of JSON can be set via:
<td>Sets the string that indicates a timestamp format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to timestamp type.</td>
<td>read/write</td>
</tr>
<tr>
<td><code>timestampNTZFormat</code></td>
<td>yyyy-MM-dd'T'HH:mm:ss[.SSS]</td>
<td>Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to timestamp without timezone type. Note that zone-offset and time-zone components are not supported when writing or reading this data type.</td>
<td>read/write</td>
</tr>
<tr>
<td><code>multiLine</code></td>
<td><code>false</code></td>
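To illustrate the new option end to end, here is a minimal usage sketch (the `spark` session and the `/tmp/ntz-json` path are assumptions for illustration; it mirrors the SPARK-37360 tests added below):

```scala
// Write TIMESTAMP_NTZ values with a non-default pattern.
val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12.123456' as col0")
exp.write
  .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
  .json("/tmp/ntz-json") // hypothetical output path

// Read back with the same pattern; with spark.sql.timestampType=TIMESTAMP_NTZ,
// inferTimestamp restores the column as TIMESTAMP_NTZ.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
val res = spark.read
  .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
  .option("inferTimestamp", "true")
  .json("/tmp/ntz-json")
```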
@@ -106,6 +106,10 @@ private[sql] class JSONOptions(
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})

val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat")
val timestampNTZFormatInWrite: String =
parameters.getOrElse("timestampNTZFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

/**
@@ -138,8 +142,9 @@ private[sql] class JSONOptions(
val pretty: Boolean = parameters.get("pretty").map(_.toBoolean).getOrElse(false)

/**
-   * Enables inferring of TimestampType from strings matched to the timestamp pattern
-   * defined by the timestampFormat option.
+   * Enables inferring of TimestampType and TimestampNTZType from strings matched to the
+   * corresponding timestamp pattern defined by the timestampFormat and timestampNTZFormat options
+   * respectively.
*/
val inferTimestamp: Boolean = parameters.get("inferTimestamp").map(_.toBoolean).getOrElse(false)

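Note that the write-side default is the timestampFormat default without its optional zone suffix ([XXX]); a minimal sketch of the resulting output, assuming a `spark` session:

```scala
// Under the default pattern yyyy-MM-dd'T'HH:mm:ss[.SSS], TIMESTAMP_NTZ values
// serialize without any zone component.
val df = spark.sql("select timestamp_ntz'2020-12-12 12:12:12.123' as ts")
df.toJSON.show(false) // {"ts":"2020-12-12T12:12:12.123"}
```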
@@ -91,7 +91,7 @@ private[sql] class JacksonGenerator(
legacyFormat = FAST_DATE_FORMAT,
isParsing = false)
private val timestampNTZFormatter = TimestampFormatter(
-    options.timestampFormatInWrite,
+    options.timestampNTZFormatInWrite,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = false,
@@ -66,7 +66,7 @@ class JacksonParser(
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)
private lazy val timestampNTZFormatter = TimestampFormatter(
-    options.timestampFormatInRead,
+    options.timestampNTZFormatInRead,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true,
@@ -262,7 +262,7 @@ class JacksonParser(
case TimestampNTZType =>
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
-          timestampNTZFormatter.parseWithoutTimeZone(parser.getText)
+          timestampNTZFormatter.parseWithoutTimeZone(parser.getText, false)
}

case DateType =>
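The added `false` argument tells the formatter to reject zone components, so zoned strings read as TIMESTAMP_NTZ turn into malformed records; a minimal sketch, assuming a `spark` session with `spark.implicits._` in scope:

```scala
import spark.implicits._

// In the default PERMISSIVE mode, the zoned record becomes null.
val res = spark.read
  .schema("col0 TIMESTAMP_NTZ")
  .json(Seq(
    """{"col0": "2020-12-12T12:12:12.000"}""",      // zone-less: parses
    """{"col0": "2020-12-12T12:12:12.000+05:00"}""" // zoned: rejected -> null
  ).toDS())
res.show(false)
```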
@@ -46,6 +46,12 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
options.locale,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)
private val timestampNTZFormatter = TimestampFormatter(
options.timestampNTZFormatInRead,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true,
forTimestampNTZ = true)

private def handleJsonErrorsByParseMode(parseMode: ParseMode,
columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
@@ -144,6 +150,9 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
}
if (options.prefersDecimal && decimalTry.isDefined) {
decimalTry.get
} else if (options.inferTimestamp &&
[Review thread]
Member: Could you adjust the comment for inferTimestamp and mention the ntz type:
    * Enables inferring of TimestampType from strings matched to the timestamp pattern
Contributor Author: Yes, sure. I will update, thanks for pointing it out!
Contributor Author: Updated! Could you check the latest PR version? Thanks.
(allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, false)).isDefined) {
SQLConf.get.timestampType
} else if (options.inferTimestamp &&
(allCatch opt timestampFormatter.parse(field)).isDefined) {
TimestampType
@@ -393,6 +402,9 @@ object JsonInferSchema {
case (t1: DecimalType, t2: IntegralType) =>
compatibleType(t1, DecimalType.forType(t2))

case (TimestampNTZType, TimestampType) | (TimestampType, TimestampNTZType) =>
TimestampType

// strings and every string is a Json object.
case (_, _) => StringType
}
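The new merge case widens a column that mixes zone-less and zoned values to TIMESTAMP instead of letting it fall through to STRING; a minimal sketch, assuming a `spark` session with `spark.implicits._` in scope:

```scala
import spark.implicits._

// Prefer NTZ so the zone-less record infers as TIMESTAMP_NTZ on its own.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")

val df = spark.read
  .option("inferTimestamp", "true")
  .json(Seq(
    """{"col0":"2020-12-12T12:12:12.000"}""",      // TIMESTAMP_NTZ alone
    """{"col0":"2020-12-12T17:12:12.000+05:00"}""" // zoned: TIMESTAMP
  ).toDS())

df.printSchema() // col0: timestamp -- NTZ and LTZ widened to TIMESTAMP
```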
@@ -2746,6 +2746,188 @@ abstract class JsonSuite
}
}

test("SPARK-37360: Write and infer TIMESTAMP_NTZ values with a non-default pattern") {
withTempPath { path =>
val exp = spark.sql("""
select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
select timestamp_ntz'2020-12-12 12:12:12.123456' as col0
""")
exp.write
.option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.json(path.getAbsolutePath)

withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
val res = spark.read
.option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.option("inferTimestamp", "true")
.json(path.getAbsolutePath)

assert(res.dtypes === exp.dtypes)
checkAnswer(res, exp)
}
}
}

test("SPARK-37360: Write and infer TIMESTAMP_LTZ values with a non-default pattern") {
withTempPath { path =>
val exp = spark.sql("""
select timestamp_ltz'2020-12-12 12:12:12' as col0 union all
select timestamp_ltz'2020-12-12 12:12:12.123456' as col0
""")
exp.write
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.json(path.getAbsolutePath)

withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) {
val res = spark.read
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
.option("inferTimestamp", "true")
.json(path.getAbsolutePath)

assert(res.dtypes === exp.dtypes)
checkAnswer(res, exp)
}
}
}

test("SPARK-37360: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") {
withTempPath { path =>
val exp = spark.sql("""
select
timestamp_ntz'2020-12-12 12:12:12' as col1,
timestamp_ltz'2020-12-12 12:12:12' as col2
""")

exp.write.json(path.getAbsolutePath)

val res = spark.read
.schema("col1 TIMESTAMP_NTZ, col2 TIMESTAMP_LTZ")
.json(path.getAbsolutePath)

checkAnswer(res, exp)
}
}

test("SPARK-37360: Timestamp type inference for a column with TIMESTAMP_NTZ values") {
withTempPath { path =>
val exp = spark.sql("""
select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
select timestamp_ntz'2020-12-12 12:12:12' as col0
""")

exp.write.json(path.getAbsolutePath)

val timestampTypes = Seq(
SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)

for (timestampType <- timestampTypes) {
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
val res = spark.read.option("inferTimestamp", "true").json(path.getAbsolutePath)

if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
checkAnswer(res, exp)
} else {
checkAnswer(
res,
spark.sql("""
select timestamp_ltz'2020-12-12 12:12:12' as col0 union all
select timestamp_ltz'2020-12-12 12:12:12' as col0
""")
)
}
}
}
}
}

test("SPARK-37360: Timestamp type inference for a mix of TIMESTAMP_NTZ and TIMESTAMP_LTZ") {
withTempPath { path =>
Seq(
"""{"col0":"2020-12-12T12:12:12.000"}""",
"""{"col0":"2020-12-12T17:12:12.000Z"}""",
"""{"col0":"2020-12-12T17:12:12.000+05:00"}""",
"""{"col0":"2020-12-12T12:12:12.000"}"""
).toDF("data")
.coalesce(1)
.write.text(path.getAbsolutePath)

for (policy <- Seq("exception", "corrected", "legacy")) {
withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> policy) {
val res = spark.read.option("inferTimestamp", "true").json(path.getAbsolutePath)

// NOTE:
// Every value is tested for all types in JSON schema inference so the sequence of
// ["timestamp_ntz", "timestamp_ltz", "timestamp_ntz"] results in "timestamp_ltz".
// This is different from CSV where inference starts from the last inferred type.
//
// This is why the similar test in CSV has a different result in "legacy" mode.

val exp = spark.sql("""
select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
select timestamp_ltz'2020-12-12T12:12:12.000' as col0
""")
checkAnswer(res, exp)
}
}
}
}

test("SPARK-37360: Malformed records when reading TIMESTAMP_LTZ as TIMESTAMP_NTZ") {
withTempPath { path =>
Seq(
"""{"col0": "2020-12-12T12:12:12.000"}""",
"""{"col0": "2020-12-12T12:12:12.000Z"}""",
"""{"col0": "2020-12-12T12:12:12.000+05:00"}""",
"""{"col0": "2020-12-12T12:12:12.000"}"""
).toDF("data")
.coalesce(1)
.write.text(path.getAbsolutePath)

for (timestampNTZFormat <- Seq(None, Some("yyyy-MM-dd'T'HH:mm:ss[.SSS]"))) {
val reader = spark.read.schema("col0 TIMESTAMP_NTZ")
val res = timestampNTZFormat match {
case Some(format) =>
reader.option("timestampNTZFormat", format).json(path.getAbsolutePath)
case None =>
reader.json(path.getAbsolutePath)
}

checkAnswer(
res,
Seq(
Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)),
Row(null),
Row(null),
Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12))
)
)
}
}
}

test("SPARK-37360: Fail to write TIMESTAMP_NTZ if timestampNTZFormat contains zone offset") {
val patterns = Seq(
"yyyy-MM-dd HH:mm:ss XXX",
"yyyy-MM-dd HH:mm:ss Z",
"yyyy-MM-dd HH:mm:ss z")

val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
for (pattern <- patterns) {
withTempPath { path =>
val err = intercept[SparkException] {
exp.write.option("timestampNTZFormat", pattern).json(path.getAbsolutePath)
}
assert(
err.getCause.getMessage.contains("Unsupported field: OffsetSeconds") ||
err.getCause.getMessage.contains("Unable to extract value") ||
err.getCause.getMessage.contains("Unable to extract ZoneId"))
}
}
}

test("filters push down") {
withTempPath { path =>
val t = "2019-12-17 00:01:02"
@@ -2996,10 +3178,6 @@ abstract class JsonSuite
}

test("SPARK-36536: use casting when datetime pattern is not set") {
-    def isLegacy: Boolean = {
-      spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY).toUpperCase(Locale.ROOT) ==
-        SQLConf.LegacyBehaviorPolicy.LEGACY.toString
-    }
withSQLConf(
SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.UTC.getId) {
@@ -3017,13 +3195,13 @@ abstract class JsonSuite
readback,
Seq(
          Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"),
-           if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
+           LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
          Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"),
-           if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
+           LocalDateTime.of(2021, 1, 1, 0, 0, 0)),
          Row(LocalDate.of(2021, 2, 1), Instant.parse("2021-03-02T00:00:00Z"),
-           if (isLegacy) null else LocalDateTime.of(2021, 10, 1, 0, 0, 0)),
+           LocalDateTime.of(2021, 10, 1, 0, 0, 0)),
          Row(LocalDate.of(2021, 8, 18), Instant.parse("2021-08-18T21:44:30Z"),
-           if (isLegacy) null else LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000))))
+           LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000))))
}
}
}