diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 026a2a677baec..8c1fe6cf7055c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2589,6 +2589,8 @@ object Sequence {
     }
   }
 
+  // To generate time sequences, TemporalSequenceImpl uses a scale of 1 for
+  // `TimestampType` and a scale of MICROS_PER_DAY for `DateType`.
   private class TemporalSequenceImpl[T: ClassTag]
       (dt: IntegralType, scale: Long, fromLong: Long => T, zoneId: ZoneId)
       (implicit num: Integral[T]) extends SequenceImpl {
@@ -2623,8 +2625,16 @@ object Sequence {
       // about a month length in days and a day length in microseconds
       val intervalStepInMicros =
         stepMicros + stepMonths * microsPerMonth + stepDays * microsPerDay
-      val startMicros: Long = num.toLong(start) * scale
-      val stopMicros: Long = num.toLong(stop) * scale
+
+      // Converting a date to microseconds is time-zone dependent (a day is not
+      // always 24 hours, e.g. across a DST change), so go through daysToMicros
+      val (startMicros, stopMicros) = if (scale == 1) {
+        (num.toLong(start), num.toLong(stop))
+      } else {
+        (daysToMicros(num.toInt(start), zoneId),
+          daysToMicros(num.toInt(stop), zoneId))
+      }
+
       val maxEstimatedArrayLength =
         getSequenceLength(startMicros, stopMicros, intervalStepInMicros)
 
@@ -2635,7 +2645,11 @@ object Sequence {
       var i = 0
 
       while (t < exclusiveItem ^ stepSign < 0) {
-        arr(i) = fromLong(t / scale)
+        arr(i) = if (scale == 1) {
+          fromLong(t)
+        } else {
+          fromLong(Math.round(t / scale.toDouble))
+        }
         i += 1
         t = timestampAddInterval(
           startMicros, i * stepMonths, i * stepDays, i * stepMicros, zoneId)
@@ -2685,8 +2699,19 @@ object Sequence {
         |} else if ($stepMonths == 0 && $stepDays == 0 && ${scale}L == 1) {
         |  ${backedSequenceImpl.genCode(ctx, start, stop, stepMicros, arr, elemType)};
         |} else {
-        |  final long $startMicros = $start * ${scale}L;
-        |  final long $stopMicros = $stop * ${scale}L;
+        |  long $startMicros;
+        |  long $stopMicros;
+        |  if (${scale}L == 1L) {
+        |    $startMicros = $start;
+        |    $stopMicros = $stop;
+        |  } else {
+        |    $startMicros =
+        |      org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMicros(
+        |        (int)$start, $zid);
+        |    $stopMicros =
+        |      org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMicros(
+        |        (int)$stop, $zid);
+        |  }
         |
         |  $sequenceLengthCode
         |
@@ -2698,7 +2723,11 @@ object Sequence {
         |  int $i = 0;
         |
         |  while ($t < $exclusiveItem ^ $stepSign < 0) {
-        |    $arr[$i] = ($elemType) ($t / ${scale}L);
+        |    if (${scale}L == 1L) {
+        |      $arr[$i] = ($elemType) $t;
+        |    } else {
+        |      $arr[$i] = ($elemType) (Math.round($t / (double)${scale}L));
+        |    }
         |    $i += 1;
         |    $t = org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampAddInterval(
         |      $startMicros, $i * $stepMonths, $i * $stepDays, $i * $stepMicros, $zid);
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
index 3a0c02b29d92c..34179529ea6a6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
@@ -1854,4 +1854,17 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
       Literal(stringToInterval("interval 1 year"))),
year"))), Seq(Date.valueOf("2018-01-01"))) } + + test("SPARK-31982: sequence doesn't handle date increments that cross DST") { + Array("America/Chicago", "GMT", "Asia/Shanghai").foreach(tz => { + DateTimeTestUtils.withDefaultTimeZone(DateTimeUtils.getZoneId(tz)) { + checkEvaluation(Sequence( + Cast(Literal("2011-03-01"), DateType), + Cast(Literal("2011-04-01"), DateType), + Option(Literal(stringToInterval("interval 1 month"))), + Option(tz)), + Seq(Date.valueOf("2011-03-01"), Date.valueOf("2011-04-01"))) + } + }) + } }