@@ -178,7 +178,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
       // We can only parse the value as TimestampNTZType if it does not have zone-offset or
       // time-zone component and can be parsed with the timestamp formatter.
       // Otherwise, it is likely to be a timestamp with timezone.
-      if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, true)).isDefined) {
+      if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, false)).isDefined) {
         SQLConf.get.timestampType
       } else {
         tryParseTimestamp(field)
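
Note: the flipped literal reads backwards until you see the parameter rename in the hunks below. The flag's meaning changed from failOnError to allowTimeZone, so passing false here now means "reject values that carry a zone component". A minimal, self-contained sketch of the inference rule this encodes, written against plain java.time rather than Spark's internal formatter (inferableAsNTZ is an illustrative name, not Spark API):

import java.time.format.{DateTimeFormatter, DateTimeParseException}
import java.time.temporal.TemporalQueries

// A CSV field may be inferred as TIMESTAMP_NTZ only if it parses with the
// configured pattern AND resolves no zone-id/zone-offset component.
def inferableAsNTZ(field: String, pattern: String): Boolean =
  try {
    val parsed = DateTimeFormatter.ofPattern(pattern).parse(field)
    // The same TemporalQueries.zone() probe Iso8601TimestampFormatter uses below.
    parsed.query(TemporalQueries.zone()) == null
  } catch {
    case _: DateTimeParseException => false
  }

inferableAsNTZ("2020-12-12T12:12:12", "yyyy-MM-dd'T'HH:mm:ss")          // true
inferableAsNTZ("2020-12-12T12:12:12+05:00", "yyyy-MM-dd'T'HH:mm:ssXXX") // false: has an offset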
@@ -204,7 +204,7 @@ class UnivocityParser(

     case _: TimestampNTZType => (d: String) =>
       nullSafeDatum(d, name, nullable, options) { datum =>
-        timestampNTZFormatter.parseWithoutTimeZone(datum, true)
+        timestampNTZFormatter.parseWithoutTimeZone(datum, false)
       }

     case _: DateType => (d: String) =>
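
This is the row-level counterpart of the same flip: with allowTimeZone = false, a zoned string arriving in a TIMESTAMP_NTZ column now fails to parse instead of being silently coerced to a local time, and the reader's parse-mode handling takes over (the "Malformed records" test in CSVSuite below exercises exactly this). A sketch of the observable effect, assuming the default PERMISSIVE mode and a path holding the sample rows from that test:

val res = spark.read.schema("col0 TIMESTAMP_NTZ").csv(path)
// "2020-12-12T12:12:12.000"  -> 2020-12-12 12:12:12 (no zone: parses)
// "2020-12-12T17:12:12.000Z" -> null                (zone present: malformed record)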
@@ -445,19 +445,19 @@ object DateTimeUtils {
    * number of microseconds since the epoch. The result will be independent of time zones.
    *
    * If the input string contains a component associated with time zone, the method will return
-   * `None` if `failOnError` is set to `true`. If `failOnError` is set to `false`, the method
+   * `None` if `allowTimeZone` is set to `false`. If `allowTimeZone` is set to `true`, the method
    * will simply discard the time zone component. Enable the check to detect situations like parsing
    * a timestamp with time zone as TimestampNTZType.
    *
    * The return type is [[Option]] in order to distinguish between 0L and null. Please
    * refer to `parseTimestampString` for the allowed formats.
    */
-  def stringToTimestampWithoutTimeZone(s: UTF8String, failOnError: Boolean): Option[Long] = {
+  def stringToTimestampWithoutTimeZone(s: UTF8String, allowTimeZone: Boolean): Option[Long] = {
     try {
       val (segments, zoneIdOpt, justTime) = parseTimestampString(s)
       // If the input string can't be parsed as a timestamp without time zone, or it contains only
       // the time part of a timestamp and we can't determine its date, return None.
-      if (segments.isEmpty || justTime || failOnError && zoneIdOpt.isDefined) {
+      if (segments.isEmpty || justTime || !allowTimeZone && zoneIdOpt.isDefined) {
         return None
       }
       val nanoseconds = MICROSECONDS.toNanos(segments(6))
@@ -473,16 +473,16 @@
   /**
    * Trims and parses a given UTF8 string to a corresponding [[Long]] value which representing the
    * number of microseconds since the epoch. The result is independent of time zones. Zone id
-   * component will be discarded and ignored.
+   * component will be ignored.
    * The return type is [[Option]] in order to distinguish between 0L and null. Please
    * refer to `parseTimestampString` for the allowed formats.
    */
   def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = {
-    stringToTimestampWithoutTimeZone(s, false)
+    stringToTimestampWithoutTimeZone(s, true)
   }

   def stringToTimestampWithoutTimeZoneAnsi(s: UTF8String): Long = {
-    stringToTimestampWithoutTimeZone(s, false).getOrElse {
+    stringToTimestampWithoutTimeZone(s, true).getOrElse {
       throw QueryExecutionErrors.cannotCastToDateTimeError(s, TimestampNTZType)
     }
   }
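
Summarizing the new contract, mirrored by the DateTimeUtilsSuite hunk below:

val s = UTF8String.fromString("2021-11-22 10:54:27 +08:00")

stringToTimestampWithoutTimeZone(s, allowTimeZone = true)  // Some(micros for 2021-11-22T10:54:27): zone discarded
stringToTimestampWithoutTimeZone(s, allowTimeZone = false) // None: zoned input rejected
stringToTimestampWithoutTimeZone(s)                        // delegates with true, so the one-arg overload stays lenient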
@@ -56,7 +56,7 @@ sealed trait TimestampFormatter extends Serializable {
    * Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time.
    *
    * @param s - string with timestamp to parse
-   * @param failOnError - indicates strict parsing of timezone
+   * @param allowTimeZone - indicates strict parsing of timezone
    * @return microseconds since epoch.
    * @throws ParseException can be thrown by legacy parser
    * @throws DateTimeParseException can be thrown by new parser
@@ -68,9 +68,9 @@ sealed trait TimestampFormatter extends Serializable {
   @throws(classOf[DateTimeParseException])
   @throws(classOf[DateTimeException])
   @throws(classOf[IllegalStateException])
-  def parseWithoutTimeZone(s: String, failOnError: Boolean): Long =
+  def parseWithoutTimeZone(s: String, allowTimeZone: Boolean): Long =
     throw new IllegalStateException(
-      s"The method `parseWithoutTimeZone(s: String, failOnError: Boolean)` should be " +
+      s"The method `parseWithoutTimeZone(s: String, allowTimeZone: Boolean)` should be " +
       "implemented in the formatter of timestamp without time zone")

   /**
@@ -84,7 +84,7 @@ sealed trait TimestampFormatter extends Serializable {
   final def parseWithoutTimeZone(s: String): Long =
     // This is implemented to adhere to the original behaviour of `parseWithoutTimeZone` where we
     // did not fail if timestamp contained zone-id or zone-offset component and instead ignored it.
-    parseWithoutTimeZone(s, false)
+    parseWithoutTimeZone(s, true)

   def format(us: Long): String
   def format(ts: Timestamp): String
@@ -133,10 +133,10 @@ class Iso8601TimestampFormatter(
     } catch checkParsedDiff(s, legacyFormatter.parse)
   }

-  override def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = {
+  override def parseWithoutTimeZone(s: String, allowTimeZone: Boolean): Long = {
     try {
       val parsed = formatter.parse(s)
-      if (failOnError && parsed.query(TemporalQueries.zone()) != null) {
+      if (!allowTimeZone && parsed.query(TemporalQueries.zone()) != null) {
         throw QueryExecutionErrors.cannotParseStringAsDataTypeError(pattern, s, TimestampNTZType)
       }
       val localDate = toLocalDate(parsed)
@@ -204,10 +204,10 @@ class DefaultTimestampFormatter(
     } catch checkParsedDiff(s, legacyFormatter.parse)
   }

-  override def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = {
+  override def parseWithoutTimeZone(s: String, allowTimeZone: Boolean): Long = {
     try {
       val utf8Value = UTF8String.fromString(s)
-      DateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, failOnError).getOrElse {
+      DateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, allowTimeZone).getOrElse {
         throw QueryExecutionErrors.cannotParseStringAsDataTypeError(
           TimestampFormatter.defaultPattern(), s, TimestampNTZType)
       }
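
Taken together, the call sites in this patch split cleanly along the new flag (both appear in the CSV hunks above):

// CSV schema inference and UnivocityParser: strict NTZ semantics, zoned input rejected.
timestampNTZFormatter.parseWithoutTimeZone(field, false)

// Pre-existing callers of the one-arg overload: old lenient behaviour preserved,
// since it now delegates with allowTimeZone = true (zone component ignored).
timestampNTZFormatter.parseWithoutTimeZone(field)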
@@ -357,15 +357,15 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
     checkStringToTimestamp("2021-01-01T12:30:4294967297+4294967297:30", None)
   }

-  test("SPARK-37326: stringToTimestampWithoutTimeZone with failOnError") {
+  test("SPARK-37326: stringToTimestampWithoutTimeZone with allowTimeZone") {
     assert(
       stringToTimestampWithoutTimeZone(
-        UTF8String.fromString("2021-11-22 10:54:27 +08:00"), false) ==
+        UTF8String.fromString("2021-11-22 10:54:27 +08:00"), true) ==
       Some(DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(2021, 11, 22, 10, 54, 27))))

     assert(
       stringToTimestampWithoutTimeZone(
-        UTF8String.fromString("2021-11-22 10:54:27 +08:00"), true) ==
+        UTF8String.fromString("2021-11-22 10:54:27 +08:00"), false) ==
       None)
   }

@@ -1012,49 +1012,45 @@ abstract class CSVSuite
     }
   }

-  test("SPARK-37326: Use different pattern to write and infer TIMESTAMP_NTZ values") {
-    withTempDir { dir =>
-      val path = s"${dir.getCanonicalPath}/csv"
-
+  test("SPARK-37326: Write and infer TIMESTAMP_NTZ values with a non-default pattern") {
+    withTempPath { path =>
       val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
       exp.write
         .format("csv")
         .option("header", "true")
         .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
-        .save(path)
+        .save(path.getAbsolutePath)

       withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
         val res = spark.read
           .format("csv")
           .option("inferSchema", "true")
           .option("header", "true")
           .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
-          .load(path)
+          .load(path.getAbsolutePath)

         assert(res.dtypes === exp.dtypes)
         checkAnswer(res, exp)
       }
     }
   }

-  test("SPARK-37326: Use different pattern to write and infer TIMESTAMP_LTZ values") {
-    withTempDir { dir =>
-      val path = s"${dir.getCanonicalPath}/csv"
-
+  test("SPARK-37326: Write and infer TIMESTAMP_LTZ values with a non-default pattern") {
+    withTempPath { path =>
       val exp = spark.sql("select timestamp_ltz'2020-12-12 12:12:12' as col0")
       exp.write
         .format("csv")
         .option("header", "true")
         .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
-        .save(path)
+        .save(path.getAbsolutePath)

       withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) {
         val res = spark.read
           .format("csv")
           .option("inferSchema", "true")
           .option("header", "true")
           .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
-          .load(path)
+          .load(path.getAbsolutePath)

         assert(res.dtypes === exp.dtypes)
         checkAnswer(res, exp)
@@ -1063,37 +1059,33 @@
   }

   test("SPARK-37326: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") {
-    withTempDir { dir =>
-      val path = s"${dir.getCanonicalPath}/csv"
-
+    withTempPath { path =>
       val exp = spark.sql("""
         select
           timestamp_ntz'2020-12-12 12:12:12' as col1,
           timestamp_ltz'2020-12-12 12:12:12' as col2
         """)

-      exp.write.format("csv").option("header", "true").save(path)
+      exp.write.format("csv").option("header", "true").save(path.getAbsolutePath)

       val res = spark.read
         .format("csv")
         .schema("col1 TIMESTAMP_NTZ, col2 TIMESTAMP_LTZ")
         .option("header", "true")
-        .load(path)
+        .load(path.getAbsolutePath)

       checkAnswer(res, exp)
     }
   }

   test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ values") {
-    withTempDir { dir =>
-      val path = s"${dir.getCanonicalPath}/csv"
-
+    withTempPath { path =>
       val exp = spark.sql("""
         select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
         select timestamp_ntz'2020-12-12 12:12:12' as col0
         """)

-      exp.write.format("csv").option("header", "true").save(path)
+      exp.write.format("csv").option("header", "true").save(path.getAbsolutePath)

       val timestampTypes = Seq(
         SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
@@ -1105,7 +1097,7 @@ abstract class CSVSuite
           .format("csv")
           .option("inferSchema", "true")
           .option("header", "true")
-          .load(path)
+          .load(path.getAbsolutePath)

         if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
           checkAnswer(res, exp)
@@ -1124,9 +1116,7 @@ abstract class CSVSuite
   }

   test("SPARK-37326: Timestamp type inference for a mix of TIMESTAMP_NTZ and TIMESTAMP_LTZ") {
-    withTempDir { dir =>
-      val path = s"${dir.getCanonicalPath}/csv"
-
+    withTempPath { path =>
       Seq(
         "col0",
         "2020-12-12T12:12:12.000",
@@ -1135,19 +1125,19 @@ abstract class CSVSuite
         "2020-12-12T12:12:12.000"
       ).toDF("data")
         .coalesce(1)
-        .write.text(path)
+        .write.text(path.getAbsolutePath)

       for (policy <- Seq("exception", "corrected", "legacy")) {
         withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> policy) {
           val res = spark.read.format("csv")
             .option("inferSchema", "true")
             .option("header", "true")
-            .load(path)
+            .load(path.getAbsolutePath)

           if (policy == "legacy") {
             // Timestamps without timezone are parsed as strings, so the col0 type would be
             // StringType which is similar to reading without schema inference.
-            val exp = spark.read.format("csv").option("header", "true").load(path)
+            val exp = spark.read.format("csv").option("header", "true").load(path.getAbsolutePath)
             checkAnswer(res, exp)
           } else {
             val exp = spark.sql("""
@@ -1164,23 +1154,23 @@ abstract class CSVSuite
   }

   test("SPARK-37326: Malformed records when reading TIMESTAMP_LTZ as TIMESTAMP_NTZ") {
-    withTempDir { dir =>
-      val path = s"${dir.getCanonicalPath}/csv"
-
+    withTempPath { path =>
       Seq(
         "2020-12-12T12:12:12.000",
         "2020-12-12T17:12:12.000Z",
         "2020-12-12T17:12:12.000+05:00",
         "2020-12-12T12:12:12.000"
       ).toDF("data")
         .coalesce(1)
-        .write.text(path)
+        .write.text(path.getAbsolutePath)

       for (timestampNTZFormat <- Seq(None, Some("yyyy-MM-dd'T'HH:mm:ss[.SSS]"))) {
         val reader = spark.read.format("csv").schema("col0 TIMESTAMP_NTZ")
         val res = timestampNTZFormat match {
-          case Some(format) => reader.option("timestampNTZFormat", format).load(path)
-          case None => reader.load(path)
+          case Some(format) =>
+            reader.option("timestampNTZFormat", format).load(path.getAbsolutePath)
+          case None =>
+            reader.load(path.getAbsolutePath)
         }

         checkAnswer(
@@ -1204,10 +1194,9 @@ abstract class CSVSuite

     val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
     for (pattern <- patterns) {
-      withTempDir { dir =>
-        val path = s"${dir.getCanonicalPath}/csv"
+      withTempPath { path =>
         val err = intercept[SparkException] {
-          exp.write.format("csv").option("timestampNTZFormat", pattern).save(path)
+          exp.write.format("csv").option("timestampNTZFormat", pattern).save(path.getAbsolutePath)
         }
         assert(
           err.getCause.getMessage.contains("Unsupported field: OffsetSeconds") ||
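
A side note on the recurring test refactor: each withTempDir { dir => val path = s"${dir.getCanonicalPath}/csv" ... } pair collapses into withTempPath, which hands the body a java.io.File that does not exist yet and cleans it up afterwards, hence the path.getAbsolutePath calls. Roughly (a simplified sketch of Spark's SQLTestUtils helper; the real implementation may differ in details):

protected def withTempPath(f: java.io.File => Unit): Unit = {
  val path = org.apache.spark.util.Utils.createTempDir()
  path.delete() // hand the body a path that does not yet exist
  try f(path) finally org.apache.spark.util.Utils.deleteRecursively(path)
}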