-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-18936][SQL] Infrastructure for session local timezone support. #16308
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
c48a70d
1d21fec
0763c8f
449d93d
b59d902
3ddfae4
f58f00d
8f2040b
63c103c
7066850
1aaca29
e5bb246
32cc391
f434378
16fd1e4
009c17b
a2936ed
c5ca73e
b860379
6746265
4b6900c
4f9cc40
2ca2413
c232854
5b6dd4f
1ca5808
702dd81
33a3425
5cc93e3
5521165
bd8275e
183945c
22a3b6e
30d51fa
043ab52
3ba5830
9ab31f0
b954947
dbb2604
186cd3e
6631a69
3610465
c12e596
8a04e80
efe3aff
cdbb266
328399a
7352612
b99cf79
a85377f
f0c911b
6fa1d6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,6 +104,7 @@ class Analyzer( | |
| ResolveAggregateFunctions :: | ||
| TimeWindowing :: | ||
| ResolveInlineTables :: | ||
| ResolveTimeZone :: | ||
| TypeCoercion.typeCoercionRules ++ | ||
| extendedResolutionRules : _*), | ||
| Batch("Nondeterministic", Once, | ||
|
|
@@ -180,7 +181,7 @@ class Analyzer( | |
| case ne: NamedExpression => ne | ||
| case e if !e.resolved => u | ||
| case g: Generator => MultiAlias(g, Nil) | ||
| case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)() | ||
| case c @ Cast(ne: NamedExpression, _, _) => Alias(c, ne.name)() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we add a Cast.unapply that returns only the first two arguments, we can reduce a lot of the cast match changes. Not sure if it is worth it though. |
||
| case e: ExtractValue => Alias(e, toPrettySQL(e))() | ||
| case e if optGenAliasFunc.isDefined => | ||
| Alias(child, optGenAliasFunc.get.apply(e))() | ||
|
|
@@ -2211,6 +2212,18 @@ class Analyzer( | |
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Replace [[TimeZoneAwareExpression]] without [[TimeZone]] by its copy with session local | ||
| * time zone. | ||
| */ | ||
| object ResolveTimeZone extends Rule[LogicalPlan] { | ||
|
|
||
| override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressions { | ||
| case e: TimeZoneAwareExpression if !e.timeZoneResolved => | ||
| e.withTimeZone(conf.sessionLocalTimeZone) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIden | |
| import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal} | ||
| import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} | ||
| import org.apache.spark.sql.catalyst.util.quoteIdentifier | ||
| import org.apache.spark.sql.catalyst.util.DateTimeUtils | ||
| import org.apache.spark.sql.types.{StructField, StructType} | ||
|
|
||
|
|
||
|
|
@@ -111,7 +112,8 @@ case class CatalogTablePartition( | |
| */ | ||
| def toRow(partitionSchema: StructType): InternalRow = { | ||
| InternalRow.fromSeq(partitionSchema.map { field => | ||
| Cast(Literal(spec(field.name)), field.dataType).eval() | ||
| Cast(Literal(spec(field.name)), field.dataType, | ||
| DateTimeUtils.defaultTimeZone().getID).eval() | ||
|
||
| }) | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -110,6 +110,14 @@ object Cast { | |
| case (_: FractionalType, _: IntegralType) => true // NaN, infinity | ||
| case _ => false | ||
| } | ||
|
|
||
| def needTimeZone(from: DataType, to: DataType): Boolean = (from, to) match { | ||
|
||
| case (StringType, TimestampType) => true | ||
| case (TimestampType, StringType) => true | ||
| case (DateType, TimestampType) => true | ||
| case (TimestampType, DateType) => true | ||
| case _ => false | ||
| } | ||
| } | ||
|
|
||
| /** Cast the child expression to the target data type. */ | ||
|
|
@@ -120,7 +128,10 @@ object Cast { | |
| > SELECT _FUNC_('10' as int); | ||
| 10 | ||
| """) | ||
| case class Cast(child: Expression, dataType: DataType) extends UnaryExpression with NullIntolerant { | ||
| case class Cast(child: Expression, dataType: DataType, zoneId: String = null) | ||
|
||
| extends UnaryExpression with TimeZoneAwareExpression with NullIntolerant { | ||
|
|
||
| def this(child: Expression, dataType: DataType) = this(child, dataType, null) | ||
|
|
||
| override def toString: String = s"cast($child as ${dataType.simpleString})" | ||
|
|
||
|
|
@@ -135,6 +146,14 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w | |
|
|
||
| override def nullable: Boolean = Cast.forceNullable(child.dataType, dataType) || child.nullable | ||
|
|
||
| override def timeZoneResolved: Boolean = | ||
|
||
| (!(childrenResolved && Cast.needTimeZone(child.dataType, dataType))) || super.timeZoneResolved | ||
|
|
||
| override lazy val resolved: Boolean = | ||
| childrenResolved && checkInputDataTypes().isSuccess && timeZoneResolved | ||
|
|
||
| override def withTimeZone(zoneId: String): TimeZoneAwareExpression = copy(zoneId = zoneId) | ||
|
||
|
|
||
| // [[func]] assumes the input is no longer null because eval already does the null check. | ||
| @inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T]) | ||
|
|
||
|
|
@@ -143,7 +162,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w | |
| case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes) | ||
| case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.dateToString(d))) | ||
| case TimestampType => buildCast[Long](_, | ||
| t => UTF8String.fromString(DateTimeUtils.timestampToString(t))) | ||
| t => UTF8String.fromString(DateTimeUtils.timestampToString(t, timeZone))) | ||
| case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString)) | ||
| } | ||
|
|
||
|
|
@@ -188,7 +207,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w | |
| // TimestampConverter | ||
| private[this] def castToTimestamp(from: DataType): Any => Any = from match { | ||
| case StringType => | ||
| buildCast[UTF8String](_, utfs => DateTimeUtils.stringToTimestamp(utfs).orNull) | ||
| buildCast[UTF8String](_, utfs => DateTimeUtils.stringToTimestamp(utfs, timeZone).orNull) | ||
| case BooleanType => | ||
| buildCast[Boolean](_, b => if (b) 1L else 0) | ||
| case LongType => | ||
|
|
@@ -200,7 +219,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w | |
| case ByteType => | ||
| buildCast[Byte](_, b => longToTimestamp(b.toLong)) | ||
| case DateType => | ||
| buildCast[Int](_, d => DateTimeUtils.daysToMillis(d) * 1000) | ||
| buildCast[Int](_, d => DateTimeUtils.daysToMillis(d, timeZone) * 1000) | ||
| // TimestampWritable.decimalToTimestamp | ||
| case DecimalType() => | ||
| buildCast[Decimal](_, d => decimalToTimestamp(d)) | ||
|
|
@@ -235,7 +254,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w | |
| case TimestampType => | ||
| // throw valid precision more than seconds, according to Hive. | ||
| // Timestamp.nanos is in 0 to 999,999,999, no more than a second. | ||
| buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 1000L)) | ||
| buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 1000L, timeZone)) | ||
| } | ||
|
|
||
| // IntervalConverter | ||
|
|
@@ -512,8 +531,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w | |
| (c, evPrim, evNull) => s"""$evPrim = UTF8String.fromString( | ||
| org.apache.spark.sql.catalyst.util.DateTimeUtils.dateToString($c));""" | ||
| case TimestampType => | ||
| val tz = ctx.addReferenceObj("timeZone", timeZone) | ||
|
||
| (c, evPrim, evNull) => s"""$evPrim = UTF8String.fromString( | ||
| org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c));""" | ||
| org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c, $tz));""" | ||
| case _ => | ||
| (c, evPrim, evNull) => s"$evPrim = UTF8String.fromString(String.valueOf($c));" | ||
| } | ||
|
|
@@ -539,8 +559,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w | |
| } | ||
| """ | ||
| case TimestampType => | ||
| val tz = ctx.addReferenceObj("timeZone", timeZone) | ||
| (c, evPrim, evNull) => | ||
| s"$evPrim = org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToDays($c / 1000L);"; | ||
| s"$evPrim = org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToDays($c / 1000L, $tz);" | ||
| case _ => | ||
| (c, evPrim, evNull) => s"$evNull = true;" | ||
| } | ||
|
|
@@ -618,11 +639,12 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w | |
| from: DataType, | ||
| ctx: CodegenContext): CastFunction = from match { | ||
| case StringType => | ||
| val tz = ctx.addReferenceObj("timeZone", timeZone) | ||
| val longOpt = ctx.freshName("longOpt") | ||
| (c, evPrim, evNull) => | ||
| s""" | ||
| scala.Option<Long> $longOpt = | ||
| org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp($c); | ||
| org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp($c, $tz); | ||
| if ($longOpt.isDefined()) { | ||
| $evPrim = ((Long) $longOpt.get()).longValue(); | ||
| } else { | ||
|
|
@@ -634,8 +656,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w | |
| case _: IntegralType => | ||
| (c, evPrim, evNull) => s"$evPrim = ${longToTimeStampCode(c)};" | ||
| case DateType => | ||
| val tz = ctx.addReferenceObj("timeZone", timeZone) | ||
| (c, evPrim, evNull) => | ||
| s"$evPrim = org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMillis($c) * 1000;" | ||
| s"$evPrim = org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMillis($c, $tz) * 1000;" | ||
| case DecimalType() => | ||
| (c, evPrim, evNull) => s"$evPrim = ${decimalToTimestampCode(c)};" | ||
| case DoubleType => | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems overkill. We only need to run the rule once, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hvanhovell Thank you for your suggestion.
I overrode `resolved` of `TimeZoneAwareExpression`, so we needed to add `ResolveTimeZone` to the `Resolution` batch, but I found we don't need to worry about the resolution, because having the timezone or not doesn't affect the resolution, and we only need to run the rule once now.