ParquetFileFormat.scala

@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -182,8 +182,10 @@ class ParquetFileFormat
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
-    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
-    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+    val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
+      parquetOptions.datetimeRebaseModeInRead)
+    val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
+      parquetOptions.int96RebaseModeInRead)
 
     // Should always be set by FileSourceScanExec creating this.
     // Check conf before checking option, to allow working around an issue by changing conf.
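
Note on the call-site change above: ParquetOptions now hands back the rebase mode as a plain String, and the reader converts it to the enum itself with LegacyBehaviorPolicy.withName. A minimal standalone sketch of that conversion, using a stand-in scala.Enumeration with the usual EXCEPTION / CORRECTED / LEGACY values (an assumption here) rather than Spark's actual org.apache.spark.sql.internal.LegacyBehaviorPolicy:

// Stand-in for Spark's LegacyBehaviorPolicy (values assumed to be EXCEPTION / CORRECTED / LEGACY);
// the real object is also a scala.Enumeration, so withName behaves the same way.
object LegacyBehaviorPolicy extends Enumeration {
  val EXCEPTION, CORRECTED, LEGACY = Value
}

object RebaseModeConversionSketch {
  def main(args: Array[String]): Unit = {
    // The options layer now returns a String...
    val datetimeRebaseModeInRead: String = "CORRECTED"

    // ...and the call site converts it once, mirroring
    // `LegacyBehaviorPolicy.withName(parquetOptions.datetimeRebaseModeInRead)` above.
    val mode: LegacyBehaviorPolicy.Value =
      LegacyBehaviorPolicy.withName(datetimeRebaseModeInRead)
    println(mode) // CORRECTED

    // withName is strict: an unrecognized mode name fails fast with NoSuchElementException.
    val bad = scala.util.Try(LegacyBehaviorPolicy.withName("BOGUS"))
    println(bad.isFailure) // true
  }
}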
ParquetOptions.scala

@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for the Parquet data source.
@@ -74,16 +74,15 @@ class ParquetOptions(
   /**
    * The rebasing mode for the DATE and TIMESTAMP_MICROS, TIMESTAMP_MILLIS values in reads.
    */
-  def datetimeRebaseModeInRead: LegacyBehaviorPolicy.Value = parameters
+  def datetimeRebaseModeInRead: String = parameters
     .get(DATETIME_REBASE_MODE)
-    .map(LegacyBehaviorPolicy.withName)
-    .getOrElse(sqlConf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ))
+    .getOrElse(sqlConf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ).toString)
   /**
    * The rebasing mode for INT96 timestamp values in reads.
   */
-  def int96RebaseModeInRead: LegacyBehaviorPolicy.Value = parameters
-    .get(INT96_REBASE_MODE).map(LegacyBehaviorPolicy.withName)
-    .getOrElse(sqlConf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ))
+  def int96RebaseModeInRead: String = parameters
+    .get(INT96_REBASE_MODE)
+    .getOrElse(sqlConf.getConf(SQLConf.PARQUET_INT96_REBASE_MODE_IN_READ).toString)
 }
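
The two getters above now resolve to a String: the per-read option wins if present, otherwise the session conf value is stringified. A rough sketch of that fallback, with a plain Map standing in for CaseInsensitiveMap, a passed-in string standing in for sqlConf.getConf(...).toString, and the option key assumed to be "datetimeRebaseMode" (matching the DATETIME_REBASE_MODE constant referenced above):

object RebaseModeFallbackSketch {
  // `parameters` stands in for the CaseInsensitiveMap of per-read options;
  // `sessionConfValue` stands in for sqlConf.getConf(SQLConf.PARQUET_REBASE_MODE_IN_READ).toString.
  def datetimeRebaseModeInRead(parameters: Map[String, String],
                               sessionConfValue: String): String =
    parameters
      .get("datetimeRebaseMode")   // assumed spelling of DATETIME_REBASE_MODE
      .getOrElse(sessionConfValue) // fall back to the session conf, already a String

  def main(args: Array[String]): Unit = {
    println(datetimeRebaseModeInRead(Map("datetimeRebaseMode" -> "LEGACY"), "EXCEPTION")) // LEGACY
    println(datetimeRebaseModeInRead(Map.empty, "EXCEPTION"))                             // EXCEPTION
  }
}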
ParquetPartitionReaderFactory.scala

@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -81,8 +81,10 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
   private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
-  private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
-  private val int96RebaseModeInRead = options.int96RebaseModeInRead
+  private val datetimeRebaseModeInRead = LegacyBehaviorPolicy.withName(
+    options.datetimeRebaseModeInRead)
+  private val int96RebaseModeInRead = LegacyBehaviorPolicy.withName(
+    options.int96RebaseModeInRead)
 
   private val parquetReaderCallback = new ParquetReaderCallback()
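
For end-to-end context, a hedged usage sketch of how a caller feeds these options: the per-read option key is assumed to be "datetimeRebaseMode" and the session conf key "spark.sql.parquet.datetimeRebaseModeInRead", matching the DATETIME_REBASE_MODE and SQLConf.PARQUET_REBASE_MODE_IN_READ identifiers in the diff rather than anything this change adds; the Parquet path is hypothetical.

import org.apache.spark.sql.SparkSession

object RebaseModeReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("rebase-mode-read-sketch")
      .master("local[*]")
      .getOrCreate()

    // Session-wide default (assumed conf key backing SQLConf.PARQUET_REBASE_MODE_IN_READ).
    spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")

    // Per-read override (assumed option key backing DATETIME_REBASE_MODE). ParquetOptions
    // surfaces it as a String; the readers above convert it with LegacyBehaviorPolicy.withName.
    val df = spark.read
      .option("datetimeRebaseMode", "LEGACY")
      .parquet("/tmp/example_parquet") // hypothetical path

    df.show()
    spark.stop()
  }
}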