diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 65fafb5a34c6e..f66b5bd988c2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -58,7 +58,7 @@ private[sql] class AvroDeserializer(
   def this(
       rootAvroType: Schema,
       rootCatalystType: DataType,
-      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      datetimeRebaseMode: String,
       useStableIdForUnionType: Boolean,
       stableIdPrefixForUnionType: String,
       recursiveFieldMaxDepth: Int) = {
@@ -66,7 +66,7 @@ private[sql] class AvroDeserializer(
       rootAvroType,
       rootCatalystType,
       positionalFieldMatch = false,
-      RebaseSpec(datetimeRebaseMode),
+      RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
       new NoopFilters,
       useStableIdForUnionType,
       stableIdPrefixForUnionType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index ab3607d1bd7a7..da42333fad0fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
@@ -129,9 +129,9 @@ private[sql] class AvroOptions(
   /**
    * The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
    */
-  val datetimeRebaseModeInRead: LegacyBehaviorPolicy.Value = parameters
-    .get(DATETIME_REBASE_MODE).map(LegacyBehaviorPolicy.withName)
-    .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
+  val datetimeRebaseModeInRead: String = parameters
+    .get(DATETIME_REBASE_MODE)
+    .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ).toString)
 
   val useStableIdForUnionType: Boolean =
     parameters.get(STABLE_ID_FOR_UNION_TYPE).map(_.toBoolean).getOrElse(false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 10cfe9f145f6e..d43c9eab0a5ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -128,7 +128,7 @@ object DataSourceUtils extends PredicateHelper {
 
   private def getRebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value,
+      modeByConfig: String,
       minVersion: String,
       metadataKey: String): RebaseSpec = {
     val policy = if (Utils.isTesting &&
@@ -146,7 +146,7 @@ object DataSourceUtils extends PredicateHelper {
         } else {
           LegacyBehaviorPolicy.CORRECTED
         }
-      }.getOrElse(modeByConfig)
+      }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
     }
     policy match {
       case LegacyBehaviorPolicy.LEGACY =>
@@ -157,7 +157,7 @@ object DataSourceUtils extends PredicateHelper {
 
   def datetimeRebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value): RebaseSpec = {
+      modeByConfig: String): RebaseSpec = {
     getRebaseSpec(
       lookupFileMeta,
       modeByConfig,
@@ -167,7 +167,7 @@ object DataSourceUtils extends PredicateHelper {
 
   def int96RebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value): RebaseSpec = {
+      modeByConfig: String): RebaseSpec = {
     getRebaseSpec(
       lookupFileMeta,
       modeByConfig,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c56c947e3da5b..661be2b9cfa08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -182,10 +182,8 @@ class ParquetFileFormat
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
-    val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
-      parquetOptions.datetimeRebaseModeInRead)
-    val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
-      parquetOptions.int96RebaseModeInRead)
+    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
 
     // Should always be set by FileSourceScanExec creating this.
     // Check conf before checking option, to allow working around an issue by changing conf.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 4674320e8498a..70ae8068a03a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -81,10 +81,8 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
   private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-  private val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
-    options.datetimeRebaseModeInRead)
-  private val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
-    options.int96RebaseModeInRead)
+  private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
+  private val int96RebaseModeInRead = options.int96RebaseModeInRead
   private val parquetReaderCallback = new ParquetReaderCallback()
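Note on the pattern above: the rebase mode now travels through the option classes as a plain String and is converted back to the enum with LegacyBehaviorPolicy.withName only where a LegacyBehaviorPolicy.Value is actually consumed. A minimal standalone sketch of that round-trip follows; the object below is a local stand-in mirroring Spark's org.apache.spark.sql.internal.LegacyBehaviorPolicy (a Scala Enumeration whose LEGACY and CORRECTED values appear in the diff; EXCEPTION is its third value), not the real class.

// Local stand-in for the Enumeration pattern used by LegacyBehaviorPolicy.
object LegacyBehaviorPolicy extends Enumeration {
  val EXCEPTION, LEGACY, CORRECTED = Value
}

// enum -> String: how the String-typed option defaults are produced
// (cf. `.toString` on the SQLConf value in AvroOptions above).
val asString: String = LegacyBehaviorPolicy.CORRECTED.toString  // "CORRECTED"

// String -> enum: how the call sites parse the option back.
// Enumeration.withName throws NoSuchElementException for an unknown name,
// so an invalid mode still fails, just at the conversion point instead of
// at option construction.
val mode: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.withName(asString)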