diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index c9955d72524c..9d855d1a93d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -124,15 +124,19 @@ class UnivocityParser(
   // dates and timestamps.
   // For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions.
   private val enableParsingFallbackForTimestampType =
-    options.enableDateTimeParsingFallback.getOrElse {
-      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
-        options.timestampFormatInRead.isEmpty
-    }
+    options.enableDateTimeParsingFallback
+      .orElse(SQLConf.get.csvEnableDateTimeParsingFallback)
+      .getOrElse {
+        SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+          options.timestampFormatInRead.isEmpty
+      }
   private val enableParsingFallbackForDateType =
-    options.enableDateTimeParsingFallback.getOrElse {
-      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
-        options.dateFormatInRead.isEmpty
-    }
+    options.enableDateTimeParsingFallback
+      .orElse(SQLConf.get.csvEnableDateTimeParsingFallback)
+      .getOrElse {
+        SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+          options.dateFormatInRead.isEmpty
+      }
 
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 06133d44c13a..f8adac1ee44f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -82,15 +82,19 @@ class JacksonParser(
   // dates and timestamps.
   // For more information, see comments for "enableDateTimeParsingFallback" option in JSONOptions.
   private val enableParsingFallbackForTimestampType =
-    options.enableDateTimeParsingFallback.getOrElse {
-      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
-        options.timestampFormatInRead.isEmpty
-    }
+    options.enableDateTimeParsingFallback
+      .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
+      .getOrElse {
+        SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+          options.timestampFormatInRead.isEmpty
+      }
   private val enableParsingFallbackForDateType =
-    options.enableDateTimeParsingFallback.getOrElse {
-      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
-        options.dateFormatInRead.isEmpty
-    }
+    options.enableDateTimeParsingFallback
+      .orElse(SQLConf.get.jsonEnableDateTimeParsingFallback)
+      .getOrElse {
+        SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+          options.dateFormatInRead.isEmpty
+      }
 
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 31bdbca4a256..eb7a6a9105e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3520,6 +3520,22 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK =
+    buildConf("spark.sql.legacy.csv.enableDateTimeParsingFallback")
+      .internal()
+      .doc("When true, enable legacy date/time parsing fallback in CSV")
+      .version("3.4.0")
+      .booleanConf
+      .createOptional
+
+  val LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK =
+    buildConf("spark.sql.legacy.json.enableDateTimeParsingFallback")
+      .internal()
+      .doc("When true, enable legacy date/time parsing fallback in JSON")
+      .version("3.4.0")
+      .booleanConf
+      .createOptional
+
   val ADD_PARTITION_BATCH_SIZE =
     buildConf("spark.sql.addPartitionInBatch.size")
       .internal()
@@ -4621,6 +4637,12 @@ class SQLConf extends Serializable with Logging {
 
   def avroFilterPushDown: Boolean = getConf(AVRO_FILTER_PUSHDOWN_ENABLED)
 
+  def jsonEnableDateTimeParsingFallback: Option[Boolean] =
+    getConf(LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK)
+
+  def csvEnableDateTimeParsingFallback: Option[Boolean] =
+    getConf(LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK)
+
   def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)
 
   def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 0068f57a7697..5c97821f11ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2949,6 +2949,38 @@ abstract class CSVSuite
       )
     }
   }
+
+  test("SPARK-40215: enable parsing fallback for CSV in CORRECTED mode with a SQL config") {
+    withTempPath { path =>
+      Seq("2020-01-01,2020-01-01").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      for (fallbackEnabled <- Seq(true, false)) {
+        withSQLConf(
+            SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED",
+            SQLConf.LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") {
+          val df = spark.read
+            .schema("date date, ts timestamp")
+            .option("dateFormat", "invalid")
+            .option("timestampFormat", "invalid")
+            .csv(path.getAbsolutePath)
+
+          if (fallbackEnabled) {
+            checkAnswer(
+              df,
+              Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
+            )
+          } else {
+            checkAnswer(
+              df,
+              Seq(Row(null, null))
+            )
+          }
+        }
+      }
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 02225d40c831..f0801ae313e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3322,6 +3322,38 @@ abstract class JsonSuite
       )
     }
   }
+
+  test("SPARK-40215: enable parsing fallback for JSON in CORRECTED mode with a SQL config") {
+    withTempPath { path =>
+      Seq("""{"date": "2020-01-01", "ts": "2020-01-01"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      for (fallbackEnabled <- Seq(true, false)) {
+        withSQLConf(
+            SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED",
+            SQLConf.LEGACY_JSON_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") {
+          val df = spark.read
+            .schema("date date, ts timestamp")
+            .option("dateFormat", "invalid")
+            .option("timestampFormat", "invalid")
+            .json(path.getAbsolutePath)
+
+          if (fallbackEnabled) {
+            checkAnswer(
+              df,
+              Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
+            )
+          } else {
+            checkAnswer(
+              df,
+              Seq(Row(null, null))
+            )
+          }
+        }
+      }
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {