diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index 2309aa42b80c..8441c2c481ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -17,8 +17,9 @@ package org.apache.spark.sql.catalyst.plans.logical +import java.util.concurrent.TimeUnit + import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.util.DateTimeUtils.MILLIS_PER_MONTH import org.apache.spark.sql.types.MetadataBuilder import org.apache.spark.unsafe.types.CalendarInterval @@ -27,7 +28,9 @@ object EventTimeWatermark { val delayKey = "spark.watermarkDelayMs" def getDelayMs(delay: CalendarInterval): Long = { - delay.milliseconds + delay.months * MILLIS_PER_MONTH + // We define month as `31 days` to simplify calculation. + val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31 + delay.milliseconds + delay.months * millisPerMonth } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 79fc45ec8947..34e8012106bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -59,15 +59,6 @@ object DateTimeUtils { final val MILLIS_PER_MINUTE: Long = 60 * MILLIS_PER_SECOND final val MILLIS_PER_HOUR: Long = 60 * MILLIS_PER_MINUTE final val MILLIS_PER_DAY: Long = SECONDS_PER_DAY * MILLIS_PER_SECOND - // The average year of the Gregorian calendar 365.2425 days long, see - // https://en.wikipedia.org/wiki/Gregorian_calendar - // Leap year occurs every 4 years, except for years that are divisible by 100 - // and not divisible by 400. So, the mean length of of the Gregorian calendar year is: - // 1 mean year = (365 + 1/4 - 1/100 + 1/400) days = 365.2425 days - // The mean year length in seconds is: - // 60 * 60 * 24 * 365.2425 = 31556952.0 = 12 * 2629746 - final val SECONDS_PER_MONTH: Int = 2629746 - final val MILLIS_PER_MONTH: Long = SECONDS_PER_MONTH * MILLIS_PER_SECOND // number of days between 1.1.1970 and 1.1.2001 final val to2001 = -11323 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala index f459a2c1f8e2..dda9d41f630e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala @@ -18,9 +18,9 @@ package org.apache.spark.sql.execution.streaming import java.sql.Date +import java.util.concurrent.TimeUnit import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout} -import org.apache.spark.sql.catalyst.util.DateTimeUtils.MILLIS_PER_MONTH import org.apache.spark.sql.execution.streaming.GroupStateImpl._ import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout} import org.apache.spark.unsafe.types.CalendarInterval @@ -164,7 +164,8 @@ private[sql] class GroupStateImpl[S] private( throw new IllegalArgumentException(s"Provided duration ($duration) is not positive") } - cal.milliseconds + cal.months * MILLIS_PER_MONTH + val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31 + cal.milliseconds + cal.months * millisPerMonth } private def checkTimeoutTimestampAllowed(): Unit = {