diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index a159202d99c4..dddf2bdfec1a 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -23,6 +23,12 @@
],
"sqlState" : "22005"
},
+ "CANNOT_INFER_DATE" : {
+ "message" : [
+ "Cannot infer date in schema inference when LegacyTimeParserPolicy is \"LEGACY\". Legacy Date formatter does not support strict date format matching which is required to avoid inferring timestamps and other non-date entries to date."
+ ],
+ "sqlState" : "22007"
+ },
"CANNOT_PARSE_DECIMAL" : {
"message" : [
"Cannot parse decimal"
diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md
index 1be1d7446e80..8384f8332a6a 100644
--- a/docs/sql-data-sources-csv.md
+++ b/docs/sql-data-sources-csv.md
@@ -108,6 +108,12 @@ Data source options of CSV can be set via:
     <td>Infers the input schema automatically from data. It requires one extra pass over the data. CSV built-in functions ignore this option.</td>
     <td>read</td>
   </tr>
+  <tr>
+    <td><code>inferDate</code></td>
+    <td>false</td>
+    <td>Whether or not to infer columns that satisfy the <code>dateFormat</code> option as <code>Date</code>. Requires <code>inferSchema</code> to be true. When <code>false</code>, columns with dates will be inferred as <code>String</code> (or as <code>Timestamp</code> if it fits the <code>timestampFormat</code>).</td>
+    <td>read</td>
+  </tr>
   <tr>
     <td><code>enforceSchema</code></td>
     <td>true</td>
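
For context, a minimal usage sketch of the new option (the `spark` session and file path below are placeholders, not part of this change):

```scala
// Enable date inference alongside schema inference. "inferDate", "dateFormat",
// and "inferSchema" are the CSV options documented above; the path is an assumption.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("inferDate", "true")        // columns containing only valid dates become DateType
  .option("dateFormat", "yyyy-MM-dd") // optional; defaults to the standard date pattern
  .csv("/path/to/data.csv")

df.printSchema() // date-only columns now infer as date rather than string or timestamp
```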
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 8b0c6c49b855..3132fea8700b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -24,8 +24,8 @@ import scala.util.control.Exception.allCatch
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
-import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -46,6 +46,12 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
isParsing = true,
forTimestampNTZ = true)
+ private lazy val dateFormatter = DateFormatter(
+ options.dateFormatInRead,
+ options.locale,
+ legacyFormat = FAST_DATE_FORMAT,
+ isParsing = true)
+
private val decimalParser = if (options.locale == Locale.US) {
// Special handling the default locale for backward compatibility
s: String => new java.math.BigDecimal(s)
@@ -117,7 +123,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
case LongType => tryParseLong(field)
case _: DecimalType => tryParseDecimal(field)
case DoubleType => tryParseDouble(field)
+ case DateType => tryParseDateTime(field)
+ case TimestampNTZType if options.inferDate => tryParseDateTime(field)
case TimestampNTZType => tryParseTimestampNTZ(field)
+ case TimestampType if options.inferDate => tryParseDateTime(field)
case TimestampType => tryParseTimestamp(field)
case BooleanType => tryParseBoolean(field)
case StringType => StringType
@@ -169,6 +178,16 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
private def tryParseDouble(field: String): DataType = {
if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) {
DoubleType
+ } else if (options.inferDate) {
+ tryParseDateTime(field)
+ } else {
+ tryParseTimestampNTZ(field)
+ }
+ }
+
+ private def tryParseDateTime(field: String): DataType = {
+ if ((allCatch opt dateFormatter.parse(field)).isDefined) {
+ DateType
} else {
tryParseTimestampNTZ(field)
}
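
The ordering above matters: a field is tried as a date before a timestamp, and `tryParseDateTime` falls through to timestamp inference on failure. A simplified, self-contained sketch of that fallback chain (plain `java.time`, not the Spark formatter classes):

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter
import scala.util.Try

// Toy model of the inference fallback: number -> date (if inferDate) -> timestamp -> string.
val dateFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val tsFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")

def inferField(field: String, inferDate: Boolean): String =
  if (Try(field.toDouble).isSuccess) "double"
  else if (inferDate && Try(LocalDate.parse(field, dateFmt)).isSuccess) "date"
  else if (Try(LocalDateTime.parse(field, tsFmt)).isSuccess) "timestamp"
  else "string"

assert(inferField("2018-12-03", inferDate = true) == "date")
assert(inferField("2018-12-03T11:00:00", inferDate = true) == "timestamp") // strict date parse rejects it
```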
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 3e92c3d25eb4..a033e3a3a8d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -148,7 +148,28 @@ class CSVOptions(
// A language tag in IETF BCP 47 format
val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)
- val dateFormatInRead: Option[String] = parameters.get("dateFormat")
+  /**
+   * Infer columns with all valid date entries as date type (otherwise inferred as timestamp type).
+   * Disabled by default for backwards compatibility and performance. When enabled, date entries in
+   * timestamp columns will be cast to timestamp upon parsing. Not compatible with
+   * legacyTimeParserPolicy == LEGACY since the legacy date parser accepts extra trailing
+   * characters.
+   */
+ val inferDate = {
+ val inferDateFlag = getBool("inferDate")
+ if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY && inferDateFlag) {
+ throw QueryExecutionErrors.inferDateWithLegacyTimeParserError()
+ }
+ inferDateFlag
+ }
+
+  // Provide a default value for dateFormatInRead when inferDate is enabled. This ensures that
+  // the Iso8601DateFormatter (with strict date parsing) is used for date inference.
+ val dateFormatInRead: Option[String] =
+ if (inferDate) {
+ Option(parameters.getOrElse("dateFormat", DateFormatter.defaultPattern))
+ } else {
+ parameters.get("dateFormat")
+ }
val dateFormatInWrite: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern)
val timestampFormatInRead: Option[String] =
@@ -195,7 +216,6 @@ class CSVOptions(
*/
val enforceSchema = getBool("enforceSchema", default = true)
-
/**
* String representation of an empty value in read and in write.
*/
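
Why the strict Iso8601DateFormatter is required here: a lenient legacy-style parser stops at the first match and ignores trailing text, so every timestamp would also "match" the date format. An illustrative comparison using plain JDK parsers (not Spark's formatter classes):

```scala
import java.text.SimpleDateFormat
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.util.Try

val ts = "2018-12-03T11:00:00"
val legacy = new SimpleDateFormat("yyyy-MM-dd")        // lenient: ignores trailing characters
val strict = DateTimeFormatter.ofPattern("yyyy-MM-dd") // strict: whole input must match

assert(Try(legacy.parse(ts)).isSuccess)            // timestamp would be mis-inferred as a date
assert(Try(LocalDate.parse(ts, strict)).isFailure) // correctly rejected: unparsed trailing text
```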
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 56ebfcc26c63..0237b6c454d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal}
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{daysToMicros, TimeZoneUTC}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -197,34 +198,46 @@ class UnivocityParser(
Decimal(decimalParser(datum), dt.precision, dt.scale)
}
- case _: TimestampType => (d: String) =>
+ case _: DateType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
try {
- timestampFormatter.parse(datum)
+ dateFormatter.parse(datum)
} catch {
case NonFatal(e) =>
// If fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
- DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
+ DateTimeUtils.stringToDate(str).getOrElse(throw e)
}
}
- case _: TimestampNTZType => (d: String) =>
- nullSafeDatum(d, name, nullable, options) { datum =>
- timestampNTZFormatter.parseWithoutTimeZone(datum, false)
- }
-
- case _: DateType => (d: String) =>
+ case _: TimestampType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
try {
- dateFormatter.parse(datum)
+ timestampFormatter.parse(datum)
} catch {
case NonFatal(e) =>
// If fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
- DateTimeUtils.stringToDate(str).getOrElse(throw e)
+ DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse {
+              // There may be date entries in a timestamp column due to schema inference
+ if (options.inferDate) {
+ daysToMicros(dateFormatter.parse(datum), options.zoneId)
+ } else {
+                throw e
+ }
+ }
+ }
+ }
+
+ case _: TimestampNTZType => (d: String) =>
+ nullSafeDatum(d, name, nullable, options) { datum =>
+ try {
+ timestampNTZFormatter.parseWithoutTimeZone(datum, false)
+ } catch {
+            case NonFatal(_) if options.inferDate =>
+ daysToMicros(dateFormatter.parse(datum), TimeZoneUTC.toZoneId)
}
}
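
To make the fallback concrete: when a timestamp column holds a plain date (possible under inferDate), the date is promoted to midnight in the session zone (UTC for TimestampNTZ). A hedged sketch of that conversion — the helper name and arithmetic below are illustrative, not Spark's `daysToMicros`, which takes days since the epoch:

```scala
import java.time.{LocalDate, ZoneId}

// Midnight of the parsed date in the given zone, as microseconds since the epoch.
def dateToMicros(date: LocalDate, zone: ZoneId): Long = {
  val instant = date.atStartOfDay(zone).toInstant
  instant.getEpochSecond * 1000000L + instant.getNano / 1000L
}

// 2001-09-08T00:00:00-08:00 equals 2001-09-08T08:00:00Z (999936000 epoch seconds)
assert(dateToMicros(LocalDate.of(2001, 9, 8), ZoneId.of("-08:00")) == 999936000L * 1000000L)
```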
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index a3129f249c1d..62e3f95940b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.InternalCompilerException
-import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFeatureNotSupportedException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIllegalArgumentException, SparkIndexOutOfBoundsException, SparkNoSuchElementException, SparkNoSuchMethodException, SparkNumberFormatException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFeatureNotSupportedException, SparkThrowable, SparkUnsupportedOperationException, SparkUpgradeException}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.memory.SparkOutOfMemoryError
@@ -529,6 +529,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
""".stripMargin)
}
+  def inferDateWithLegacyTimeParserError(): Throwable with SparkThrowable = {
+    new SparkIllegalArgumentException(
+      errorClass = "CANNOT_INFER_DATE",
+      messageParameters = Array())
+  }
+
def streamedOperatorUnsupportedByDataSourceError(
className: String, operator: String): Throwable = {
new UnsupportedOperationException(
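
For reference, a ScalaTest-style sketch of how this error surfaces to users — `spark.sql.legacy.timeParserPolicy` is the real conf key, while the path is a placeholder:

```scala
// With the legacy parser policy set, enabling inferDate fails fast with CANNOT_INFER_DATE.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
val err = intercept[org.apache.spark.SparkIllegalArgumentException] {
  spark.read
    .option("inferSchema", "true")
    .option("inferDate", "true")
    .csv("/path/to/data.csv")
}
assert(err.getErrorClass == "CANNOT_INFER_DATE")
```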
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
index d268f8c2e721..8790223a680f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala
@@ -109,6 +109,12 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
assert(
inferSchema.mergeRowTypes(Array(DoubleType),
Array(LongType)).sameElements(Array(DoubleType)))
+ assert(
+ inferSchema.mergeRowTypes(Array(DateType),
+ Array(TimestampNTZType)).sameElements(Array(TimestampNTZType)))
+ assert(
+ inferSchema.mergeRowTypes(Array(DateType),
+ Array(TimestampType)).sameElements(Array(TimestampType)))
}
test("Null fields are handled properly when a nullValue is specified") {
@@ -192,4 +198,53 @@ class CSVInferSchemaSuite extends SparkFunSuite with SQLHelper {
Seq("en-US").foreach(checkDecimalInfer(_, StringType))
Seq("ko-KR", "ru-RU", "de-DE").foreach(checkDecimalInfer(_, DecimalType(7, 0)))
}
+
+ test("SPARK-39469: inferring date type") {
+ // "yyyy/MM/dd" format
+ var options = new CSVOptions(Map("dateFormat" -> "yyyy/MM/dd", "inferDate" -> "true"),
+ false, "UTC")
+ var inferSchema = new CSVInferSchema(options)
+ assert(inferSchema.inferField(NullType, "2018/12/02") == DateType)
+ // "MMM yyyy" format
+ options = new CSVOptions(Map("dateFormat" -> "MMM yyyy", "inferDate" -> "true"),
+ false, "GMT")
+ inferSchema = new CSVInferSchema(options)
+ assert(inferSchema.inferField(NullType, "Dec 2018") == DateType)
+ // Field should strictly match date format to infer as date
+ options = new CSVOptions(
+ Map("dateFormat" -> "yyyy-MM-dd", "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
+ "inferDate" -> "true"),
+ columnPruning = false,
+ defaultTimeZoneId = "GMT")
+ inferSchema = new CSVInferSchema(options)
+ assert(inferSchema.inferField(NullType, "2018-12-03T11:00:00") == TimestampType)
+ assert(inferSchema.inferField(NullType, "2018-12-03") == DateType)
+ }
+
+ test("SPARK-39469: inferring date and timestamp types in a mixed column with inferDate=true") {
+ var options = new CSVOptions(
+ Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy|MM|dd",
+ "timestampNTZFormat" -> "yyyy/MM/dd", "inferDate" -> "true"),
+ columnPruning = false,
+ defaultTimeZoneId = "UTC")
+ var inferSchema = new CSVInferSchema(options)
+ assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
+ assert(inferSchema.inferField(DateType, "2003|01|01") == TimestampType)
+ // SQL configuration must be set to default to TimestampNTZ
+ withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> "TIMESTAMP_NTZ") {
+ assert(inferSchema.inferField(DateType, "2003/02/05") == TimestampNTZType)
+ }
+
+ // inferField should upgrade a date field to timestamp if the typeSoFar is a timestamp
+ assert(inferSchema.inferField(TimestampNTZType, "2012_12_12") == TimestampNTZType)
+ assert(inferSchema.inferField(TimestampType, "2018_12_03") == TimestampType)
+
+ // No errors when Date and Timestamp have the same format. Inference defaults to date
+ options = new CSVOptions(
+ Map("dateFormat" -> "yyyy_MM_dd", "timestampFormat" -> "yyyy_MM_dd"),
+ columnPruning = false,
+ defaultTimeZoneId = "UTC")
+ inferSchema = new CSVInferSchema(options)
+ assert(inferSchema.inferField(DateType, "2012_12_12") == DateType)
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 4166401d040f..2589376bc3dc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.csv
import java.math.BigDecimal
import java.text.{DecimalFormat, DecimalFormatSymbols}
+import java.time.ZoneOffset
import java.util.{Locale, TimeZone}
import org.apache.commons.lang3.time.FastDateFormat
@@ -358,4 +359,26 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC")
check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
}
+
+ test("SPARK-39469: dates should be parsed correctly in a timestamp column when inferDate=true") {
+ def checkDate(dataType: DataType): Unit = {
+      // Use a CSVOptions zoneId of "-08:00" (PST) to test that dates in a TimestampNTZ column
+      // are always converted to their equivalent UTC timestamp
+      val timestampsOptions =
+        new CSVOptions(Map("inferDate" -> "true", "timestampFormat" -> "dd/MM/yyyy HH:mm",
+          "timestampNTZFormat" -> "dd-MM-yyyy HH:mm", "dateFormat" -> "dd_MM_yyyy"),
+          false, DateTimeUtils.getZoneId("-08:00").toString)
+ val dateString = "08_09_2001"
+ val expected = dataType match {
+ case TimestampType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.of("-08:00"))
+ case TimestampNTZType => date(2001, 9, 8, 0, 0, 0, 0, ZoneOffset.UTC)
+ case DateType => days(2001, 9, 8)
+ }
+ val parser = new UnivocityParser(new StructType(), timestampsOptions)
+ assert(parser.makeConverter("d", dataType).apply(dateString) == expected)
+ }
+ checkDate(TimestampType)
+ checkDate(TimestampNTZType)
+ checkDate(DateType)
+ }
}
diff --git a/sql/core/src/test/resources/test-data/date-infer-schema.csv b/sql/core/src/test/resources/test-data/date-infer-schema.csv
new file mode 100644
index 000000000000..ca16ec81e6dc
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/date-infer-schema.csv
@@ -0,0 +1,4 @@
+date,timestamp-date,date-timestamp
+2001-09-08,2014-10-27T18:30:00,1765-03-28
+1941-01-02,2000-09-14T01:01:00,1423-11-12T23:41:00
+0293-11-07,1995-06-25,2016-01-28T20:00:00
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index bf92ffcf4651..758f54306088 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, Que
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -74,6 +75,7 @@ abstract class CSVSuite
private val simpleSparseFile = "test-data/simple_sparse.csv"
private val numbersFile = "test-data/numbers.csv"
private val datesFile = "test-data/dates.csv"
+ private val dateInferSchemaFile = "test-data/date-infer-schema.csv"
private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
private val valueMalformedFile = "test-data/value-malformed.csv"
private val badAfterGoodFile = "test-data/bad_after_good.csv"
@@ -2788,6 +2790,56 @@ abstract class CSVSuite
}
}
}
+
+ test("SPARK-39469: Infer schema for date type") {
+ val options1 = Map(
+ "header" -> "true",
+ "inferSchema" -> "true",
+ "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
+ "dateFormat" -> "yyyy-MM-dd",
+ "inferDate" -> "true")
+ val options2 = Map(
+ "header" -> "true",
+ "inferSchema" -> "true",
+ "inferDate" -> "true")
+
+ // Error should be thrown when attempting to inferDate with Legacy parser
+ if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+ val msg = intercept[IllegalArgumentException] {
+ spark.read
+ .format("csv")
+ .options(options1)
+ .load(testFile(dateInferSchemaFile))
+ }.getMessage
+ assert(msg.contains("CANNOT_INFER_DATE"))
+ } else {
+ // 1. Specify date format and timestamp format
+ // 2. Date inference should work with default date format when dateFormat is not provided
+      Seq(options1, options2).foreach { options =>
+ val results = spark.read
+ .format("csv")
+ .options(options)
+ .load(testFile(dateInferSchemaFile))
+
+ val expectedSchema = StructType(List(StructField("date", DateType),
+ StructField("timestamp-date", TimestampType),
+ StructField("date-timestamp", TimestampType)))
+ assert(results.schema == expectedSchema)
+
+ val expected =
+ Seq(
+ Seq(Date.valueOf("2001-9-8"), Timestamp.valueOf("2014-10-27 18:30:0.0"),
+ Timestamp.valueOf("1765-03-28 00:00:0.0")),
+ Seq(Date.valueOf("1941-1-2"), Timestamp.valueOf("2000-09-14 01:01:0.0"),
+ Timestamp.valueOf("1423-11-12 23:41:0.0")),
+ Seq(Date.valueOf("0293-11-7"), Timestamp.valueOf("1995-06-25 00:00:00.0"),
+ Timestamp.valueOf("2016-01-28 20:00:00.0"))
+ )
+ assert(results.collect().toSeq.map(_.toSeq) == expected)
+ }
+    }
+  }
}
class CSVv1Suite extends CSVSuite {