Skip to content

Commit f906a54

Browse files
committed
address comment
1 parent c8920ca commit f906a54

File tree

5 files changed

+38
-34
lines changed

5 files changed

+38
-34
lines changed

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,11 @@ class AvroDeserializer(
5555

5656
private lazy val decimalConversions = new DecimalConversion()
5757

58-
private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFunc(
59-
datetimeRebaseMode, "Avro", isRead = true)
58+
private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
59+
datetimeRebaseMode, "Avro")
6060

61-
private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFunc(
62-
datetimeRebaseMode, "Avro", isRead = true)
61+
private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
62+
datetimeRebaseMode, "Avro")
6363

6464
private val converter: Any => Any = rootCatalystType match {
6565
// A shortcut for empty schema.

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ class AvroSerializer(
5959
converter.apply(catalystData)
6060
}
6161

62-
private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFunc(
63-
datetimeRebaseMode, "Avro", isRead = false)
62+
private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite(
63+
datetimeRebaseMode, "Avro")
6464

65-
private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFunc(
66-
datetimeRebaseMode, "Avro", isRead = false)
65+
private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite(
66+
datetimeRebaseMode, "Avro")
6767

6868
private val converter: Any => Any = {
6969
val actualAvroType = resolveNullableType(rootAvroType, nullable)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -143,47 +143,51 @@ object DataSourceUtils {
143143
"Gregorian calendar.", null)
144144
}
145145

146-
def creteDateRebaseFunc(
146+
def creteDateRebaseFuncInRead(
147147
rebaseMode: LegacyBehaviorPolicy.Value,
148-
format: String,
149-
isRead: Boolean): Int => Int = rebaseMode match {
150-
case LegacyBehaviorPolicy.EXCEPTION if isRead => days: Int =>
148+
format: String): Int => Int = rebaseMode match {
149+
case LegacyBehaviorPolicy.EXCEPTION => days: Int =>
151150
if (days < RebaseDateTime.lastSwitchJulianDay) {
152151
throw DataSourceUtils.newRebaseExceptionInRead(format)
153152
}
154153
days
154+
case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianDays
155+
case LegacyBehaviorPolicy.CORRECTED => identity[Int]
156+
}
157+
158+
def creteDateRebaseFuncInWrite(
159+
rebaseMode: LegacyBehaviorPolicy.Value,
160+
format: String): Int => Int = rebaseMode match {
155161
case LegacyBehaviorPolicy.EXCEPTION => days: Int =>
156162
if (days < RebaseDateTime.lastSwitchGregorianDay) {
157163
throw DataSourceUtils.newRebaseExceptionInWrite(format)
158164
}
159165
days
160-
case LegacyBehaviorPolicy.LEGACY => if (isRead) {
161-
RebaseDateTime.rebaseJulianToGregorianDays
162-
} else {
163-
RebaseDateTime.rebaseGregorianToJulianDays
164-
}
166+
case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianDays
165167
case LegacyBehaviorPolicy.CORRECTED => identity[Int]
166168
}
167169

168-
def creteTimestampRebaseFunc(
170+
def creteTimestampRebaseFuncInRead(
169171
rebaseMode: LegacyBehaviorPolicy.Value,
170-
format: String,
171-
isRead: Boolean): Long => Long = rebaseMode match {
172-
case LegacyBehaviorPolicy.EXCEPTION if isRead => micros: Long =>
172+
format: String): Long => Long = rebaseMode match {
173+
case LegacyBehaviorPolicy.EXCEPTION => micros: Long =>
173174
if (micros < RebaseDateTime.lastSwitchJulianTs) {
174175
throw DataSourceUtils.newRebaseExceptionInRead(format)
175176
}
176177
micros
178+
case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianMicros
179+
case LegacyBehaviorPolicy.CORRECTED => identity[Long]
180+
}
181+
182+
def creteTimestampRebaseFuncInWrite(
183+
rebaseMode: LegacyBehaviorPolicy.Value,
184+
format: String): Long => Long = rebaseMode match {
177185
case LegacyBehaviorPolicy.EXCEPTION => micros: Long =>
178186
if (micros < RebaseDateTime.lastSwitchGregorianTs) {
179187
throw DataSourceUtils.newRebaseExceptionInWrite(format)
180188
}
181189
micros
182-
case LegacyBehaviorPolicy.LEGACY => if (isRead) {
183-
RebaseDateTime.rebaseJulianToGregorianMicros
184-
} else {
185-
RebaseDateTime.rebaseGregorianToJulianMicros
186-
}
190+
case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros
187191
case LegacyBehaviorPolicy.CORRECTED => identity[Long]
188192
}
189193
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,11 @@ private[parquet] class ParquetRowConverter(
182182
*/
183183
def currentRecord: InternalRow = currentRow
184184

185-
private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFunc(
186-
datetimeRebaseMode, "Parquet", isRead = true)
185+
private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead(
186+
datetimeRebaseMode, "Parquet")
187187

188-
private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFunc(
189-
datetimeRebaseMode, "Parquet", isRead = true)
188+
private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead(
189+
datetimeRebaseMode, "Parquet")
190190

191191
// Converters for each field.
192192
private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,11 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
8282
private val datetimeRebaseMode = LegacyBehaviorPolicy.withName(
8383
SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE))
8484

85-
private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFunc(
86-
datetimeRebaseMode, "Parquet", isRead = false)
85+
private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite(
86+
datetimeRebaseMode, "Parquet")
8787

88-
private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFunc(
89-
datetimeRebaseMode, "Parquet", isRead = false)
88+
private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite(
89+
datetimeRebaseMode, "Parquet")
9090

9191
override def init(configuration: Configuration): WriteContext = {
9292
val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)

0 commit comments

Comments
 (0)