Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,48 @@ case class Second(child: Expression, timeZoneId: Option[String] = None)
}
}

case class Milliseconds(child: Expression, timeZoneId: Option[String] = None)
extends UnaryExpression with ImplicitCastInputTypes with TimeZoneAwareExpression {

override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
// DecimalType is used here to not lose precision while converting microseconds to
// the fractional part of milliseconds. Scale 3 is taken to have all microseconds as
// the fraction. The precision 8 should cover 2 digits for seconds, 3 digits for
// milliseconds and 3 digits for microseconds.
override def dataType: DataType = DecimalType(8, 3)
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override protected def nullSafeEval(timestamp: Any): Any = {
DateTimeUtils.getMilliseconds(timestamp.asInstanceOf[Long], timeZone)
}

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val tz = ctx.addReferenceObj("timeZone", timeZone)
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
defineCodeGen(ctx, ev, c => s"$dtu.getMilliseconds($c, $tz)")
}
}

case class Microseconds(child: Expression, timeZoneId: Option[String] = None)
extends UnaryExpression with ImplicitCastInputTypes with TimeZoneAwareExpression {

override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
override def dataType: DataType = IntegerType
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override protected def nullSafeEval(timestamp: Any): Any = {
DateTimeUtils.getMicroseconds(timestamp.asInstanceOf[Long], timeZone)
}

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val tz = ctx.addReferenceObj("timeZone", timeZone)
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
defineCodeGen(ctx, ev, c => s"$dtu.getMicroseconds($c, $tz)")
}
}

@ExpressionDescription(
usage = "_FUNC_(date) - Returns the day of year of the date/timestamp.",
examples = """
Expand Down Expand Up @@ -350,6 +392,22 @@ case class Year(child: Expression) extends UnaryExpression with ImplicitCastInpu
}
}

case class IsoYear(child: Expression) extends UnaryExpression with ImplicitCastInputTypes {

override def inputTypes: Seq[AbstractDataType] = Seq(DateType)

override def dataType: DataType = IntegerType

override protected def nullSafeEval(date: Any): Any = {
DateTimeUtils.getIsoYear(date.asInstanceOf[Int])
}

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
defineCodeGen(ctx, ev, c => s"$dtu.getIsoYear($c)")
}
}

@ExpressionDescription(
usage = "_FUNC_(date) - Returns the quarter of the year for date, in the range 1 to 4.",
examples = """
Expand Down Expand Up @@ -1882,3 +1940,26 @@ case class Decade(child: Expression) extends UnaryExpression with ImplicitCastIn
defineCodeGen(ctx, ev, c => s"$dtu.getDecade($c)")
}
}

case class Epoch(child: Expression, timeZoneId: Option[String] = None)
extends UnaryExpression with ImplicitCastInputTypes with TimeZoneAwareExpression {

override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
// DecimalType is used to not lose precision while converting microseconds to
// the fractional part of seconds. Scale 6 is taken to have all microseconds as
// the fraction. The precision 20 should cover whole valid range of years [1, 9999]
// plus negative years that can be used in some cases though are not officially supported.
override def dataType: DataType = DecimalType(20, 6)
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override protected def nullSafeEval(timestamp: Any): Any = {
DateTimeUtils.getEpoch(timestamp.asInstanceOf[Long], zoneId)
}

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
defineCodeGen(ctx, ev, c => s"$dtu.getEpoch($c, $zid)")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
Decade(expression(ctx.source))
case "YEAR" | "Y" | "YEARS" | "YR" | "YRS" =>
Year(expression(ctx.source))
case "ISOYEAR" =>
IsoYear(expression(ctx.source))
case "QUARTER" | "QTR" =>
Quarter(expression(ctx.source))
case "MONTH" | "MON" | "MONS" | "MONTHS" =>
Expand All @@ -1426,6 +1428,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
Minute(expression(ctx.source))
case "SECOND" | "S" | "SEC" | "SECONDS" | "SECS" =>
Second(expression(ctx.source))
case "MILLISECONDS" | "MSEC" | "MSECS" | "MILLISECON" | "MSECONDS" | "MS" =>
Milliseconds(expression(ctx.source))
case "MICROSECONDS" | "USEC" | "USECS" | "USECONDS" | "MICROSECON" | "US" =>
Microseconds(expression(ctx.source))
case "EPOCH" =>
Epoch(expression(ctx.source))
case other =>
throw new ParseException(s"Literals of type '$other' are currently not supported.", ctx)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit._

import scala.util.control.NonFatal

import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.UTF8String

/**
Expand Down Expand Up @@ -455,6 +456,23 @@ object DateTimeUtils {
(MICROSECONDS.toSeconds(localTimestamp(microsec, timeZone)) % 60).toInt
}

/**
* Returns seconds, including fractional parts, multiplied by 1000. The timestamp
* is expressed in microseconds since the epoch.
*/
def getMilliseconds(timestamp: SQLTimestamp, timeZone: TimeZone): Decimal = {
val micros = Decimal(getMicroseconds(timestamp, timeZone))
(micros / Decimal(MICROS_PER_MILLIS)).toPrecision(8, 3)
}

/**
* Returns seconds, including fractional parts, multiplied by 1000000. The timestamp
* is expressed in microseconds since the epoch.
*/
def getMicroseconds(timestamp: SQLTimestamp, timeZone: TimeZone): Int = {
Math.floorMod(localTimestamp(timestamp, timeZone), MICROS_PER_SECOND * 60).toInt
}

/**
* Returns the 'day in year' value for the given date. The date is expressed in days
* since 1.1.1970.
Expand Down Expand Up @@ -489,6 +507,14 @@ object DateTimeUtils {
LocalDate.ofEpochDay(date).getYear
}

/**
* Returns the year which conforms to ISO 8601. Each ISO 8601 week-numbering
* year begins with the Monday of the week containing the 4th of January.
*/
def getIsoYear(date: SQLDate): Int = {
daysToLocalDate(date).get(IsoFields.WEEK_BASED_YEAR)
}

/**
* Returns the quarter for the given date. The date is expressed in days
* since 1.1.1970.
Expand Down Expand Up @@ -812,4 +838,14 @@ object DateTimeUtils {
def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = {
convertTz(time, getTimeZone(timeZone), TimeZoneGMT)
}

/**
* Returns the number of seconds with fractional part in microsecond precision
* since 1970-01-01 00:00:00 local time.
*/
def getEpoch(timestamp: SQLTimestamp, zoneId: ZoneId): Decimal = {
val offset = zoneId.getRules.getOffset(microsToInstant(timestamp)).getTotalSeconds
val sinceEpoch = BigDecimal(timestamp) / MICROS_PER_SECOND + offset
new Decimal().set(sinceEpoch, 20, 6)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.time.{ZoneId, ZoneOffset}
import java.time.{LocalDateTime, ZoneId, ZoneOffset}
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit._
Expand Down Expand Up @@ -1010,4 +1010,47 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Decade(date.copy(year = Literal(-11))), -2)
checkEvaluation(Decade(date.copy(year = Literal(-2019))), -202)
}

test("milliseconds and microseconds") {
outstandingTimezonesIds.foreach { timezone =>
var timestamp = MakeTimestamp(Literal(2019), Literal(8), Literal(10),
Literal(0), Literal(0), Literal(Decimal(BigDecimal(10.123456789), 8, 6)),
Some(Literal(timezone)))

checkEvaluation(Milliseconds(timestamp), Decimal(BigDecimal(10123.457), 8, 3))
checkEvaluation(Microseconds(timestamp), 10123457)

timestamp = timestamp.copy(sec = Literal(Decimal(0.0, 8, 6)))
checkEvaluation(Milliseconds(timestamp), Decimal(0, 8, 3))
checkEvaluation(Microseconds(timestamp), 0)

timestamp = timestamp.copy(sec = Literal(Decimal(BigDecimal(59.999999), 8, 6)))
checkEvaluation(Milliseconds(timestamp), Decimal(BigDecimal(59999.999), 8, 3))
checkEvaluation(Microseconds(timestamp), 59999999)

timestamp = timestamp.copy(sec = Literal(Decimal(BigDecimal(60.0), 8, 6)))
checkEvaluation(Milliseconds(timestamp), Decimal(0, 8, 3))
checkEvaluation(Microseconds(timestamp), 0)
}
}

test("epoch") {
val zoneId = ZoneId.systemDefault()
val nanos = 123456000
val timestamp = Epoch(MakeTimestamp(
Literal(2019), Literal(8), Literal(9), Literal(0), Literal(0),
Literal(Decimal(nanos / DateTimeUtils.NANOS_PER_SECOND.toDouble, 8, 6)),
Some(Literal(zoneId.getId))))
val instant = LocalDateTime.of(2019, 8, 9, 0, 0, 0, nanos)
.atZone(zoneId).toInstant
val expected = Decimal(BigDecimal(nanos) / DateTimeUtils.NANOS_PER_SECOND +
instant.getEpochSecond +
zoneId.getRules.getOffset(instant).getTotalSeconds)
checkEvaluation(timestamp, expected)
}

test("ISO 8601 week-numbering year") {
checkEvaluation(IsoYear(MakeDate(Literal(2006), Literal(1), Literal(1))), 2005)
checkEvaluation(IsoYear(MakeDate(Literal(2006), Literal(1), Literal(2))), 2006)
}
}
18 changes: 18 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/extract.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ select extract(years from c) from t;
select extract(yr from c) from t;
select extract(yrs from c) from t;

select extract(isoyear from c) from t;

select extract(quarter from c) from t;
select extract(qtr from c) from t;

Expand Down Expand Up @@ -63,4 +65,20 @@ select extract(sec from c) from t;
select extract(seconds from c) from t;
select extract(secs from c) from t;

select extract(milliseconds from c) from t;
select extract(msec from c) from t;
select extract(msecs from c) from t;
select extract(millisecon from c) from t;
select extract(mseconds from c) from t;
select extract(ms from c) from t;

select extract(microseconds from c) from t;
select extract(usec from c) from t;
select extract(usecs from c) from t;
select extract(useconds from c) from t;
select extract(microsecon from c) from t;
select extract(us from c) from t;

select extract(epoch from c) from t;

select extract(not_supported from c) from t;
4 changes: 2 additions & 2 deletions sql/core/src/test/resources/sql-tests/inputs/pgSQL/date.sql
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ SELECT f1 - date '2000-01-01' AS `Days From 2K` FROM DATE_TBL;
--
-- epoch
--
-- SELECT EXTRACT(EPOCH FROM DATE '1970-01-01'); -- 0
-- SELECT EXTRACT(EPOCH FROM TIMESTAMP '1970-01-01'); -- 0
SELECT EXTRACT(EPOCH FROM DATE '1970-01-01'); -- 0
SELECT EXTRACT(EPOCH FROM TIMESTAMP '1970-01-01'); -- 0
-- SELECT EXTRACT(EPOCH FROM TIMESTAMPTZ '1970-01-01+00'); -- 0
--
-- century
Expand Down
Loading