Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
0394030
SPARK-40474 Infer columns with mixed date and timestamp as String in …
xiaonanyang-db Sep 19, 2022
f4fadf7
[SPARK-40474] remove unused imports
xiaonanyang-db Sep 19, 2022
813ac74
[SPARK-40474] Resolve test failures
xiaonanyang-db Sep 19, 2022
5c2dde8
[SPARK-40474] fix test failures
xiaonanyang-db Sep 20, 2022
0d2be1d
[SPARK-40474] handle edge cases
xiaonanyang-db Sep 20, 2022
df56946
[SPARK-40474] handle edge cases
xiaonanyang-db Sep 20, 2022
4bc480d
SPARK-40474 revert part of CSVOptions changes
xiaonanyang-db Sep 20, 2022
f6ed29f
SPARK-40474 revert part of CSVOptions changes
xiaonanyang-db Sep 20, 2022
93b6422
[SPARK-40474] fix test failures
xiaonanyang-db Sep 20, 2022
6942f2b
[SPARK-40474] handle columns with mixing dates and timestamps inferen…
xiaonanyang-db Sep 21, 2022
b4a6f1d
[SPARK-40474] remove unnecessary changes
xiaonanyang-db Sep 21, 2022
1502618
[SPARK-40474] small changes
xiaonanyang-db Sep 21, 2022
4767ae7
Merge remote-tracking branch 'origin' into SPARK-40474
xiaonanyang-db Sep 21, 2022
1f57098
[SPARK-40474] remove new line added by mistake
xiaonanyang-db Sep 21, 2022
e9150ec
[SPARK-40474] address comments
xiaonanyang-db Sep 21, 2022
a07e432
[SPARK-40474] small changes
xiaonanyang-db Sep 21, 2022
255aea3
[SPARK-40474] fix test failures
xiaonanyang-db Sep 21, 2022
533c487
[SPARK-40474] address review comments
xiaonanyang-db Sep 22, 2022
be4c86f
[SPARK-40474] fix test failures
xiaonanyang-db Sep 22, 2022
c7225b1
[SPARK-40474] update doc
xiaonanyang-db Sep 22, 2022
9e87d6e
[SPARK-40474] disable prefersDate when legacyTimeParser is enabled
xiaonanyang-db Sep 22, 2022
af66b83
[SPARK-40474] fix test failures
xiaonanyang-db Sep 22, 2022
812fa65
[SPARK-40474] fix test failures
xiaonanyang-db Sep 22, 2022
5288eb0
[SPARK-40474] fix tests
xiaonanyang-db Sep 22, 2022
a2f0b80
[SPARK-40474] revert code causing behavior change
xiaonanyang-db Sep 23, 2022
00a8661
[SPARK-40474] revert changes
xiaonanyang-db Sep 23, 2022
16e187c
SPARK-40474 reduce diff
xiaonanyang-db Sep 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/sql-data-sources-csv.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ Data source options of CSV can be set via:
<tr>
<td><code>prefersDate</code></td>
<td>false</td>
<td>During schema inference (<code>inferSchema</code>), attempts to infer string columns that contain dates or timestamps as <code>Date</code> if the values satisfy the <code>dateFormat</code> option and failed to be parsed by the respective formatter. With a user-provided schema, attempts to parse timestamp columns as dates using <code>dateFormat</code> if they fail to conform to <code>timestampFormat</code>, in this case the parsed values will be cast to timestamp type afterwards.</td>
<td>During schema inference (<code>inferSchema</code>), attempts to infer string columns that contain dates as <code>Date</code> if the values satisfy the <code>dateFormat</code> option or the default date format. Columns that contain a mixture of dates and timestamps are inferred as <code>StringType</code>.</td>
<td>read</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
case LongType => tryParseLong(field)
case _: DecimalType => tryParseDecimal(field)
case DoubleType => tryParseDouble(field)
case DateType => tryParseDateTime(field)
case TimestampNTZType if options.prefersDate => tryParseDateTime(field)
case DateType => tryParseDate(field)
case TimestampNTZType => tryParseTimestampNTZ(field)
case TimestampType if options.prefersDate => tryParseDateTime(field)
case TimestampType => tryParseTimestamp(field)
case BooleanType => tryParseBoolean(field)
case StringType => StringType
Expand Down Expand Up @@ -179,13 +177,13 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) {
DoubleType
} else if (options.prefersDate) {
tryParseDateTime(field)
tryParseDate(field)
} else {
tryParseTimestampNTZ(field)
}
}

private def tryParseDateTime(field: String): DataType = {
private def tryParseDate(field: String): DataType = {
if ((allCatch opt dateFormatter.parse(field)).isDefined) {
DateType
} else {
Expand Down Expand Up @@ -233,7 +231,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
* is compatible with both input data types.
*/
private def compatibleType(t1: DataType, t2: DataType): Option[DataType] = {
TypeCoercion.findTightestCommonType(t1, t2).orElse(findCompatibleTypeForCSV(t1, t2))
(t1, t2) match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this match be in findCompatibleTypeForCSV? Or does findTightestCommonType merge DateType and TimestampType in a way that is not applicable here?

Copy link
Contributor Author

@xiaonanyang-db xiaonanyang-db Sep 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

findTightestCommonType merges DateType and TimestampType in a way that is not applicable here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What result does findTightestCommonType return for DateType and TimestampType?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(d1, d2) match {
      case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
        TimestampType

      case (_: TimestampType, _: TimestampNTZType) | (_: TimestampNTZType, _: TimestampType) =>
        TimestampType

      case (_: TimestampNTZType, _: DateType) | (_: DateType, _: TimestampNTZType) =>
        TimestampNTZType
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, I checked the code, the resulting type will be TimestampType or TimestampNTZ.

// For fields that mix dates and timestamps, relax the type to string
case (DateType, TimestampType) | (TimestampType, DateType) |
(DateType, TimestampNTZType) | (TimestampNTZType, DateType) => Some(StringType)
case _ => TypeCoercion.findTightestCommonType(t1, t2).orElse(findCompatibleTypeForCSV(t1, t2))
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,10 @@ class CSVOptions(
val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

/**
* Infer columns with all valid date entries as date type (otherwise inferred as timestamp type)
* if schema inference is enabled. When being used with user-provided schema, tries to parse
* timestamp values as dates if the values do not conform to the timestamp formatter before
* falling back to the backward compatible parsing - the parsed values will be cast to timestamp
* afterwards.
* Infer columns with all valid date entries as date type (otherwise inferred as string or
* timestamp type)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Infer columns with all valid date entries as date type (otherwise inferred as string type)
* Infer columns with all valid date entries as date type (otherwise inferred as string or timestamp type)

* if schema inference is enabled.
*
* Disabled by default for backwards compatibility and performance.
* Disabled by default for backwards compatibility.
*
* Not compatible with legacyTimeParserPolicy == LEGACY since legacy date parser will accept
* extra trailing characters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, TimeZoneUTC}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.errors.QueryExecutionErrors
Expand Down Expand Up @@ -224,7 +223,7 @@ class UnivocityParser(
case NonFatal(e) =>
// If fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility if enabled.
if (!enableParsingFallbackForTimestampType) {
if (!enableParsingFallbackForDateType) {
throw e
}
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
Expand All @@ -238,29 +237,19 @@ class UnivocityParser(
timestampFormatter.parse(datum)
} catch {
case NonFatal(e) =>
// There may be date type entries in timestamp column due to schema inference
if (options.prefersDate) {
daysToMicros(dateFormatter.parse(datum), options.zoneId)
} else {
// If fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility if enabled.
if (!enableParsingFallbackForDateType) {
throw e
}
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw(e))
// If fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility if enabled.
if (!enableParsingFallbackForTimestampType) {
throw e
}
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw(e))
}
}

case _: TimestampNTZType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
try {
timestampNTZFormatter.parseWithoutTimeZone(datum, false)
} catch {
case NonFatal(e) if options.prefersDate =>
daysToMicros(dateFormatter.parse(datum), TimeZoneUTC.toZoneId)
}
timestampNTZFormatter.parseWithoutTimeZone(datum, false)
}

case _: StringType => (d: String) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
Array(LongType)).sameElements(Array(DoubleType)))
assert(
inferSchema.mergeRowTypes(Array(DateType),
Array(TimestampNTZType)).sameElements(Array(TimestampNTZType)))
Array(TimestampNTZType)).sameElements(Array(StringType)))
assert(
inferSchema.mergeRowTypes(Array(DateType),
Array(TimestampType)).sameElements(Array(TimestampType)))
Array(TimestampType)).sameElements(Array(StringType)))
}

test("Null fields are handled properly when a nullValue is specified") {
Expand Down Expand Up @@ -221,23 +221,24 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
assert(inferSchema.inferField(NullType, "2018-12-03") == DateType)
}

test("SPARK-39469: inferring date and timestamp types in a mixed column with prefersDate=true") {
test("SPARK-39469: inferring the schema of columns with mixing dates and timestamps properly") {
var options = new CSVOptions(
Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy|MM|dd",
"timestampNTZFormat" -> "yyyy/MM/dd", "prefersDate" -> "true"),
columnPruning = false,
defaultTimeZoneId = "UTC")
var inferSchema = new CSVInferSchema(options)

assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
assert(inferSchema.inferField(DateType, "2003|01|01") == TimestampType)

// inferField should infer a column as string type if it contains a mix of dates and timestamps
assert(inferSchema.inferField(DateType, "2003|01|01") == StringType)
// SQL configuration must be set to default to TimestampNTZ
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
assert(inferSchema.inferField(DateType, "2003/02/05") == TimestampNTZType)
assert(inferSchema.inferField(DateType, "2003/02/05") == StringType)
}

// inferField should relax the type to string if a date is seen when typeSoFar is a timestamp
assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == TimestampNTZType)
assert(inferSchema.inferField(TimestampType, "2018_12_03") == TimestampType)
assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == StringType)
assert(inferSchema.inferField(TimestampType, "2018_12_03") == StringType)

// No errors when Date and Timestamp have the same format. Inference defaults to date
options = new CSVOptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.csv

import java.math.BigDecimal
import java.text.{DecimalFormat, DecimalFormatSymbols}
import java.time.{ZoneOffset}
import java.util.{Locale, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat
Expand Down Expand Up @@ -372,26 +371,4 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
}
assert(err.getMessage.contains("Illegal pattern character: n"))
}

test("SPARK-39469: dates should be parsed correctly in timestamp column when prefersDate=true") {
def checkDate(dataType: DataType): Unit = {
val timestampsOptions =
new CSVOptions(Map("prefersDate" -> "true", "timestampFormat" -> "dd/MM/yyyy HH:mm",
"timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"),
false, DateTimeUtils.getZoneId("-08:00").toString)
// Use CSVOption ZoneId="-08:00" (PST) to test that Dates in TimestampNTZ column are always
// converted to their equivalent UTC timestamp
val dateString = "08_09_2001"
val expected = dataType match {
case TimestampType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.of("-08:00"))
case TimestampNTZType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC)
case DateType => days(2001, 9, 8)
}
val parser = new UnivocityParser(new StructType(), timestampsOptions)
assert(parser.makeConverter("d", dataType).apply(dateString) == expected)
}
checkDate(TimestampType)
checkDate(TimestampNTZType)
checkDate(DateType)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2829,6 +2829,7 @@ abstract class CSVSuite
val options2 = Map(
"header" -> "true",
"inferSchema" -> "true",
"timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
"prefersDate" -> "true")

// An error should be thrown when attempting to use prefersDate with the legacy parser
Expand All @@ -2848,18 +2849,15 @@ abstract class CSVSuite
.load(testFile(dateInferSchemaFile))

val expectedSchema = StructType(List(StructField("date", DateType),
StructField("timestamp-date", TimestampType),
StructField("date-timestamp", TimestampType)))
StructField("timestamp-date", StringType),
StructField("date-timestamp", StringType)))
assert(results.schema == expectedSchema)

val expected =
Seq(
Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"),
Timestamp.valueOf("1765-03-28 00:00:0.0")),
Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"),
Timestamp.valueOf("1423-11-12 23:41:0.0")),
Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"),
Timestamp.valueOf("2016-01-28 20:00:00.0"))
Seq(Date.valueOf("2001-9-8"), "2014-10-27T18:30:00", "1765-03-28"),
Seq(Date.valueOf("1941-1-2"), "2000-09-14T01:01:00", "1423-11-12T23:41:00"),
Seq(Date.valueOf("0293-11-7"), "1995-06-25", "2016-01-28T20:00:00")
)
assert(results.collect().toSeq.map(_.toSeq) == expected)
}
Expand Down