@@ -58,15 +58,15 @@ private[sql] class AvroDeserializer(
   def this(
       rootAvroType: Schema,
       rootCatalystType: DataType,
-      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
+      datetimeRebaseMode: String,
       useStableIdForUnionType: Boolean,
       stableIdPrefixForUnionType: String,
       recursiveFieldMaxDepth: Int) = {
     this(
       rootAvroType,
       rootCatalystType,
       positionalFieldMatch = false,
-      RebaseSpec(datetimeRebaseMode),
+      RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
       new NoopFilters,
       useStableIdForUnionType,
       stableIdPrefixForUnionType,
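
The secondary constructor now accepts the rebase mode as a plain String and converts it back with `LegacyBehaviorPolicy.withName`. Since `LegacyBehaviorPolicy` is a `scala.Enumeration`, `toString` and `withName` round-trip exactly. A minimal stand-alone sketch (using a stand-in enum, not Spark's actual object):

```scala
// Stand-in for Spark's LegacyBehaviorPolicy, which is a scala.Enumeration
// with these three values; just enough here to demonstrate withName.
object LegacyBehaviorPolicy extends Enumeration {
  val EXCEPTION, CORRECTED, LEGACY = Value
}

object WithNameDemo extends App {
  val asString = LegacyBehaviorPolicy.CORRECTED.toString // "CORRECTED"
  assert(LegacyBehaviorPolicy.withName(asString) == LegacyBehaviorPolicy.CORRECTED)

  // withName is strict: an unknown name throws NoSuchElementException, so a
  // bad mode string surfaces when the deserializer is constructed.
  assert(scala.util.Try(LegacyBehaviorPolicy.withName("bogus")).isFailure)
}
```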
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
@@ -129,9 +129,9 @@ private[sql] class AvroOptions(
   /**
    * The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
    */
-  val datetimeRebaseModeInRead: LegacyBehaviorPolicy.Value = parameters
-    .get(DATETIME_REBASE_MODE).map(LegacyBehaviorPolicy.withName)
-    .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
+  val datetimeRebaseModeInRead: String = parameters
+    .get(DATETIME_REBASE_MODE)
+    .getOrElse(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ).toString)
 
   val useStableIdForUnionType: Boolean =
     parameters.get(STABLE_ID_FOR_UNION_TYPE).map(_.toBoolean).getOrElse(false)
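
`datetimeRebaseModeInRead` is now kept as the raw String: the per-read option wins, otherwise the session conf's value is rendered with `toString`. A hedged sketch of that lookup order, with stand-in names for the option map and conf value:

```scala
// Hedged sketch of the new lookup order in AvroOptions; `parameters` and
// `sessionConfValue` stand in for the case-insensitive option map and for
// SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ).toString.
object RebaseOptionDemo extends App {
  def datetimeRebaseModeInRead(
      parameters: Map[String, String],
      sessionConfValue: String): String =
    parameters.getOrElse("datetimeRebaseMode", sessionConfValue)

  // The per-read option overrides the session-level default.
  assert(datetimeRebaseModeInRead(
    Map("datetimeRebaseMode" -> "LEGACY"), "EXCEPTION") == "LEGACY")
  // With no option set, the conf's String rendering is used unchanged.
  assert(datetimeRebaseModeInRead(Map.empty, "EXCEPTION") == "EXCEPTION")
}
```

A consequence of this design: the option value is no longer validated here at parse time; `withName` now runs later, wherever the String is consumed.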
@@ -128,7 +128,7 @@ object DataSourceUtils extends PredicateHelper {
 
   private def getRebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value,
+      modeByConfig: String,
       minVersion: String,
       metadataKey: String): RebaseSpec = {
     val policy = if (Utils.isTesting &&
@@ -146,7 +146,7 @@
         } else {
           LegacyBehaviorPolicy.CORRECTED
         }
-      }.getOrElse(modeByConfig)
+      }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
     }
     policy match {
       case LegacyBehaviorPolicy.LEGACY =>
@@ -157,7 +157,7 @@
 
   def datetimeRebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value): RebaseSpec = {
+      modeByConfig: String): RebaseSpec = {
     getRebaseSpec(
       lookupFileMeta,
       modeByConfig,
@@ -167,7 +167,7 @@
 
   def int96RebaseSpec(
       lookupFileMeta: String => String,
-      modeByConfig: LegacyBehaviorPolicy.Value): RebaseSpec = {
+      modeByConfig: String): RebaseSpec = {
     getRebaseSpec(
       lookupFileMeta,
       modeByConfig,
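
With `modeByConfig` as a String, the parse moves onto the fallback path: metadata written into the file still decides the policy first, and `withName` only runs when no writer version is recorded. A simplified sketch of that resolution order (stand-in enum; the real version comparison is deliberately elided):

```scala
// Simplified sketch of getRebaseSpec's fallback: a policy derived from the
// file's writer metadata wins; otherwise the String config is parsed.
object LegacyBehaviorPolicy extends Enumeration {
  val EXCEPTION, CORRECTED, LEGACY = Value
}

object ResolveDemo extends App {
  def resolvePolicy(
      writerVersion: Option[String], // from the file's metadata, if present
      modeByConfig: String): LegacyBehaviorPolicy.Value =
    writerVersion
      .map(_ => LegacyBehaviorPolicy.CORRECTED) // real code compares versions
      .getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))

  assert(resolvePolicy(None, "LEGACY") == LegacyBehaviorPolicy.LEGACY)
  assert(resolvePolicy(Some("3.5.0"), "LEGACY") == LegacyBehaviorPolicy.CORRECTED)
}
```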
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -182,10 +182,8 @@ class ParquetFileFormat
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
-    val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
-      parquetOptions.datetimeRebaseModeInRead)
-    val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
-      parquetOptions.int96RebaseModeInRead)
+    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
 
     // Should always be set by FileSourceScanExec creating this.
     // Check conf before checking option, to allow working around an issue by changing conf.
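
Because `ParquetOptions` already exposes these modes as Strings, the eager `withName` calls at the scan-building site are dropped; the raw String flows into `DataSourceUtils.datetimeRebaseSpec`/`int96RebaseSpec`, which now take a String. One apparent effect, sketched below with a stand-in enum: an invalid mode no longer fails while the reader is being wired up, but later, when the rebase spec is actually computed:

```scala
// Hedged sketch of the error-timing shift: storing the raw String always
// succeeds; the withName parse (and thus the failure for a bad mode) now
// happens inside the rebase-spec helper. Stand-in enum, not Spark's.
object LegacyBehaviorPolicy extends Enumeration {
  val EXCEPTION, CORRECTED, LEGACY = Value
}

object ErrorTimingDemo extends App {
  val datetimeRebaseModeInRead: String = "NOT_A_MODE" // accepted here now

  // Stand-in for the helper's fallback path, where parsing actually occurs.
  val spec = scala.util.Try(LegacyBehaviorPolicy.withName(datetimeRebaseModeInRead))
  assert(spec.isFailure) // NoSuchElementException, raised at spec time
}
```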
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -81,10 +81,8 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
   private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-  private val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
-    options.datetimeRebaseModeInRead)
-  private val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
-    options.int96RebaseModeInRead)
+  private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
+  private val int96RebaseModeInRead = options.int96RebaseModeInRead
 
   private val parquetReaderCallback = new ParquetReaderCallback()
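
The DSv2 reader factory gets the same simplification. End to end, the mode now travels as a String from the options object through these fields into the rebase-spec helpers; a compact sketch of the whole thread, with stand-in types (RebaseSpec's real shape may differ):

```scala
// End-to-end sketch of the String's path after this change:
// options String -> reader-factory field -> helper parses -> RebaseSpec.
// LegacyBehaviorPolicy and RebaseSpec are stand-ins, not Spark's classes.
object LegacyBehaviorPolicy extends Enumeration {
  val EXCEPTION, CORRECTED, LEGACY = Value
}
case class RebaseSpec(
    mode: LegacyBehaviorPolicy.Value,
    originTimeZone: Option[String] = None)

object EndToEndDemo extends App {
  // 1. AvroOptions / ParquetOptions expose the mode as a plain String.
  val modeByConfig: String = "CORRECTED"

  // 2. Reader factories store it unchanged; no withName at the call site.
  val datetimeRebaseModeInRead: String = modeByConfig

  // 3. A datetimeRebaseSpec-like helper parses it when building the spec.
  def datetimeRebaseSpec(modeByConfig: String): RebaseSpec =
    RebaseSpec(LegacyBehaviorPolicy.withName(modeByConfig))

  assert(datetimeRebaseSpec(datetimeRebaseModeInRead).mode ==
    LegacyBehaviorPolicy.CORRECTED)
}
```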